消费者组和订阅
首先你应该关心的是不同的消费者群组可以独立的消费相同的主题,并且每个组都拥有自己的消费偏移量。
请确保相同消费者内的每个消费者订阅一样的主题。
消息监听器
串行
消费者将锁定每个消息队列以确保它被顺序消费。这会引起性能丢失,但当你关心消息顺序的实时这是很有用的。不推荐抛异常,你可以用返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 替代。
并行
正如名字所说,消息者将会并行消费消息。为了好的性能推荐使用。不推荐抛异常,你可以使用返回ConsumeConcurrentlyStatus.RECONSUME_LATER 代替。
消费状态
对于MessageListenerConcurrently,你可以返回 RECONSUME_LATER 来告诉消费者,你现在不能马上消费并且想在稍后重新消费。然后你可以继续消费其他消息。
对于MessageListenerOrderly,因为你关心顺序,你不能跳过消息,但你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉消费者等待一会。
阻塞
不建议阻塞监听器,因为它会阻塞线程池,最后使消费进程停止。
线程数量
消费者内部使用ThreadPoolExecutor来处理消费,因此你通过设置setConsumeThreadMin 或setConsumeThreadMax 可以改变它。
从哪开始消费
当创建一个新的消费者组,需要决定它是否需要消费在broker中已经存在的历史消息。
CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,消费所有的之后产生的消息。
CONSUME_FROM_FIRST_OFFSET 将会消费Broker中存在的每一个消息。
CONSUME_FROM_TIMESTAMP ,消费指定时间戳之后产生的消息。
重复
很多情况下都会引起重复,例如:
- Producer重发消息(FLUSH_SLAVE_TIMEOUT情况)
- Consumer 关闭,一些偏移量未及时更新到Broker
如果你的应用不能容忍重复的话,你可能需要做一些额外的工作来处理。例如,你可以检查DB的唯一主键。