Springboot使用rabbitmq的延时队列
使用rabbitmq的延时消息队列处理定时业务的场景,比如下单后必须在5分钟内完成支付,否则5分钟后不让支付订单。具体实现如下:
1 发送延时消息
/** * 发送延迟消息 */ public void sendMessage(Order order) { if (order != null) { String msg = JSON.toJSONString(order); // rabbitTemplate.convertAndSend(QueueEnum.MESSAGE_STOCK_QUEUE.getExchange(), QueueEnum.MESSAGE_STOCK_QUEUE.getRouteKey(), msg); // 执行发送消息到指定队列 CorrelationData correlationData = new CorrelationData(order.getOrderUuid()); rabbitTemplate.convertAndSend(QueueEnum.MESSAGE_TTL_QUEUE.getExchange(), QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(), msg, message -> { // 设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(cancel * 1000 * 60)); return message; }, correlationData); } else { log.warn("消息内容为空!!!!!"); } }
2 延时消息绑定
3 监听延时消息
@RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME) @RabbitHandler public void processFixedOrderNoPay(String msg, Channel channel, Message message) throws IOException { log.info("processFixedOrderNoPay:{} ", message); // String orderId = null; try { // Order order = JSON.parseObject(msg, Order.class); // orderId = order.getOrderUuid(); // orderService.fixedOrderNoPayHandle(order); } catch (Exception e) { log.error("确认消费异常", e); //记录下这条消息 // if (StringUtil.isEmpty(orderId)) // redisService.set("orderNoPay:" + orderId, msg, 5); } finally { // 通过finally块来保证Ack/Nack会且只会执行一次 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
4 完毕