举一个栗子: 一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟是 18W 条,1000 多 W 条需要一个小时恢复。
步骤:
1. 先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉
2. 新建一个topic,partition是原来的 10 倍,临时建立好原先 10 倍或者 20 倍的 queue 数量
3. 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue
4. 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据
5. 这种做法相当 于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常 10 倍速度
6. 等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer机器消费消息
原来 3 个消费者需要 1 个小时可以搞定,现在 30 个临时消费者需要 10 分钟就可以搞定。
如果用的 rabbitmq,并且设置了过期时间,如果此消费在 queue里积压超过一定的时间会被 rabbitmq清理掉,数据直接搞丢。这个时候开始写程序,将丢失的那批 数据查出来,然后重新灌入mq里面,把白天丢的数据补回来。
如果消息积压mq,长时间没被处理掉,导致mq快写完满了,你临时写一个程序,接入数据来消费,写到一个临时的mq里,再让其他消费者慢慢消费 或者消费一个丢弃一个,都不要了,快速消费掉所有的消息,然后晚上补数据。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/