<>前期准备
下面展示一些 内联代码片。
<>1.配置文件,导入jar包
server: port: 8983 spring: application: name: API-RABBITMQ datasource: type:
com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/yingxue?characterEncoding=UTF-8&useSSL=false
username: root password: 123 rabbitmq: host: localhost port: 5672 username:
guestpassword: guest virtual-host: / mybatis: #mapper配置文件的位置 mapper-locations:
classpath:com.hou.mapper/*.xml #别名 type-aliases-package: com.hou.entity #
自定义过期时间 order: delay: time: 12000 # 单位毫秒
<>2. 使用rabbitMQ插件绑定延时队列,插件可自动实现死信队列,无需配置
//** * mq 配置类 */ @Configuration public class RabbitMQConfig { //声明1个路由key 1个队列
1个交换机 //延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
//延迟队列 public static final String DELAY_QUEUE_NAME = "delay.queue"; //延迟队列路由key
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.routingkey";
// 声明延迟交换机,由插件处理代码 @Bean("delayExchange") public CustomExchange delayExchange()
{ HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type",
"direct"); // 自定义的交换机 return new CustomExchange(DELAY_EXCHANGE_NAME,
"d-delayed-message", true, false, args); } // 声明延迟队列, @Bean("delayQueue") public
QueuedelayQueueA() { return new Queue(DELAY_QUEUE_NAME); } // 声明延迟队列B的绑定关系
@Beanpublic Binding delayBindingB(@Qualifier("delayQueue") Queue queue, @
Qualifier("delayExchange") CustomExchange exchange) { return BindingBuilder.bind
(queue) .to(exchange) .with(DELAY_QUEUE_ROUTING_KEY) .noargs(); } }
<>3. 生产者代码实现,
/** * 生产者代码 */ @Component public class DelayMessageProducer { @Resource private
RabbitTemplate rabbitTemplate; public void send(String message, Integer
delayTime) { // 三个参数 rabbitTemplate.convertAndSend(RabbitMQConfig.
DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUTING_KEY, message1 -> {
message1.getMessageProperties().setDelay(delayTime); return message1; }); } }
<>4. service层代码, 创建订单时,同步发送消息到mq,指定超时时间
public class OrderServiceImpl implements OrderService { @Autowired private
OrderMapper orderMapper; @Autowired // 注入生产者 private DelayMessageProducer
producer; @Value("${order.delay.time}") private Integer orderDelayTime; /** *
保存新增订单 */ @Override @Transactional public Map<String, Object> save(Order order)
{ // 订单编号 order.setOrderSn(IdUtil.getSnowflake(1, 1).nextIdStr()); // 订单状态 order
.setOrderStatus(OrderStatus.no_confirm.getStatus()); // 支付状态 order.setPayStatus(
PayStatus.no_pay.getStatus()); // 下单时间 order.setOrderTime(new Date()); //
使用plus插入一条记录 int result = orderMapper.insert(order); HashMap<String, Object> map
= new HashMap<>(); // 插入成功 if (result > 0) { map.put("code", 200); map.put(
"message", "订单已提交"); map.put("data", order); // 发送消息到延迟队列, 设置消息过期时间 producer.
send(order.getOrderSn(), orderDelayTime); return map; } else { map.put("code",
400); map.put("message", "订单提交失败"); map.put("data", null); return map; } }
<>5.消费者代码实现,修改订单的状态, 保存未支付订单表中数据
/** * 消息消费者 */ @Slf4j @Component public class DeadLetterQueueConsumer {
@Resourceprivate OrderMapper orderMapper; @Resource private OrderActionMapper
orderActionMapper; // 监听延迟队列A @RabbitListener(queues = RabbitMQConfig.
DELAY_QUEUE_NAME) public void receiveA(Message message, Channel channel) {
String orderSn= new String(message.getBody()); log.info("当前时间: {}, 延迟队列收到订单编号:
{}", LocalDateTime.now(), orderSn); // 根据订单编号查询订单 QueryWrapper<Order> wrapper =
new QueryWrapper<>(); wrapper.ge("orderSn", orderSn); Order order = orderMapper.
selectOne(wrapper); if (order != null && order.getOrderStatus().equals(
OrderStatus.no_confirm.getStatus()) && order.getPayStatus().equals(PayStatus.
no_pay.getStatus())) { // 设置订单的状态为已取消 order.setOrderStatus(OrderStatus.cancel.
getStatus()); // 修改状态 int result = orderMapper.updateById(order); if (result > 0
) { // 订单取消表中插入数据 OrderAction orderAction = new OrderAction(); orderAction.
setOrderSn(orderSn); orderAction.setOrderStatus(OrderStatus.cancel.getStatus());
orderAction.setPayStatus(PayStatus.no_pay.getStatus()); orderAction.
setActionNote("支付超时,订单已取消"); orderAction.setActionTime(new Date()); orderAction.
setStatusDesc("支付超时,订单已取消"); //新增取消订单 表 orderActionMapper.insert(orderAction); }
} } }