背景
消息队列在大型分布式应用中非常常见,目前还停留在能熟练使用基础功能的阶段,对其高级功能以及背后的原理了解甚少,比如事务消息、有序消息等。
学习之前需要先带着几个问题,为什么会诞生消息队列?消息队列的原理是什么?使用消息队列需要注意什么?
假设我们自己开发了一套“极其简陋”的电商系统,包含浏览商品、下单、付款三个功能,系统涉及的服务有商品服务、订单服务、扣款服务。
我们来举例一下用户常用的操作流程:
下单:
1.用户看到自己喜欢的商品,点击提交订单,请求到订单服务,系统生成订单记录,核心数据为:状态“待支付”、商品名称、商品数量、过期时间等
2.生成订单后请求到商品服务,对应商品数量-1(并非真的-1,毕竟用户没有付款)
支付:
1.用户确定没问题,发起支付请求,请求到订单服务,状态改为“正在支付”
2.请求到扣款服务,系统通过支付系统对用户银行卡发卡扣款
3.扣款成功,回调订单服务更新订单状态为“支付成功”
4.支付成功后商品服务将商品数量真正-1
在了解了常见场景后我们来看当前系统存在什么问题?这里的例子只是假设,并非真实电商场景,只看部分问题即可。
- 容错率低。一次请求涉及多个系统,一个系统故障整个操作失败
- RT高实效性低。一次请求涉及多个系统,总RT = 各个系统RT之和
- 吞吐量低。服务器连接资源有限,每个同步调用都要占用一个连接,根据木桶效应系统能力取决于最低的系统的表现,大促等活动场景下流量是正常时候的几倍甚至几十倍,这种架构根本无法支持
抽象一下问题就是,同步调用、耦合严重、流量洪峰。这也就是消息队列要解决的三个问题。
- 异步,阶段性处理请求,处理完立刻返回
- 解耦,系统间不再强耦合,没有了木桶效应,能跑多快全凭系统自身
- 削峰,通过堆积系统处理不了的请求来达到消除流量高峰问题
通过消息队列改造后,整体架构如下图:
下单:
- 用户提交订单,消息分发器产生“下单”消息
- 商品服务、订单服务分别订阅该主题,自行消费处理
支付:
- 用户发起支付请求,消息分发器产生“支付”消息
- 订单服务、扣款服务分别订阅,自行处理消息
- 扣款服务处理成功,产生“支付成功”消息
- 商品服务、订单服务分别订阅,自行处理消息
可以看到通过消息队列改造后,各个系统间不再同步调用强耦合,系统不再直接对接用户请求,而是消费消息队列里堆积的请求。
消息队列也是一把双刃剑,感觉通过消息队列改造后整个系统的性能得到了质的提升,低延迟、高并发,看似很完美,但是也存在许多问题。
- 系统的可用性所有降低,这样的架构必须要保证消息队列不出问题
- 系统复杂性提高,引入消息队列后整个系统的链路变得复杂,系统理解、运维难度都有所提升
- 一致性问题,异步处理的场景下订单系统处理成功就成功了,无法保证商品服务不出问题,这要就会出现“多卖”,“少卖”的数据不一致问题
接下来我们一起看看消息队列是如何处理和解决这些问题的。
RocketMQ
简介
阿里内部使用的是 Metaq,外部又称之为 RocketMQ,两者可以说是等价的,RocketMQ github 地址:https://github.com/apache/rocketmq
RocketMQ 是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特点。Producer 轮流向一些队列发送消息,队列集合称为 Topic,Consumer 根据消费模式做集群/广播消息。
- Producer、Consumer、队列都可以是分布式
- 可以保证严格的消息顺序
- 亿级消息堆积能力
架构图
Name Server
用于服务发现,专为 RocketMQ 设计的轻量级名称服务。
Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker
消息中转角色,负责存储消息,转发消息,一般也称为server。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。
Producer
消息生产者,负责生产消息,一般由业务系统负责产生消息。
Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Consumer
消息消费者,负责消费消息,一般是由后台应用负责消费。
Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
消费模式
广播消息
consumer group中的每一个consumer都会消费一次消息,广播消费中的consumer group概念可以认为在消息划分上没有意义。
集群消息
一个consumer group中的consumer平均分摊消费消息。假设topic中有6条消息,consumer group有两个consumer实例(可能是两个进程或者两台机器),那么每个consumer只消费3条消息。
消息类型
普通消息
消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。
定时和延时消息
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不 可避免的产生巨大性能开销。RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。
顺序消息
消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。
普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方 式比较合适。
严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不 可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现),目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
事务消息
RocketMQ 采用二阶段提交实现事务消息,保证本地事务与消息发送的原子性,达到事务最终一致性状态。
功能模块
消息优先级
支持根据消息优先级高低进行投递。
由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可。
消息过滤
Broker 端消息过滤
在 Broker 中按照 Consumer 的要求过滤,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担,实现相对复杂。
RocketMQ 支持按照简单的 Message tag 进行过滤,也支持按照 Message Header, Message Body 过滤。
Demo:
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
Consumer 端过滤
这种过滤方式完全由 Consumer 端自己控制,缺点是很多无用的消息需要传送到 Consumer 端。
消息可靠性保证
影响消息可靠性的几种情况:
- Broker 正常关闭
- Broker 异常 Crash
- OS 死机
- 机器掉电,但是能立即恢复供电情况。
- 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
- 磁盘设备损坏。
前四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5 和 6 属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点, 同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。
RocketMQ 从 3.0 版本开始支持同步双写。
消息堆积
消息持久化
RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。
Linux 异步刷盘
了解消息持久化前需要先了解 Linux 是如何实现异步刷盘的。因为硬盘和内存的读写性能差距巨大,Linux 默认情况是以异步方式读写文件的,比如调用系统函数 open() 打开或创建文件时缺省情况下是带有 O_ASYNC flag 的。Linux 借助于内核的 page cache 来实现这种异步操作。
异步刷盘
broker 将 producer 发送的消息写入到 Linux page cache 后直接返回,并不等待文件内容写入到硬盘才返回。
- 由于服务器硬盘读写速度一般高于网卡速度(千兆网卡),所以刷盘速度跟得上消息写入速度,硬盘写操作不会是性能瓶颈。
- 万一此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况,会不会导致系统内存溢出?答案是否定的。
- 写入消息到 page cache 时,如果内存不足,则尝试丢弃干净的 page,腾出内存供新消息使用,策略是 LRU
- 如果干净页不足,此时写入 page cache 会被阻塞,系统尝试刷盘部分数据,大约每次尝试32个 page 来找出更多干净 page
同步刷盘
同步刷盘与异步刷盘的唯一区别是,在把消息写入 page cache 后异步刷盘会直接返回,而同步刷盘需要等刷盘完成后才返回。
- 消息写入 page cache,线程等待,通知刷盘线程刷盘
- 刷盘线程完成后,通知前端等待线程(可能是一批线程)
- 前端等待线程向用户返回成功
事务消息
主要流程
这里分两种场景来讲,正常场景下生产者根据本地事务直接返回 commit / rollback,异常情况下返回 unknown(抛异常、服务器重启等)
commit / rollback
流程简述:
- Producer 发送 Half 消息到 Broker。topic: RMQ_SYS_TRANS_HALF_TOPIC,Consumer 此时不能拉取到消息,需要等后续 Half 消息状态变更。
- Producer 执行本地事务,根据执行结果返回 commit 或 rollback 给 Broker。
- Broker 收到 Producer 事务状态,如果是 rollback,则丢弃 Half 消息
- 如果是 commit,则将 Half 消息放入对应的 topic 中
- Consumer 接收到消息,执行本地事务
- Consumer 根据执行结果返回消费状态给 Broker
- 如果消费失败,则根据 Consumer 配置决定是否重试
unknown
流程简述:
- Producer 发送 Half 消息到 Broker。topic: RMQ_SYS_TRANS_HALF_TOPIC,Consumer 此时不能拉取到消息,需要等后续 Half 消息状态变更。
- Producer 执行本地事务,此时遇到未捕获的异常,服务器返回 unknown 状态,或者超过时间限制(默认6s),Broker 也会当作 unknown 状态来处理。
- Broker 收到 Producer unknown 的事务状态,不处理。
- 针对 unknown 状态的消息,Broker 会定期(30s左右,根据实际情况调整)发送 checkLocalTransaction 到 Producer 集群(相同 producerGroup)的任意一台机器,以此来检查对应的事务状态(默认最多检查10次,直到 commit 或者 rollback,超过次数限制则当作 unknown 来处理。
- Broker 根据最终状态来决定是丢弃 Half 消息,还是放入对应的 topic。
测试demo
TransactionProducerTest.java
public class TransactionProducerTest {
public static void main(String[] args) throws InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("CID_PC_TRAN_QUICK_START_PICHENG_TEST");
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
thread.setDaemon(false);
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionCheckListener());
// 设置重试次数
producer.setRetryTimesWhenSendFailed(1);
try {
producer.start();
print("-> producer started success");
} catch (Exception e) {
print("-> producer started failed");
e.printStackTrace();
}
final String topic = "PC_TRAN_QUICK_START";
final String tag = "dkangel";
final String keys = "PC_TRAN_QUICK_START_keys";
String content = "Rocketmq transaction test";
Message message = new Message(topic, tag, keys, content.getBytes());
try {
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
print(String.format("-> msgId:%s, content:%s transaction result:%s", result.getMsgId(),
content, result.getLocalTransactionState()));
if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
print(String.format("-> halfMsg sendResult:%s, transaction result:%s", result.getSendStatus(),
result.getLocalTransactionState()));
} else {
print(String.format("-> halfMsg sendResult:%s, transaction result:%s", result.getSendStatus(),
result.getLocalTransactionState()));
}
Thread.sleep(100);
} catch (MQClientException | InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
print("-> shutdown producer");
producer.shutdown();
}
}
TransactionCheckListener.java
public class TransactionCheckListener implements TransactionListener {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final AtomicLong last = new AtomicLong(System.currentTimeMillis());
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = atomicInteger.get();
if (value == 0) {
//注意:如果抛出异常,等于设置UNKNOW
PrintUtils.print(String.format("-> execute transaction exception, index: %d", value));
throw new RuntimeException("Could not find db");
} else if ((value % 3) == 0) {
PrintUtils.print(String.format("-> execute transaction rollback, index: %d", value));
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 2) == 0) {
PrintUtils.print(String.format("-> execute transaction commit, index: %d", value));
return LocalTransactionState.COMMIT_MESSAGE;
}
PrintUtils.print(String.format("-> execute transaction unknown, index: %d", value));
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return process();
}
private LocalTransactionState process() {
long time = (System.currentTimeMillis() - last.get()) / 1000;
if (this.atomicInteger.get() != 0 && this.atomicInteger.get() % 10 == 0) {
PrintUtils.print(String.format("-> check transaction multi-commit, index: %d, cost(s): %d", atomicInteger.get(), time));
return LocalTransactionState.COMMIT_MESSAGE;
}
this.atomicInteger.getAndIncrement();
int value = atomicInteger.get();
last.compareAndSet(last.get(), System.currentTimeMillis());
if ((value % 11) == 0) {
//注意:如果抛出异常,等于设置UNKNOW
PrintUtils.print(String.format("-> check transaction exception, index: %d, cost(s): %d", value, time));
throw new RuntimeException("Could not find db");
} else if ((value % 12) == 0) {
PrintUtils.print(String.format("-> check transaction rollback, index: %d, cost(s): %d", value, time));
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 10) == 0) {
PrintUtils.print(String.format("-> check transaction commit, index: %d, cost(s): %d", value, time));
return LocalTransactionState.COMMIT_MESSAGE;
}
PrintUtils.print(String.format("-> check transaction unknown, index: %d, cost(s): %d", value, time));
return LocalTransactionState.UNKNOW;
}
}
PrintUtils
public class PrintUtils {
public static void print(String msg) {
System.out.println(Thread.currentThread().getId() + "-" + Thread.currentThread().getName() + ": " + msg);
}
}
结果展示
maxPoolSize = 2
maxPoolSize = 1
疑问:
broker master、slave同步方式?
所有数据单独存储到一个commitLog,顺序写,随机读。按照什么维度区分?topic?
pageCache?顺序跳跃读?
写消息LRU方式?
messageIdId格式?
message key,slot table 槽表格式?
message queue存储结构?
消费方式:pull 长轮询方式?
producer选messageQueue算法?
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
附录
消息队列选型分析
kafka, rocketMQ, rabbitMQ
专业术语
Topic:消息主题,由用户定义并在mq-ops进行配置。
Tag:消息主题标签,一个topic可以有多个tag。
Producer:消息生产者,负责生产消息,一般由业务系统负责产生消息。
Producer group:表示一类producer,通常用来发送一类消息,发送时可以指定不同的topic。
Consumer:消息消费者,负责消费消息,一般是由后台应用负责消费。
Consumer group:表示一类consumer,这类consumer消费一类消息,消费逻辑一致。
Push consumer:consumer的一种,向consumer对象注册一个listener接口,一旦接受到消息,consumer对象就立刻回调listener接口。(这里和diamond注册listener类似,都是通过回调方法实现消息通知)
Pull consumer:consumer的一种,应用主动调用consumer的拉消息方法从broker拉消息,主动权由应用控制。
Name server:用于服务发现,专为 RocketMQ 设计的轻量级名称服务。
Broker:消息中转角色,负责存储消息,转发消息,一般也称为server。
关键约束
topicMaxLength = 255
com.alibaba.rocketmq.client.Validators#CHARACTER_MAX_LENGTH
defaultMaxMessageSize = 4M
com.alibaba.rocketmq.client.producer.DefaultMQProducer#maxMessageSize=1024 * 1024 * 4; // 4M
涉及技术点
Netty
Channel
NIO
NioEventLoopGroup
JDK
other
transient
Comparable
ThreadLocal
JUC
CountDownLatch
AtomicBoolean
AtomicInteger
ReentrantLock
ConcurrentHashMap
ThreadPoolExecutor
AQS
Linux
page cache
https://qinglinmao8315.github.io/linux/2018/03/14/linux-page-cache.html
参考文献
事务消息:https://xie.infoq.cn/article/53240651a2ef7c173f50a3194
rocketmq前世今生:https://developer.aliyun.com/article/69647