Apache RocketMQ中如果只是消息推送,消费,代码这一层是可以识别的,但是到了rocketmq这块,就是状态改不过来,还有消息推送之后,消息重复的问题, 我rocketmq用controller集群部署,proxy换成CLUSTER模式后,消费的消息显示我消费者不在线是啥原因你知道嘛?import java.io.IOException; import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class PushConsumerExample02 { private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample02() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.21.170:28081;192.168.21.170:29081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "messageTag";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "test";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
} proxy服务换成LOCAL模式,消息状态就正常,proxy的CLUSTER模式,组这边识别不到,LOCAL模式这边能有个客户端信息
确认消息消费状态:首先,确认消息消费状态是否正确更新。在消息成功消费后,消费者应该返回成功的确认,告知消息代理(Broker)消息已被消费。这样,消息代理才能将消息标记为已消费,并不再推送给消费者。确保消费者代码中包含正确的确认逻辑,以及错误情况下的处理和重试机制。
检查消费者并发度:如果消费者的并发度设置过高,可能导致消息重复消费的问题。并发度设置过高意味着多个消费者同时处理相同的消息,可能会导致重复消费。确保消费者的并发度合理设置,并避免多个消费者同时处理相同的消息。
检查消息去重机制:为了避免消息重复消费,可以在消费者端实现消息的去重机制。可以使用消息的唯一标识(如消息ID)进行记录和判断,确保相同的消息不会被重复消费。可以借助数据库、缓存或其他持久化存储来记录已消费的消息,以便进行去重操作。
如果只是消息推送,消费,代码这一层是可以识别的,但是到了rocketmq这块,就是状态改不过来,还有消息推送之后,消息重复的问题,这是因为在使用CLUSTER模式时,消费者会连接到多个Broker,每个Broker都会有一份消费状态,当多个消费者同时消费同一个主题的消息时,可能会出现消息重复的问题。在使用LOCAL模式时,消费者只会连接到一个Broker,每个消息只会被消费一次,不会出现消息重复的问题。如果要在CLUSTER模式下使用消息推送功能,可以考虑使用多个消费者连接到同一个Broker,并使用相同的消费者组和消费者标识符,这样可以保证消息的唯一性。另外,可以使用一些专门用于解决消息重复问题的工具,如Redis,它可以将消息的多个副本存储在Redis中,并根据消息的主题和时间来匹配消息的副本,从而保证消息的唯一性。
消息重复推送:如果消息在发送时出现异常,可能会导致消息重复推送。解决方法是在消息发送时,使用唯一的消息 ID 来标识消息,RocketMQ 可以识别重复的消息 ID,并且不会将其重复推送给消费者。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/