测试环境: MQ和测试程序都在本地 生产者: Message msg = new Message(topic, tag, keys, content); SendResult result = producer.send(msg, new MessageQueueSelector() { @OverRide public MessageQueue select(List mqs, Message msg, Object arg) { Integer id = Integer.parseInt(arg.toString()); int index = id % mqs.size(); return mqs.get(index); } }, serverId);
消费者: DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); // Specify name server addresses. consumer.setNamesrvAddr(MQFactory.LOCAL_NAMESRV_ADDR); // Subscribe one more more topics to consume. consumer.subscribe(topic, tag); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(listener);
//Launch the consumer instance.
consumer.start();
listener里面的内容: @OverRide public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { MessageExt message = msgs.get(0); LogUtils.info("收到消息 -> " + message.getMsgId() + ",内容 -> " + new String(message.getBody()) + ",key -> " + message.getKeys()); return ConsumeOrderlyStatus.SUCCESS; }
问题就是,每次消费者第一次启动后,生产者发来的消息总是要很久才能收到,最长时间超过1分钟,最短也是30秒以上 ,有什么办法可以缩短这个时间吗?
2019-03-23 16:51:11.655 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEA10000, offsetMsgId=AC10014200002A9F0000000000660A7A, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=59] 2019-03-23 16:51:11.656 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEA70001, offsetMsgId=AC10014200002A9F0000000000660B2C, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=60] 2019-03-23 16:51:11.658 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEA80002, offsetMsgId=AC10014200002A9F0000000000660BDE, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=61] 2019-03-23 16:51:11.659 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAA0003, offsetMsgId=AC10014200002A9F0000000000660C90, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=62] 2019-03-23 16:51:11.660 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAB0004, offsetMsgId=AC10014200002A9F0000000000660D42, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=63] 2019-03-23 16:51:11.661 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAC0005, offsetMsgId=AC10014200002A9F0000000000660DF4, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=64] 2019-03-23 16:51:11.662 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAD0006, offsetMsgId=AC10014200002A9F0000000000660EA6, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=65] 2019-03-23 16:51:11.663 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAE0007, offsetMsgId=AC10014200002A9F0000000000660F58, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=66] 2019-03-23 16:51:11.664 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEAF0008, offsetMsgId=AC10014200002A9F000000000066100A, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=67] 2019-03-23 16:51:11.665 [main] INFO java.lang.StackTraceElement - LocalMQProducer:84 [sendMsg] -> 发送结果 -> SendResult [sendStatus=SEND_OK, msgId=AC1001425B6018B4AAC274E9AEB00009, offsetMsgId=AC10014200002A9F00000000006610BC, messageQueue=MessageQueue [topic=TO_BATTLE, brokerName=BEAN, queueId=1], queueOffset=68] 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEA10000,内容 -> i1-0,key -> i1-0 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEA70001,内容 -> i1-1,key -> i1-1 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEA80002,内容 -> i1-2,key -> i1-2 2019-03-23 16:52:14.618 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAA0003,内容 -> i1-3,key -> i1-3 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAB0004,内容 -> i1-4,key -> i1-4 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAC0005,内容 -> i1-5,key -> i1-5 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAD0006,内容 -> i1-6,key -> i1-6 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAE0007,内容 -> i1-7,key -> i1-7 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEAF0008,内容 -> i1-8,key -> i1-8 2019-03-23 16:52:14.619 [ConsumeMessageThread_1] INFO java.lang.StackTraceElement - TestMQ$1:62 [consumeMessage] -> 收到消息 -> AC1001425B6018B4AAC274E9AEB00009,内容 -> i1-9,key -> i1-9
原提问者GitHub用户bean326
遇到过类似的问题,建议你shutdown程序的时候,不要用kill -9,要执行consumer.shutdown()。这样才会注销掉对MessageQueue的分配,服务端默认MessageQueue占有时间是1分钟,所以你会遇到重启一段时间不能消费,其实是在重平衡。
原回答者GitHub用户makabakaboom
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。