Spring Cloud Stream RabbitMQ死信队列学习笔记(三)

死信产生

  在mq里头死信的产生:

  1. 消费失败重试到最大次数
  2. 消息ttl(也就是生存时间达到最大值)

这些都会进入到RabbitMQ DLX Exchange交换器进行重新扔到对应的队列进行消费

如何处理消费过程的异常

  因为如果消费过程出现问题,mq会认为消费失败,重新消费。有时也可能你消费超时了是吧。所以有了去除重复消费的观点。 那么如何在消费消息时处理异常? 伪代码

@StreamListener(Processor.INPUT)

    public void input(Message<String> message) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(df.format(System.currentTimeMillis()) + " " + "一般监听收到:" + message.getPayload());
        try{
            a();
        }catch (Exception e){
            System.out.println("记录异常");
        }


    }
    @Transactional(rollbackFor = Exception.class)
    public void a(){
        throw new RuntimeException();
    }

死信处理

TTL就是配置消息的消费时间,如果超过则列为死信队列里头。 deadLetterExchange指定绑定的交换器,下面那个队列名其实可以不用配置还是会分配到交互器对应的队列里头进行消费。

show y the code

在消费时进行延迟消费

int i = 0;

    // 监听 binding 为 Processor.INPUT 的消息
    @StreamListener(Processor.INPUT)
    public void input(Message<String> message) throws Exception {
        if (i <= 1) {
            Thread.sleep(5000);
            i++;
        }

        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(df.format(System.currentTimeMillis()) + " " + "一般监听收到:" + message.getPayload());
    }

定义死信消费渠道

interface DeadProcessor {

    String INPUT_ORDER = "inputDead";
    String OUTPUT_ORDER = "outputDead";

    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();

    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();
}

相关配置

还有关于死信相关配置,在上面

spring:
  cloud:
    stream:
      defaultBinder:
      bindings:
        inputDead:
          destination: mqTestDead
        outputDead:
          destination: mqTestDead

定义死信队列消费的逻辑

@StreamListener(DeadProcessor.INPUT_ORDER)
    public void inputDead(Message<String> message) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(df.format(System.currentTimeMillis()) + " " + "一般监听收到死信:" + message.getPayload());
        //throw new RuntimeException();
    }

输出

2020-01-27 10:54:16 一般监听收到死信:大家好 2020-01-27 10:54:16 一般监听收到死信:大家好 2020-01-27 10:54:16 一般监听收到死信:大家好 2020-01-27 10:54:16 一般监听收到死信:大家好 2020-01-27 10:54:16 一般监听收到死信:大家好 2020-01-27 10:54:19 一般监听收到:大家好

RabbitMQ控制台

github

经验分享 程序员 微信小程序 职场和发展