下一章:发送问题排查 | 《Rocket MQ 使用排查指南》第二章>>>
也可以PC端点击https://developer.aliyun.com/topic/download?id=820下载
什么是消息队列Rocket MQ
核心概念
消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高 并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 版既可为分布式 应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆 积、高吞吐、可靠重试等特性。
产品功能与特性
消息队列 RocketMQ 版在阿里云多个地域(Region)提供了高可用消息云服务。 单个地域内采用多机房部署,可用性极高,即使整个机房都不可用,仍然可以为应用 提供消息发布服务。
消息队列 RocketMQ 版提供 TCP 和 HTTP 协议的多语言接入方式,方便不 同编程语言开发的应用快速接入消息队列 RocketMQ 版消息云服务。您可以将应 用部署在阿里云 ECS、企业自建云,或者嵌入到移动端、物联网设备中与消息队列 RocketMQ 版建立连接进行消息收发;同时,本地开发者也可以通过公网接入消息队 列 RocketMQ 版服务进行消息收发。
系统部署架构
系统部署架构如下图所示。
图中所涉及到的概念如下所述:
● Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。
● Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将 自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。
● 生产者:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keepalive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
● 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、 Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心 跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。 应用场景
应用场景
削峰填谷
流量削峰也是消息队列 RocketMQ 版的常用场景,一般在秒杀或团队抢购活动 中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在 处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系 统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间 加入消息队列 RocketMQ 版。
秒杀处理流程如下所述:
- 用户发起海量秒杀请求到秒杀业务处理系统。
- 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 RocketMQ 版。
- 下游的通知系统订阅消息队列 RocketMQ 版的秒杀相关消息,再将秒杀成 功的消息发送到相应用户。
- 用户收到秒杀成功的通知。 异步解耦 传统处理
异步解耦
传统处理
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注 册成功。传统的做法有以下两种:
(1)串行方式
串行方式下的注册流程如下图所示。
数据流动如下所述:
● 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入 注册系统成功。
● 注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收 到请求后向用户发送邮件通知。
● 邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通 知系统收到请求后向用户发送短信通知。
以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为 50 ms,则用户需要在注册页面等待总共需要 150 ms 才能登录。
(2)并行方式
并行方式下的注册流程如下图所示。
数据流动如下所述:
● 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入 注册系统成功。
● 注册信息写入注册系统成功后,再同时发送请求至邮件和短信通知系统。邮件 和短信通知系统收到请求后分别向用户发送邮件和短信通知。
以上两个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为 50 ms,其中,邮件和短信通知并行完成,则用户需 要在注册页面等待总共需要 100 ms 才能登录。
以下就注册场景中使用了消息队列 RocketMQ 版的效果进行说明。
异步解耦
对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便 可以登录,后续的注册短信和邮件不是即时需要关注的步骤。
对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步 完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消 息队列 RocketMQ 版中然后马上返回用户结果,由消息队列 RocketMQ 版异步地 进行这些操作。
数据流动如下所述:
● 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入 注册系统成功。
● 注册信息写入注册系统成功后,再发送消息至消息队列 RocketMQ 版。消息 队列 RocketMQ 版会马上返回响应给注册系统,注册完成。用户可立即登录。
● 下游的邮件和短信通知系统订阅消息队列 RocketMQ 版的此类注册请求消息, 即可向用户发送邮件和短信通知,完成所有的注册流程。
用户只需在注册页面等待注册数据写入注册系统和消息队列 RocketMQ 版的时 间,即等待 55 ms 即可登录。
异步解耦是消息队列 RocketMQ 版的主要特点,主要目的是减少请求响应时间和 解耦。主要的适用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消 息放入消息队列。同时,由于使用了消息队列 RocketMQ 版,只要保证消息格式不 变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。
顺序收发
消息队列 RocketMQ 版顺序消息分为两种情况:
全局顺序:对于指定的一个 Topic,所有消息将按照严格的先入先出(FIFO)的 顺序,进行顺序发布和顺序消费;
分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分 区,同一个分区内的消息将按照严格的 FIFO 的顺序,进行顺发布和顺序消费,可以 保证一个消息被一个进程消费。
在注册场景中,可使用用户 ID 作为 Sharding Key 来进行分区,同一个分区下 的新建、更新或删除注册信息的消息必须按照 FIFO 的顺序发布和消费。
分布式事务一致性
注册系统注册的流程中,用户入口在网页注册系统,通知系统在邮件系统,两个系统之间的数据需要保持最终一致。
普通消息处理
如上所述,注册系统和邮件通知系统之间通过消息队列进行异步处理。注册系统 将注册信息写入注册系统之后,发送一条注册成功的消息到消息队列 RocketMQ 版, 邮件通知系统订阅消息队列 RocketMQ 版的注册消息,做相应的业务处理,发送注 册成功或者失败的邮件。
流程说明如下:
- 注册系统发起注册。
- 注册系统向消息队列 RocketMQ 版发送注册消息成功与否的消息。
2.1 消息发送成功,进入 3。
2.2 消息发送失败,导致邮件通知系统未收到消息队列 RocketMQ 版发送 的注册成功与否的消息,而无法发送邮件,最终邮件通知系统和注册 系统之间的状态数据不一致。
- 邮件通知系统收到消息队列 RocketMQ 版的注册成功消息。
- 邮件通知系统发送注册成功邮件给用户。
在这样的情况下,虽然实现了系统间的解藕,上游系统不需要关心下游系统的业 务处理结果;但是数据一致性不好处理,如何保证邮件通知系统状态与注册系统状态 的最终一致。
流程说明如下:
- 注册系统向消息队列 RocketMQ 版发送半事务消息。 1.1 半事务消息发送成功,进入 2。
1.2 半事务消息发送失败,注册系统不进行注册,流程结束。(最终注册系 统与邮件通知系统数据一致)
- 注册系统开始注册。
2.1 注册成功,进入 3.1。
2.2 注册失败,进行 3.2。
- 注册系统向消息队列 RocketMQ 版发送半消息状态。
3.1 提交半事务消息,产生注册成功消息,进入 4。
3.2 回滚半事务消息,未产生注册成功消息,流程结束。(最终注册系统与 邮件通知系统数据一致)
- 邮件通知系统接收消息队列 RocketMQ 版的注册成功消息。
- 邮件通知系统发送注册成功邮件。(最终注册系统与邮件通知系统数据一致)
大规模机器的缓存同步
双十一大促时,各个分会场会有玲琅满目的商品,每件商品的价格都会实时变 化。使用缓存技术也无法满足对商品价格的访问需求,缓存服务器网卡满载。访问较 多次商品价格查询影响会场页面的打开速度。
此时需要提供一种广播机制,一条消息本来只可以被集群的一台机器消费,如果 使用消息队列 RocketMQ 版的广播消费模式,那么这条消息会被所有节点消费一次, 相当于把价格信息同步到需要的每台机器上,取代缓存的作用。
消息类型
普通消息
普通消息是指消息队列 RocketMQ 版中无特性的消息,即发送到服务端会立马 被消费的消息,且消息是无序消费,不会按照发送的顺序一次顺序消费。
定时消息
Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望立马投递 这条消息,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费, 该消息即定时消息。
延时消息
Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望立马投递 这条消息,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。 定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消 息在发送到消息队列 RocketMQ 版服务端后并不会立马投递,而是根据消息中的属 性延迟固定时间后才投递给消费者。
全局顺序消息
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布 和消费。
分区顺序消息
对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分 区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用 来区分不同分区的关键字段,和普通消息的 Message Key 是完全不同的概念。
事务消息
消息队列 RocketMQ 版提供类似 X/Open XA 的分布事务功能,通过消息队列
RocketMQ 版的事务消息能达到分布式事务的最终一致。
SDK 支持语言及协议
RocketMQ 支持 tcp 协议以及 http 协议的接入。
其中推荐使用阿里推出的 tcp 协议下的三大 sdk:java,C/C++,.NET。
除了阿里推出的 sdk,我们还支持开源的多语言 sdk 接入阿里云 RocketMQ: java,go,python,C++。
如 果 您 想 使 用 多 语 言 的 sdk, 推 荐 使 用 http 协 议 接 入:java,PHP,go, Python,Nodejs,C#,C++。
具体协议以及 sdk 的获取参考链接: https://help.aliyun.com/document_detail/124693.html?spm=a2c4g.11186 623.6.582.104e425cW5Tbm2
RocketMQ 快速入门教程
如果您使用的是阿里云主账号,则可以通过本文来体验从开通服务、创建资源、 到使用 SDK 收发消息的完整流程,快速上手消息队列 RocketMQ 版。 无论您使用的是消息队列 RocketMQ 版支持的何种协议、何种语言,前三个步 骤都一致,只是在控制台上具体填写的信息会略有不同,请以控制台说明为准。但在 调用 SDK 时,不同协议和语言的示例代码有所不同,本文以 TCP 协议下的 Java SDK 为例进行说明。
步骤一:开通服务
- 在消息队列 RocketMQ 版产品页,单击立即开通。
- 在确认订单页面,选择我已阅读并同意《消息队列 MQ 服务协议》,再单击 立即开通即可完成开通。
步骤二:创建资源
在使用消息队列 RocketMQ 版时,请注意以下网络访问限制:
● Topic 和 Group ID 需创建在同一个地域(Region)下的同一个实例中才能互通。 例如,当某 Topic 创建在华东 1(杭州)下的实例 A 中,那么该 Topic 只能被在 华东 1(杭州)下的实例 A 中创建的 Group ID 对应的生产端和消费端访问。
● 如果只是测试,或者需要在本地(非阿里云 ECS 服务器)使用消息队列 RocketMQ 版的服务,请将 Topic 和 Group ID 都创建在公网地域下的实例 中。生产端和消费端可以部署在本地或者部署在任意地域的 ECS 上,前提是 本地服务器或者相应的 ECS 需要能够访问公网。
创建实例
实例是用于消息队列 RocketMQ 版服务的虚拟机资源,会存储消息主题 (Topic)和客户端 ID(Group ID)信息。
- 登录消息队列 RocketMQ 版控制台。在页面顶部导航栏,选择地域,如公 网地域。
- 在左侧导航栏,单击实例详情。
- 在实例详情页面右上角,单击创建实例按钮。
- 在创建实例对话框,选择实例类型,并输入实例名和描述,然后单击确认。
创建 Topic
Topic 是消息队列 RocketMQ 版里对消息的一级归类,例如可以创建 Topic_ Trade 这一 Topic 来识别交易类消息,消息生产者将消息发送到 Topic_Trade,而 消息消费者则通过订阅该 Topic 来获取和消费消息。
● Topic 不能跨实例使用,例如在实例 A 中创建的 Topic A 不能在实例 B 中 使用。
● Topic 名称必须在同一实例中是唯一的。
● 您可创建不同的 Topic 来发送不同类型的消息,例如用 Topic A 发送普通消 息,Topic B 发送事务消息,Topic C 发送定时 / 延时消息。
- 在控制台左侧导航栏,单击 Topic 管理。
- 在 Topic 管理页面上方选择刚创建的实例,单击创建 Topic 按钮。
- 在创建 Topic 对话框中的 Topic 一栏,输入 Topic 名称,选择该 Topic 对 应的消息类型,输入该 Topic 的备注内容,然后单击确定。
您创建的 Topic 将出现在 Topic 列表中。
创建 Group ID
创建完实例和 Topic 后,您需要为消息的消费者(或生产者)创建客户端 ID , 即 Group ID 作为标识。
● Group ID 必须在同一实例中是唯一的。
● Group ID 和 Topic 的关系是 N:N,即一个消费者可以订阅多个 Topic,同 一个 Topic 也可以被多个消费者订阅;一个生产者可以向多个 Topic 发送消 息,同一个 Topic 也可以接收来自多个生产者的消息。 说明 :消费者必须有对应的 Group ID,生产者不做强制要求。
- 在控制台左侧导航栏,单击 Group 管理。
- 在 Group 管理页面上方选择刚创建的实例,然后选择 TCP 协议 > 创建 Group ID。本文以 TCP 协议为例。 说明 :TCP 和 HTTP 协议下的 Group ID 不可以共用,因此需分别创建。
- 在创建 Group ID 对话框中,输入 Group ID 和描述,然后单击确认。
创建阿里云 AccessKey
阿里云 AccessKey 用于收发消息时进行账户鉴权。
在调用 SDK 发送和订阅消息的时候,除了需要指定创建的 Topic 和 Group ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AcessKeySecret。
步骤三:获取接入点
在控制台创建好资源后,您需通过控制台获取实例的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入某个具体实例或地域的服务。
- 在控制台左侧导航栏,单击实例详情。
- 在实例详情页面上方选择刚创建的实例。
- 在默认显示的实例信息页签的获取接入点信息区域,您可以分别看到新创 建实例的 TCP 和 HTTP 协议接入点。接入点性质因协议而异,具体说明 如下:
● TCP 协议:您在控制台看到的 TCP 协议接入点是地域下某个具体实例的接入 点。同一地域下的不同实例的接入点各不相同。
● HTTP 协议:您在控制台看到的 HTTP 协议接入点是某个地域的接入点,跟 具体实例无关。您在收发消息时还需另外设置实例 ID。
- 在 TCP 协议的接入点区域,单击复制。 对于 TCP 协议的接入点,您还可以单击示例代码,查看在各种开发语言的程序 中如何设置接入点。
完成以上准备工作后,您就可以运行示例代码,用消息队列 RocketMQ 版进行 消息发送和订阅了。
步骤四:发送消息
您可以通过以下方式发送消息: 控制台发送消息:用于快速验证 Topic 资源的可用性,主要用作测试。
- 在控制台左侧导航栏,单击 Topic 管理。
- 在 Topic 管理页面,找到您刚刚创建的 Topic,单击右侧操作列的发送。
- 在发送消息对话框中的 Message Body 一栏,输入消息的具体内容,单击 确定。
控制台会返回消息发送成功通知以及相应的 Message ID。
调用 SDK 发送消息:用于生产环境下使用消息队列 RocketMQ 版。
下文以调用 TCP Java SDK 为例进行说明。
调用 TCP Java SDK 发送消息
- 通过以下任一方式引入依赖:
● Maven 方式引入依赖:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>"XXX"</version>
// 设置为 Java SDK 的最新版本号
</dependency>
● 下载依赖 JAR 包:
- 根据以下说明设置相关参数,运行示例代码:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 鉴权用 AccessKeyId,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 鉴权用 AccessKeySecret,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置 TCP 接入域名,进入控制台的实例详情页面,在页面上方选择实例后,在实例信息中的“获取
接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 循环发送消息
while(true){
Message msg = new Message( //
// 在控制台创建的 Topic,即该消息所属的 Topic 名称
"TopicTestMQ",
// Message Tag,
// 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列
RocketMQ 版服务器过滤
"TagA",
// Message Body
// 任何二进制形式的数据,消息队列 RocketMQ 版不做任何干预,
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一,以方便您在无法正常收到消息情况下,可通过控
制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 发送消息,只要不抛异常就是成功
// 打印 Message ID,以便用于消息发送状态查询
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.
getMessageId());
}
// 在应用退出前,可以销毁 Producer 对象
// 注意:如果不销毁也没有问题
producer.shutdown();
}
}
查看消息是否发送成功 消息发送后,您可以在控制台查看消息发送状态,步骤如下:
- 在控制台左侧导航栏,选择消息查询 > 按 Message ID 查询。
- 在搜索框中输入发送消息后返回的 Message ID,单击搜索查询消息发送 状态。 储存时间表示消息队列 RocketMQ 版服务端存储这条消息的时间。如果查询到此消息,表示消息已经成功发送到服务端。
步骤五:调用 SDK 订阅消息
消息发送成功后,需要启动消费者来订阅消息。下文以调用 TCP Java SDK 为 例说明如何订阅消息。
- 调用 TCP Java SDK 订阅消息。
您可以运行以下示例代码来启动消费者,并测试订阅消息的功能。请按照说明正 确设置相关参数。
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 鉴权用 AccessKeyId,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "XXX");
// 鉴权用 AccessKeySecret,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置 TCP 接入域名,进入控制台的实例详情页面,在页面上方选择实例后,在实例信息中的“获取
接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
- 查看消息订阅是否成功。
完成上述步骤后,您可以在控制台查看消费者是否启动成功,即消息订阅是否 成功。
- 在控制台左侧导航栏,单击 Group 管理。
- 找到要查看的 Group ID,单击该 Group ID 所在行操作列的订阅关系。
如果是否在线显示为是,且订阅关系一致,则说明订阅成功。否则说明订阅 失败。