开发者社区 > 云原生 > 云消息队列 > 正文

Apache RocketMQ中如果只是消息推送,消费,代码这一层是可以识别的,但是到了rocke?

Apache RocketMQ中如果只是消息推送,消费,代码这一层是可以识别的,但是到了rocketmq这块,就是状态改不过来,还有消息推送之后,消息重复的问题,image.png image.png 我rocketmq用controller集群部署,proxy换成CLUSTER模式后,消费的消息显示我消费者不在线是啥原因你知道嘛?import java.io.IOException; import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

public class PushConsumerExample02 { private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

private PushConsumerExample02() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
    String endpoints = "192.168.21.170:28081;192.168.21.170:29081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
    // 订阅消息的过滤规则,表示订阅所有Tag的消息。
    String tag = "messageTag";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // 为消费者指定所属的消费者分组,Group需要提前创建。
    String consumerGroup = "test";
    // 指定需要订阅哪个目标Topic,Topic需要提前创建。
    String topic = "TestTopic";
    // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // 设置消费者分组。
            .setConsumerGroup(consumerGroup)
            // 设置预绑定的订阅关系。
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // 设置消费监听器。
            .setMessageListener(messageView -> {
                // 处理消息并返回消费结果。
                logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                return ConsumeResult.SUCCESS;
            })
            .build();
    Thread.sleep(Long.MAX_VALUE);
    // 如果不需要再使用 PushConsumer,可关闭该实例。
    // pushConsumer.close();
}

} image.png proxy服务换成LOCAL模式,消息状态就正常,proxy的CLUSTER模式,组这边识别不到,LOCAL模式这边能有个客户端信息image.png

展开
收起
真的很搞笑 2023-07-03 15:56:54 88 0
4 条回答
写回答
取消 提交回答
  • 确认消息消费状态:首先,确认消息消费状态是否正确更新。在消息成功消费后,消费者应该返回成功的确认,告知消息代理(Broker)消息已被消费。这样,消息代理才能将消息标记为已消费,并不再推送给消费者。确保消费者代码中包含正确的确认逻辑,以及错误情况下的处理和重试机制。

    检查消费者并发度:如果消费者的并发度设置过高,可能导致消息重复消费的问题。并发度设置过高意味着多个消费者同时处理相同的消息,可能会导致重复消费。确保消费者的并发度合理设置,并避免多个消费者同时处理相同的消息。

    检查消息去重机制:为了避免消息重复消费,可以在消费者端实现消息的去重机制。可以使用消息的唯一标识(如消息ID)进行记录和判断,确保相同的消息不会被重复消费。可以借助数据库、缓存或其他持久化存储来记录已消费的消息,以便进行去重操作。

    2023-07-08 10:13:01
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    如果只是消息推送,消费,代码这一层是可以识别的,但是到了rocketmq这块,就是状态改不过来,还有消息推送之后,消息重复的问题,这是因为在使用CLUSTER模式时,消费者会连接到多个Broker,每个Broker都会有一份消费状态,当多个消费者同时消费同一个主题的消息时,可能会出现消息重复的问题。在使用LOCAL模式时,消费者只会连接到一个Broker,每个消息只会被消费一次,不会出现消息重复的问题。如果要在CLUSTER模式下使用消息推送功能,可以考虑使用多个消费者连接到同一个Broker,并使用相同的消费者组和消费者标识符,这样可以保证消息的唯一性。另外,可以使用一些专门用于解决消息重复问题的工具,如Redis,它可以将消息的多个副本存储在Redis中,并根据消息的主题和时间来匹配消息的副本,从而保证消息的唯一性。

    2023-07-06 14:41:42
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    消息重复推送:如果消息在发送时出现异常,可能会导致消息重复推送。解决方法是在消息发送时,使用唯一的消息 ID 来标识消息,RocketMQ 可以识别重复的消息 ID,并且不会将其重复推送给消费者。

    2023-07-04 07:54:35
    赞同 展开评论 打赏
  • 在dashboard上看到的吗,此回答整理自钉群“群1-Apache RocketMQ 中国开发者钉钉群”

    2023-07-03 16:03:05
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    Apache Flink技术进阶 立即下载
    Apache Spark: Cloud and On-Prem 立即下载
    Hybrid Cloud and Apache Spark 立即下载

    相关镜像