开发者社区 > 云原生 > 正文

关于有序消费模型中的时间延迟。

测试环境: MQ和测试程序都在本地 生产者: Message msg = new Message(topic, tag, keys, content); SendResult result = producer.send(msg, new MessageQueueSelector() { @OverRide public MessageQueue select(List mqs, Message msg, Object arg) { Integer id = Integer.parseInt(arg.toString()); int index = id % mqs.size(); return mqs.get(index); } }, serverId);

消费者: DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); // Specify name server addresses. consumer.setNamesrvAddr(MQFactory.LOCAL_NAMESRV_ADDR); // Subscribe one more more topics to consume. consumer.subscribe(topic, tag); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

  • 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    consumer.registerMessageListener(listener);
    //Launch the consumer instance.
    consumer.start();
    

listener里面的内容: @OverRide public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { MessageExt message = msgs.get(0); LogUtils.info("收到消息 -> " + message.getMsgId() + ",内容 -> " + new String(message.getBody()) + ",key -> " + message.getKeys()); return ConsumeOrderlyStatus.SUCCESS; }

问题就是,每次消费者第一次启动后,生产者发来的消息总是要很久才能收到,最长时间超过1分钟,最短也是30秒以上 ,有什么办法可以缩短这个时间吗?

2019-03-23 16:51:11.655 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEA10000, offsetMsgId=AC10014200002A9F0000000000660A7A, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=59] 2019-03-23 16:51:11.656 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEA70001, offsetMsgId=AC10014200002A9F0000000000660B2C, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=60] 2019-03-23 16:51:11.658 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEA80002, offsetMsgId=AC10014200002A9F0000000000660BDE, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=61] 2019-03-23 16:51:11.659 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAA0003, offsetMsgId=AC10014200002A9F0000000000660C90, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=62] 2019-03-23 16:51:11.660 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAB0004, offsetMsgId=AC10014200002A9F0000000000660D42, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=63] 2019-03-23 16:51:11.661 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAC0005, offsetMsgId=AC10014200002A9F0000000000660DF4, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=64] 2019-03-23 16:51:11.662 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAD0006, offsetMsgId=AC10014200002A9F0000000000660EA6, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=65] 2019-03-23 16:51:11.663 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAE0007, offsetMsgId=AC10014200002A9F0000000000660F58, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=66] 2019-03-23 16:51:11.664 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAF0008, offsetMsgId=AC10014200002A9F000000000066100A, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=67] 2019-03-23 16:51:11.665 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEB00009, offsetMsgId=AC10014200002A9F00000000006610BC, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=68] 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEA10000,内容 -> i1-0,key -> i1-0 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEA70001,内容 -> i1-1,key -> i1-1 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEA80002,内容 -> i1-2,key -> i1-2 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAA0003,内容 -> i1-3,key -> i1-3 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAB0004,内容 -> i1-4,key -> i1-4 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAC0005,内容 -> i1-5,key -> i1-5 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAD0006,内容 -> i1-6,key -> i1-6 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAE0007,内容 -> i1-7,key -> i1-7 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAF0008,内容 -> i1-8,key -> i1-8 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEB00009,内容 -> i1-9,key -> i1-9

原提问者GitHub用户bean326

展开
收起
芬奇福贵 2023-05-26 16:05:20 135 0
1 条回答
写回答
取消 提交回答
  • 遇到过类似的问题,建议你shutdown程序的时候,不要用kill -9,要执行consumer.shutdown()。这样才会注销掉对MessageQueue的分配,服务端默认MessageQueue占有时间是1分钟,所以你会遇到重启一段时间不能消费,其实是在重平衡。

    原回答者GitHub用户makabakaboom

    2023-05-26 18:04:19
    赞同 展开评论 打赏
问答地址:

阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载