【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践

RocketMQ消费失败重试机制分析


今天我们分析一下RocketMQ消费重试机制,如果执行以下的RocketMQ的消费服务的代码

try {
            try {
                if (messageExtWrappers.size() > 0) {
                    try {
                        var22 = messageExtWrappers.iterator();
                        while(var22.hasNext()) {
                            messageExt = (MessageExt)var22.next();
                        }
                    } catch (Throwable var16) {
                        ;
                    }
                    this.consume(messageExtWrappers, context);
                }
                LOGGER.info("MQ_CON_SUCCESS {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId});
                ConsumeConcurrentlyStatus var23 = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                return var23;
            } catch (MessageListenerConcurrentlyException var17) {
                LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var17});
                throw var17;
            } catch (Throwable var18) {
                LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var18});
                LOGGER.info("MQ_CON_RECONSUME {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId});
                if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {
                    context.setDelayLevelWhenNextConsume(-1);
                }
            }
复制代码



当进行消费数据后得出的结果日志如下:


第一次消费

MQ_CON_MSG topic MSG MessageExt [queueId=1, storeSize=453, queueOffset=25, sysFlag=0, bornTimestamp=1566785215908, bornHost=/10.42.0.77:54608, storeTimestamp=1566785215908, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B77CE84, commitLogOffset=192401028, bodyCRC=53737244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={MIN_OFFSET=0, _catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15071, HASH_CODE=690132963, MAX_OFFSET=26, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785215911, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15072, UNIQ_KEY=0A2A004D000938AF386882EAA5A40112, WAIT=true}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 48, 52, 54, 56, 57, 52, 48, 52, 48, 56, 48], transactionId='null’}]



第一次retry(reconsumeTimes=1,DELAY=3)


MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1187, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785226241, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B785900, commitLogOffset=192436480, bodyCRC=893293938, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785226242, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1188, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=3, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]



第二次retry(reconsumeTimes=2, DELAY=4)


MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1209, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785256680, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B791399, commitLogOffset=192484249, bodyCRC=893293938, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785256728, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1210, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=4, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]




第三次retry(reconsumeTimes=3, DELAY=5)


MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1228, sysFlag=0, bornTimestamp=1566785215923, b


RocketMQ的默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别代表延迟level1-level18,为什么不是从1开始呢?

if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {
        context.setDelayLevelWhenNextConsume(-1);
}
复制代码


当重试达到满足条件的时候,不再重试,直接放到dlq队列里面。如果不控制的,会一直重试到最高DelayLevel 18。


从DefaultMQPullConsumerImpl类里面找到一段代码

public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = null != brokerName ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            if (UtilAll.isBlank(consumerGroup)) {
                consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
            }
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000L, this.defaultMQPullConsumer.getMaxReconsumeTimes());
        } catch (Exception var8) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), var8);
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }
复制代码

从代码中看到DelayTimeLevel =3+reconsumeTime

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());



所以默认重试时,实际是从3开始的,从时间的角度,也验证为什么会重试4次,而且每次间隔的时间是10s/30s/1m .


  • 此外发送时,延时等级设置的是9,那么5m之后消费者才能从broker取到该消息消费。消费失败或者异常时,消费者返回给broker,ConsumeConcurrentlyStatus.RECONSUME_LATER,所以broker会重发该消息,由于消费者设置了再次消费的延时级别为5,所以1m之后,消费者才能从broker取出该重发消息。由于consumer没有设置重试次数,所以,broker默认重发该消息16次,然后放入删除队列。


  • DefaultRocketMQListenerContainer实现了MessageListenerConcurrently方法,它会循环调用rocketMQListener.onMessage,出现异常会设置delayLevelWhenNextConsume,然后立即返回ConsumeConcurrentlyStatus.RECONSUME_LATER




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
23天前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
37 3
|
23天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
86 2
|
23天前
|
消息中间件 Cloud Native Serverless
RabbitMQ 与云原生技术的融合
【8月更文第28天】随着微服务架构和容器化的普及,云原生技术已成为构建现代应用的标准方式。云原生应用程序利用了诸如容器化、微服务、声明式API等技术,以提高可伸缩性、可靠性和可维护性。消息队列作为服务间通信的关键组件,在云原生环境中扮演着重要角色。本文将探讨如何将RabbitMQ与云原生技术(如Service Mesh和Serverless平台)相结合,并通过具体的代码示例来展示其集成方法。
27 2
|
2月前
|
消息中间件 存储 监控
|
24天前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
42 1
|
23天前
|
存储 中间件 PHP
Python编程入门:从零到一的代码实践深入理解 PHP 中的中间件模式
【8月更文挑战第28天】本文旨在通过浅显易懂的方式,向初学者介绍Python编程的基础知识,并结合具体代码示例,带领读者一步步实现从零基础到能够独立编写简单程序的转变。文章将围绕Python语言的核心概念进行讲解,并通过实例展示如何应用这些概念解决实际问题。无论你是编程新手还是希望扩展技能的专业人士,这篇文章都将为你打开编程世界的大门。 【8月更文挑战第28天】在PHP的世界中,设计模式是构建可维护和可扩展软件的重要工具。本文将通过浅显易懂的语言和生动的比喻,带领读者深入理解中间件模式如何在PHP应用中发挥魔力,实现请求处理的高效管理。我们将一步步揭开中间件的神秘面纱,从它的定义、工作原理到
|
1月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
116 0
RocketMQ—一次连接namesvr失败的案例分析
|
2月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
2月前
|
消息中间件 存储 负载均衡
中间件消息队列与发布/订阅模型
【7月更文挑战第15天】
75 6
|
2月前
|
JSON 中间件 数据处理
实践出真知:通过项目学习Python Web框架的路由与中间件设计
【7月更文挑战第19天】探索Python Web开发,掌握Flask或Django的关键在于理解路由和中间件。路由连接URL与功能,如Flask中@app.route()定义请求响应路径。中间件在请求处理前后执行,提供扩展功能,如日志、认证。通过实践项目,不仅学习理论,还能提升构建高效Web应用的能力。示例代码展示路由定义及模拟中间件行为,强调动手实践的重要性。
42 1

相关产品

  • 云消息队列 MQ