Rocketmq整合Spring中的推消费和litepull消费

简介: RocketMQ整合Spring中推拉模式的相关配置介绍和遇到推拉配置出现的问题的解决

RocketMQ整合Spring的一个项目在apache中可以看到是rocketmq-spring。其提供了整合spring的方便使用方式。

一、rocketmq-spring中推拉模式的配置

下面我们来看看拉取和监听两种方式的消费模式的相关配置。

在Rocketmq整合Spring的过程中,我之前配置过这样的配置:

rocketmq:  name-server: 192.168.0.103:8080; 192.168.0.102:8080  tag: TAG_TEST
  consumer:    access-key: test_uat
    secret-key: 123456    tag: TAG_TEST
    topic: TEST_TOPIC
    group: GID_TEST_GROUP
    enable-msg-trace: true    access-channel: CLOUD
  pull-consumer:      group: GID_TEST_GROUP
    topic: TEST_TOPIC

这个配置其实是配置了两种模式的配置,可以在消息诊断可以看到会存在多个消费者线程的情况,虽然诊断的结果是没有问题的,但是消费会出现消费不到,产生消息丢失的情况。为什么这么说呢?


二、监听下的DefaultMQPushConsumer

从RocketMQ-Spring中,我们可以看到我们如果使用监听的方式进行消费的话,其实其会有一个配置是支持我们去做消费的,那就是RocketMQMessageListener这个类中的配置:org.apache.rocketmq.spring.annotation.RocketMQMessageListener

StringNAME_SERVER_PLACEHOLDER="${rocketmq.name-server:}";
StringACCESS_KEY_PLACEHOLDER="${rocketmq.push-consumer.access-key:}";
StringSECRET_KEY_PLACEHOLDER="${rocketmq.push-consumer.secret-key:}";
StringTRACE_TOPIC_PLACEHOLDER="${rocketmq.push-consumer.customized-trace-topic:}";
StringACCESS_CHANNEL_PLACEHOLDER="${rocketmq.access-channel:}";
StringconsumerGroup();
Stringtopic();
SelectorTypeselectorType() defaultSelectorType.TAG;
StringselectorExpression() default"*";
ConsumeModeconsumeMode() defaultConsumeMode.CONCURRENTLY;
MessageModelmessageModel() defaultMessageModel.CLUSTERING;
@DeprecatedintconsumeThreadMax() default64;
intconsumeThreadNumber() default20;
intmaxReconsumeTimes() default-1;
longconsumeTimeout() default15L;
intreplyTimeout() default3000;
StringaccessKey() defaultACCESS_KEY_PLACEHOLDER;
StringsecretKey() defaultSECRET_KEY_PLACEHOLDER;
booleanenableMsgTrace() defaultfalse;
StringcustomizedTraceTopic() defaultTRACE_TOPIC_PLACEHOLDER;
StringnameServer() defaultNAME_SERVER_PLACEHOLDER;
StringaccessChannel() defaultACCESS_CHANNEL_PLACEHOLDER;
StringtlsEnable() default"false";
Stringnamespace() default"";
/*** Message consume retry strategy in concurrently mode.** -1,no retry,put into DLQ directly* 0,broker control retry frequency* >0,client control retry frequency*/intdelayLevelWhenNextConsume() default0;
intsuspendCurrentQueueTimeMillis() default1000;
intawaitTerminationMillisWhenShutdown() default1000;
StringinstanceName() default"DEFAULT";

从这个配置中,我们可以看到其存在相关的配置:其中可以看到最大的消费线程数已经被废弃,取而代之的是consumeThreadNumber,默认是20个线程执行拉取消费操作。消费的模式采用并行消费:ConsumeMode.CONCURRENTLY。消息模式是采用的集群模式。同时根据占位符可以看到基本覆盖了我们配置的yml配置。

我们知道只有注册到Spring中的bean才能被Spring处理。因此我们可以看到:

@OverridepublicvoidafterSingletonsInstantiated() {
//拿到带RocketMQMessageListener的注解,同时将其进行注册Map<String, Object>beans=this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
            .entrySet().stream().filter(entry->!ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
    }

注册到容器中:

privatevoidregisterContainer(StringbeanName, Objectbean) {
Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean);
//拿到监听,获取消费者组、主题RocketMQMessageListenerannotation=clazz.getAnnotation(RocketMQMessageListener.class);
StringconsumerGroup=this.environment.resolvePlaceholders(annotation.consumerGroup());
Stringtopic=this.environment.resolvePlaceholders(annotation.topic());
//获取监听,是否开启了监听booleanlistenerEnabled=            (boolean) rocketMQProperties.getPushConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
//容器bean名称StringcontainerBeanName=String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContextgenericApplicationContext= (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () ->createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainercontainer=genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
//启动容器container.start();
            } catch (Exceptione) {
log.error("Started container failed. {}", container, e);
thrownewRuntimeException(e);
            }
        }
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

启动容器的本质是启动DefaultMQPushConsumer。

@Overridepublicvoidstart() throwsMQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
//启动默认推模式的消费方式this.defaultMQPushConsumerImpl.start();
if (null!=traceDispatcher) {
try {
//链路追踪traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientExceptione) {
log.warn("trace dispatcher start failed ", e);
            }
        }
    }


三、litepull方式的消费DefaultListPushConsumer

那我们再来看看拉取的方式,其默认的使用litePull的方式RocketMQAutoConfiguration:这个自动注入的条件是

@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix="rocketmq", value= {"name-server", "pull-consumer.group", "pull-consumer.topic"})
publicDefaultLitePullConsumerdefaultLitePullConsumer(RocketMQPropertiesrocketMQProperties)
throwsMQClientException {
RocketMQProperties.PullConsumerconsumerConfig=rocketMQProperties.getPullConsumer();
StringnameServer=rocketMQProperties.getNameServer();
StringgroupName=consumerConfig.getGroup();
StringtopicName=consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");
StringaccessChannel=rocketMQProperties.getAccessChannel();
MessageModelmessageModel=MessageModel.valueOf(consumerConfig.getMessageModel());
SelectorTypeselectorType=SelectorType.valueOf(consumerConfig.getSelectorType());
StringselectorExpression=consumerConfig.getSelectorExpression();
Stringak=consumerConfig.getAccessKey();
Stringsk=consumerConfig.getSecretKey();
intpullBatchSize=consumerConfig.getPullBatchSize();
booleanuseTLS=consumerConfig.isTlsEnable();
DefaultLitePullConsumerlitePullConsumer=RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
litePullConsumer.setInstanceName(consumerConfig.getInstanceName());
log.info(String.format("a pull consumer(%s sub %s) init on namesrv %s",  groupName, topicName,nameServer));
returnlitePullConsumer;
    }

从ISSUE中可以看到其是为了支持litepull消费消息的,[ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307) 。因此,如果配置了rocketmq为前缀,同时采用pull-consumer的方式的话,此时就可以采用这种方式注入。同时其出现在ExtRocketMQConsumerConfiguration配置中。通常这种方式 适合于实现扩展类,比如下面这种:

@ExtRocketMQConsumerConfiguration(topic="${demo.rocketmq.topic}", group="string_consumer", tlsEnable="${demo.ext.consumer.tlsEnable}")
publicclassExtRocketMQTemplateextendsRocketMQTemplate {
}

因此,如果使用了pull,就不要使用push。否则就会产生冲突,在一个项目中。可以看到启动的是DefaultLitePullConsumer。

privatevoidregisterTemplate(StringbeanName, Objectbean) {
Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean);
// 获取扩展配置消费者的配置ExtRocketMQConsumerConfigurationannotation=clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class);
GenericApplicationContextgenericApplicationContext= (GenericApplicationContext) applicationContext;
DefaultLitePullConsumerconsumer=null;
try {
//创建消费者consumer=createConsumer(annotation);
//启动消费者DefaultLitePullConsumerconsumer.start();
        } catch (Exceptione) {
log.error("Failed to startup PullConsumer for RocketMQTemplate {}", beanName, e);
        }
RocketMQTemplaterocketMQTemplate= (RocketMQTemplate) bean;
rocketMQTemplate.setConsumer(consumer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real consumer to :{} {}", beanName, annotation.value());
    }

也即此时验证我们看到诊断信息出现的两个消费者的情况。通过 实践可以知道两者目前不可以共存。否则会出现竞争的情况。

同时可以看到基于RocketMQ模板方式的生产者和消费者分别为:

privateDefaultMQProducerproducer;
privateDefaultLitePullConsumerconsumer;

也即模板中的生产者是基于默认MQ生产,而消费者则是采用默认的litepull消费。


四、两者有什么区别

litepull模式下只是通过poll方法批量拉取消息,缺乏重试机制,而push方式可以进行重试,并且失败后,会重试16次。那如果使用了litepull的方式,如果出现了失败,需要如何补偿呢?此时需要在业务系统中,获取offset,然后重置offset,进行重新消费,同时需要进行幂等处理。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
27 6
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
69 0
|
6月前
|
消息中间件 监控 Java
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
554 3
|
5月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
5月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
5月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
6月前
|
消息中间件 JavaScript Java
消息队列 MQ产品使用合集之如何嵌入到Spring Boot中运行
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用
|
6月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
6月前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化