快捷搜索: 王者荣耀 脱发

RocketMQ常见问题-如何保证消息传递的可靠性

作为一个消息中间件,RocketMQ的消息可靠性就是指确保消息数据不丢失。具体而言就是从消息在生产者产生,经过服务端投递,一定能被消费者消费。在rocketMQ中会返回消息发送状态码,rocketMQ还提供了生产者事务操作。 消息生产者Producer消息发送有三种方式:同步,异步,单向(Oneway) 1.同步发送 ,需要同时等待

SendResult sendResult = producer.send(message);

2.异步发送,异步线程发送出去消息,速度快//重点在SendCallback这里 异步发送回调,可靠性在于需要根据返回结果在回调里面处理业务。

producer.send(message, new SendCallback() {
           @Override
           public void onSuccess(SendResult sendResult) {
               System.out.printf(sendResult.getSendStatus()+"");
           }
 
           @Override
           public void onException(Throwable throwable) {
                    //根据业务处理
           }
       });

3.oneway 方式,只管发送,不在意是否成功,日志处理一般这样

producer.sendOneway(msg);

4.事务消息,通过实现TransactionMQProducer,并且编写本地事务监听器。

TransactionCheckListener transactionCheckListener

@Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
        if (null == this.transactionCheckListener) {
            throw new MQClientException("localTransactionBranchCheckListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
    }

关键点在于: 1.重试时Message Key必须保证唯一,因为重试原因不确定我们无法保证消息是否已经发生过一次,Message Key唯一能最大程度保证业务的一致性 2.日志的保存,关键字段,请求的操作人,时间,重试次数,请求体,返回结果(可以和请求体分开保存日志,避免请求中断带来的不确定性。

Consumer保证消息可靠性

1.重试队列 Consumer端因为各种类型异常导致消费失败,为防止消息丢失而需要将其重新回发给Broker端消息队列保存,称之为重试队列。RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列,用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中 2,.死信队列 由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup"的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理。

Broker端保证消息可靠性

Broker端的消息可靠性保证更多的要从架构层次来说明。常见的架构策略:

双主双从架构,NameServer多节点,同步双写,异步刷盘,消息在内存中,突然断电消息丢失,同步刷盘可靠性更高,消息持续化到磁盘,同城双活,异地多活,跨国多活

经验分享 程序员 微信小程序 职场和发展