背景介绍
公司最近年底要对系统做一次大的体检,所以是不测不知道,一测吓一跳啊,出现了很多问题,其中最恶心的问题要数我们的ROCKETMQ消息队列的问题了,大家都知道消息队列是作为流量削峰的主要手段,负责系统健壮性和压力的最佳手段,谁知道,它竟然“生病”了,干不动活了。
问题现象
系统频繁出现:system busy 和 broker busy 解决方案:
com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8 复制代码
Rocketmq发送控制流程
针对前4种 broker busy ,主要是由于 Broker 在追加消息时持有的锁时间超过了设置的1s,Broker 为了自我保护,会抛出错误,客户端会选择其他 broker 服务器进行重试。
如果对不是金融级服务,建议将 transientStorePoolEnable = true,可以有效避免前面 4 种 broker ,因为开启这个参数,消息首先会存储在堆外内存中,并且 RocketMQ 提供了内存锁定的功能,其追加性能能得到一定的保障,这样可以做到在内存使用层面的读写分离,即写消息是直接写入堆外内存,消费消息直接从 pagecache中读,然后定时将堆外内存的消息写入 pagecache。
但这种方案随之带来的就是可能存在消息丢失,如果对消息非常严谨的话,建议扩容集群,或迁移topic到新的集群。
可以看出来,抛出这种错误,在 broker 还没有发送“严重”的 pagecache 繁忙,即消息追加到内存中的最大时延没有超过 1s,通常追加是很快的,绝大部分都会低于1ms,但可能会由于出现一个超过200ms的追加时间,导致排队中的任务等待时间超过了200ms,则此时会触发broker 端的快速失败,让请求快速失败,便于客户端快速重试。但是这种请求并不是实时的,而是每隔10s 检查一遍。
值得注意的是,一旦出现 TIMEOUT_CLEAN_QUEUE,可能在一个点会有多个这样的错误信息,具体多少与当前积压在待发送队列中的个数有关。
Rocketmq 发送时异常
system busy 和 broker busy 解决方案
- [REJECTREQUEST]system busy too many requests and system thread pool busy
- [PC_SYNCHRONIZED]broker busy
- [PCBUSY_CLEAN_QUEUE]broker busy
- [TIMEOUT_CLEAN_QUEUE]broker busy
之前写的解决方案,都是基于测试环境测试的.到生产环境之后,正常使用没有问题,生产环境压测时,又出现了system busy异常(简直崩溃)
com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8 For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455) at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272) at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253) at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989) at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90) at at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 复制代码
报错定位
- cleanExpiredRequestInQueue会处理发送消息、拉取消息、心跳、事务消息队列中的数据,此次遇到的问题是发送Topic消息报出来的错误,所以接下来针对发送消息流程进行分析。
- 报出此错误的源码位置为broker快速失败机制BrokerFastFailure.java类(该类在Broker启动时会启动一个定时任务,每10毫秒执行一次),报错位置代码如下:
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) { while (true) { try { if (!blockingQueue.isEmpty()) { // 获取队列头元素 final Runnable runnable = blockingQueue.peek(); if (null == runnable) { break; } final RequestTask rt = castRunnable(runnable); if (rt == null || rt.isStopRun()) { break; } final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); // 如果头元素对应的任务处理时间超过设置的最大等待时间,则处理请求返回该错误,并移除掉该任务 if (behind >= maxWaitTimeMillsInQueue) { if (blockingQueue.remove(runnable)) { rt.setStopRun(true); rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size())); } } else { break; } } else { break; } } catch (Throwable ignored) { } } } 复制代码
这段代码是Broker快速失败机制的核心代码,如果一个等待队列的头元素(也就是第一个要处理或者正在处理的元素)等待时间超过该队列设置的最大等待时间,则丢弃该元素对象的任务,并对这个请求返回[TIMEOUT_CLEAN_QUEUE]broker busy异常信息。
发送Topic消息报该错误
sendThreadPoolQueue取出头元素,转换成对应的任务,判断任务在队列存活时间是否超过了队列设置的最大等待时间,如果超过了则组装处理返回对象response,response的code为RemotingSysResponseCode.SYSTEM_BUSY,内容为:
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [当前任务在队列存活时间], size of queue: [当前队列的长度] 复制代码
MQClientAPIImpl.processSendResponse处理返回response,根据response.getCode()的处理分支,最终返回MQBrokerException异常,response分支处理代码如下:
// 只有ResponseCode.SUCCESS的情况下返回结果,其他情况抛出MQBrokerException异常 private SendResult processSendResponse( final String brokerName, final Message msg, final RemotingCommand response ) throws MQBrokerException, RemotingCommandException { switch (response.getCode()) { case ResponseCode.FLUSH_DISK_TIMEOUT: case ResponseCode.FLUSH_SLAVE_TIMEOUT: case ResponseCode.SLAVE_NOT_AVAILABLE: { } case ResponseCode.SUCCESS: { // 省略部分代码 return sendResult; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); } 复制代码
消息发送客户端接收到MQBrokerException异常信息,捕获异常处理中不符合消息重试逻辑,直接抛出该异常,也就是用户看到的; // timesTotal为消息生产者设置的发送失败重试次数
for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { // 省略部分代码 } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { // 此处为MQBrokerException异常处理逻辑,RemotingSysResponseCode.SYSTEM_BUSY不符合分支条件,最终throw e抛出异常 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } 复制代码
生产环境各种参数:
- broker busy异常: 可通过增大 waitTimeMillsInSendQueue 解决
- system busy异常:可通过增大 osPageCacheBusyTimeOutMills 解决
#发送队列等待时间 waitTimeMillsInSendQueue=3000 #系统页面缓存繁忙超时时间(翻译),默认值 1000 osPageCacheBusyTimeOutMills=5000 复制代码
出现问题分析
出现异常的原因是因为我们同一台服务器部署的多个应用造成的。我们一台服务器上部署了 三个ES、八个redis、一个rocketmq ,压力测试时这些都在使用,虽然cpu、内存都还有很大剩余,但是磁盘io和内存频率毕竟只有那么多可能已经占满,或者还有其他都会有影响。
之前测试环境测试其他东西时,发现mq和redis同时大量使用时,redis速度会降低三到四倍,由此可见应用分服务器部署的重要性。以前知道会有影响,没想到影响这么大。
最终结解决方案:应该给rocketmq单独部署性能较高的服务器.
记一次 rocketmq 使用时的异常。
问题分析总结
- system busy , start flow control for a while
该异常会造成 消息丢失。
- broker busy , start flow control for a while
该异常不会造成消息丢失。
问题解决过程
1、最开始时候 ,测试发现在性能好的服务器上只会出现system busy,也就是说出现异常就会消息丢失。
所以:业务代码进行处理,出现异常就会重发到当前topic的bak队列,当时想的是既然这个topic busy了,就换到另外的topic去发,总不能都 busy吧。也算是临时解决了。
2、发现有消息重复的现象。不用想肯定是报broker busy异常,重发到topic的 bak队列了。又因为broker busy可能不会造成消息丢失,所以消息重复就出现了。
解决方案:
修改rocketmq配置文件:
- 方案一:sendMessageThreadPoolNums 改成 1 ,没有的话新增一行。sendMessageThreadPoolNums=1
- 方案二:useReentrantLockWhenPutMessage改成true,没有的话新增一行。
sendMessageThreadPoolNums=32 useReentrantLockWhenPutMessage=true 复制代码
sendMessageThreadPoolNums这个属性是发送线程池大小, rocketmq4.1版本之后默认为 1,之前版本默认什么不知道但是肯定大于1。这个属性改成1的话,就不用管useReentrantLockWhenPutMessage这个属性了;
如果改成大于1,就需要将useReentrantLockWhenPutMessage这个属性设置为 true;
目前测试 未发现这两个方案有什么区别,sendMessageThreadPoolNums=1 时也支持多线程发送,发送速度感觉和 sendMessageThreadPoolNums大于1没有区别,都能跑满100M的网卡。
感觉如果useReentrantLockWhenPutMessage=true的时候,就是打开锁,然后关键代码其实还是单线程处理;
解决方案
- 业务逻辑处理中进行异常捕获,如果捕获到异常为MQBrokerException并且responseCode为2则重发消息;
- 修改broker的默认发送消息任务队列等待时长waitTimeMillsInSendQueue(单位: 毫秒);
除此之外,还可以观察报错时磁盘的IO情况,出现这种错误很有可能是当时的磁盘IO很高,导致消息落盘时间变长。
RocketMQ的参数指南
NameServer配置属性
#broker名字,注意此处不同的配置文件填写的不一样 brokerClusterName=rocketmqcluster brokerName=broker-a #0 表示 Master, >0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #这个配置可解决双网卡,发送消息走外网的问题,这里配上内网ip就可以了 brokerIP1=10.30.51.149 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=8 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=false #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 0点 deleteWhen=03 #文件保留时间,默认 48 小时 fileReservedTime=48 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=1000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/data/rocketmq/data #commitLog 存储路径 storePathCommitLog=/app/data/rocketmq/data/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue #消息索引存储路径 storePathIndex=/app/data/rocketmq/data/index #checkpoint 文件存储路径 storeCheckpoint=/app/data/rocketmq/data/checkpoint #abort 文件存储路径 abortFile=/app/data/rocketmq/data/abort #限制的消息大小 修改为16M maxMessageSize=16777216 #发送队列等待时间 waitTimeMillsInSendQueue=3000 osPageCacheBusyTimeOutMills=5000 flushCommitLogLeastPages=12 flushConsumeQueueLeastPages=6 flushCommitLogThoroughInterval=30000 flushConsumeQueueThoroughInterval=180000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 sendMessageThreadPoolNums=80 #拉消息线程池数量 pullMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true 复制代码
官方资料
- Rocketmq官网:rocketmq.apache.org/
- Rocketmq的其它项目:github.com/apache/rock…
- Rocketmq-console安装:blog.csdn.net/zzzgd_666/a…
参考资料
- thinkinjava.cn/2019/05/08/…
- www.cnblogs.com/zhyg/p/1025…
- blog.csdn.net/prestigedin…
- thinkinjava.cn/2019/05/08/…