开发者社区> 问答> 正文

RocketMq 时间点消费消息无效怎么办?

我的业务场景是,前10天一直有消息堆积,业务方没有消费,第11天业务方开始消费,但希望从当前时间开始消费,以前的消息不处理;

通过看官方提供的例子,找到如下测试代码,但测试时发现,重新定义一个新消费组,1分钟前的消息依然能消费,请高手指点。

 
 public static void main(String[] args) throws InterruptedException, MQClientException {    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer23");

consumer.setNamesrvAddr("10.7.13.83:9876;10.7.13.84:9876");                  
consumer.subscribe("TopicTest1", "*");                    // 一个新的订阅组第一次启动从指定时间点开始消费         
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// 设置时间点
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
           System.currentTimeMillis() - (1000 * 60 * 1)));

consumer.registerMessageListener(new MessageListenerConcurrently() {         
@Override          
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
           ConsumeConcurrentlyContext context) {

    MessageExt msg = msgs.get(0);                   
    System.out.println(new String(msg.getBody()));
    String keys = msg.getKeys();
    System.out.println("keys="+keys);                 
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
});                
/**  * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>  */                  consumer.start();

System.out.println("Consumer Started.");
}

展开
收起
montos 2020-06-03 13:15:32 929 0
1 条回答
写回答
取消 提交回答
  • "

    这是你业务逻辑做的吧,本来mq就是这样消费的。

    ######
    consumer.setConsumeTimestamp(UtilAll.<span style=""font-style:italic;"">timeMillisToHumanString3(            System.<span style=""font-style:italic;"">currentTimeMillis() - (<span style=""color:#0000FF;"">1000 * <span style=""color:#0000FF;"">60 * <span style=""color:#0000FF;"">1)));
    你设置的是从一分钟前开始消费啊。。。######回复 <a href=""http://my.oschina.net/u/2481791"" class=""referer"" target=""_blank"">@云中飞雪 : 你新的组的group以及topic和现在一样吗?######是的,我期望是,当我用一个新的组去消费的时候,从当前时间开始消费,10天前的消息不消费"

    2020-06-03 13:48:28
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
基于RocketMQ Connect 构建全新数据流转处理平 立即下载
万亿级数据洪峰下的消息引擎——Apache RocketMQ 立即下载
万亿级数据洪峰下的消息引擎-Apache RocketMQ 立即下载