开发者社区> 问答> 正文

RocketMq 时间点消费消息无效怎么办?:配置报错 

我的业务场景是,前10天一直有消息堆积,业务方没有消费,第11天业务方开始消费,但希望从当前时间开始消费,以前的消息不处理;
通过看官方提供的例子,找到如下测试代码,但测试时发现,重新定义一个新消费组,1分钟前的消息依然能消费,请高手指点。

 
 public static void main(String[] args) throws InterruptedException, MQClientException {    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer23");

consumer.setNamesrvAddr("10.7.13.83:9876;10.7.13.84:9876");                   consumer.subscribe("TopicTest1", "*");                   // 一个新的订阅组第一次启动从指定时间点开始消费          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); // 设置时间点 consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(            System.currentTimeMillis() - (1000 * 60 * 1)));

consumer.registerMessageListener(new MessageListenerConcurrently() {          @Override          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,            ConsumeConcurrentlyContext context) {

MessageExt msg = msgs.get(<span style="color:#0000ff;">0); <span style="color:#808080;font-style:italic;"> <span style="color:#808080;font-style:italic;">                 

    System.out.println(new String(msg.getBody())); String keys = msg.getKeys(); System.out.println("keys="+keys);                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });                 /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */                 consumer.start();

System.out.println("Consumer Started."); }


展开
收起
kun坤 2020-06-01 00:12:22 891 0
1 条回答
写回答
取消 提交回答
  • 这是你业务逻辑做的吧,本来mq就是这样消费的。 ######

    consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(            System.currentTimeMillis() - (1000 * 60 * 1)));
    你设置的是从一分钟前开始消费啊。。。######回复 @云中飞雪 : 你新的组的group以及topic和现在一样吗?######是的,我期望是,当我用一个新的组去消费的时候,从当前时间开始消费,10天前的消息不消费

    2020-06-01 00:12:28
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载