RocketMQ 全链路灰度探索与实践

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 当链路请求中存在消息的时候,如何实现全链路灰度?一起来看看消息灰度的设计与实现吧!

本文作者:肖京,Spring Cloud Alibaba PMC,阿里云智能技术专家。

01 全链路灰度背景介绍

图片

发布新版本时,为了有效、谨慎地验证新版本代码逻辑的正确性,通常会采用灰度发布,从而达到减小第一次变更影响面的目的。

举个例子,应用的集合中可能会包含交易中心、商品中心、库存中心等多个模块。在一次新版本发布的过程中,可能有 feature 既修改了交易中心,又修改了商品中心。为了验证新版本的正确性,需要让灰度的流量到达交易中心和商品中心的灰度版本,串联起来才能有效验证新版本的正确性,因此需要全链路灰度。

如上图,流量从入口应用进来之后,如果被识别成灰度流量,则它不仅会去往交易中心的灰度,也会去往商品中心的灰度。如果库存中心有灰度,也会去库存中心的灰度;如果没有,则会降级到库存中心的基线环境。

图片

假设有一个网关,客户端可以从 iOS、安卓或 H5 对网关进行访问,在访问过程中,会给参数加上 header(http协议),header 头里包含用户 ID 信息。后端分为A、B、C三个模块,比如发布了新功能,需要更新 A 和 C 两个应用模块,A 和 C 的灰度版本需要同时发布,而B应用没有新特性发布。为了有效地验证 A 和 C 新版本代码逻辑的正确性,我们将灰度规则设置为user ID =120 时去往灰度环境。因此流量从网关进来之后,先去往 A 的灰度环境;而后A调用B时,发现 B 没有灰度环境,会降级到 B 的基线环境;下一跳 B 调用 C 的时候,发现 C 存在灰度环境,又重新回到 C 的灰度环境,以此实现全链路的灰度发布。

上述全链路灰度发布的过程可以有效验证 A 和 C 两个应用的新版本组合在一起时的有效性,只有可控范围内的流量参与到灰度环境,可以有效验证新版本的正确性,避免因为新版本的业务逻辑错误造成比较重大的损失或业务故障。

以上为RPC 层的全链路灰度。

图片

当链路请求中存在消息的时候,如何实现全链路灰度?

假设库存中心C收到一笔订单之后,会生产消息并发送到 RocketMQ server 中,同时由 A 应用来消费该消息。此时如果在过程中对消费逻辑进行了修改,则需要 C 应用生产的消息(灰度的消息)被 A 的灰度环境即RocketMQ的灰度消费者消费,才能实现灰度环境的闭环。

02 消息灰度的设计与实现

图片

消息灰度的设计中,消息的生产者如何生产灰度消息?

带上灰度标签有以下三种方式:

①如果请求在入口被识别成灰度请求,则该消息会被标记成灰度消息。

②如果节点本身属于灰度节点,且开启了流量染色,则该消息会被标记为灰度消息。

③入口处请求没有被识别成灰度流量,但消息本身的 payload 属于灰度流量,则该消息也会被标记成灰度消息。

消息生产者在生产的时候,可以通过在 tag 或 user-property 中加上一些字段将灰度信息附带在消息体中。但是考虑到 tag 包含业务逻辑语义,且每个消息只能有一个 tag,因此不推荐使用 tag 。而 user-property 字段属于  key-value 结构,较灵活,更适用于存储消息的灰度标识。

RocketMQ 的 producer 中提供了 SendMessageHook,可以自定义逻辑,生产消息的时候可以将灰度标签存储在 user-property 中,消息发送到 RocketMQ server 的时候就包含了灰度信息。

图片

消费者灰度复杂一些,既支持客户端过滤,也支持服务端过滤。

从图中可以看到,开源的 RocketMQ client 中有 FilterMseeageHook 能够进行逻辑处理。可以通过 FilterMseeageHook 将本环境不需要消费的消息直接过滤掉。正式环境和灰度环境的消费者分别使用不同的 consumer group,使其 offset 分开。然后在 FilterMseeageHook 中加入对应逻辑,即可将灰度环境中收到所有非灰度的消息过滤掉。

正式环境需要拉取所有消息以分析 user-property 字段,key value 里包含消息的环境标,若识别到灰度环境的消息,则正式环境会通过 remove 的方式忽略此消息,灰度环境同理。

此套方案下,正式环境和灰度环境属于两个不同的 consumer group,且他们都需要将所有消息拉取本地。比较极端的场景下,比如灰度消费者只有一台机器,但是正式环境的消费者有 100 台机器,则灰度环境需要承担巨大压力。另一方面,RocketMQ server 服务端也需要将每条消息推送两次,也增加了服务端的压力。

客户端过滤的方式存在弊端,那么,能否通过服务端过滤来规避这些弊端?

服务端的过滤分为 Tag 过滤和 SQL92 过滤两种方式。

图片

RocketMQ 的服务端过滤包含两种过滤模式,分为 Tag 过滤和 SQL92 过滤。

Tag 过滤的实现中,RocketMQ 消费者向 server 端订阅的时候,会传递订阅信息到服务端。订阅信息为 SubscribtionData,其中包含四个字段:

  • topic
  • tagSet
  • expressionType=tag:表达式的类型,这里是 tag 过滤,所以值为tag
  • client version:此次订阅的版本号

Client 会不断向 server 端发送心跳,默认情况下 30 秒一次。过程中  SubscribtionData 可以动态变化,如果对tagSet或表达式的类型进行过更改,则会增加 client version 的值。服务端收到心跳之后,发现心跳里的 SubscribtionData 版本号改变,意味着订阅规则也有所变化,此时会更新客户端的订阅逻辑,决定服务端过滤变化的推送。

Server 端处理服务端的灰度过滤逻辑如下:

RocketMQ 中有一个 MessageFilter 类,首先会进行 consumer queue 的比对,如果匹配成功,则进行 tagscode 的比对。两次比对都匹配才会将消息推送到 client 消费者端。

以上流程的优势为避免了灰度环境拉取所有消息,能够有效减轻灰度环境消费者的负担。同时,服务端不会将所有消息都推送两遍,大大降低了服务端压力。

图片

如果灰度信息保存在 user-property 字段,可以通过 SQL 92 的方式进行过滤。

服务端 ConsumerFilterManager 保存了每一个 topic 对应的FilterDataMapByTopic,而 FilterDataMapByTopic 里保存了不同 consumer group 对应的消费逻辑ConsumerFilterData,ConsumerFilterData 里包含了 consumer group、topic、表达式以及 client version,与客户端发送的信息非常类似,因此可以借此来过滤。

SQL 92 是一种可以写复杂表达式的过滤规则,除了能够实现tag 过滤方式,也能基于 user-property 字段进行过滤。

图片

SQL92 的过滤规则如上图所示。

  • 消费 Tag 为 A 或者 Tag 为 B 的消息可以写为:(TAGS is not null and TAGS in ('A', 'B'))
  • 消费 user-property 中 version 为 gray 的消息可以写为:(version is 'gray')
  • 消费 Tag 为 A 且 user-property 中 version 为 gray 的消息可以写为:(TAGS is  'A') and (version is 'gray')
  • 消费 Tag 为 A 且 user-property 中 version 为 green 或者 blue 的消息可以写为:(TAGS is not null and TAGS is  'A' ) and ( (version is 'green') or (version is 'blue') )

图片

此外,实际应用场景中要实现完善的消息灰度,还有诸多需要考虑的问题:

  • 如果消息生产逻辑是独立的线程池,如何实现灰度标的透传?常见方式为往线程池里提交任务的时候,将属于灰度的标志放到 task 某个字段中,在消费的时候读取 task 字段,重新放在 thread local 中,实现跨线程的灰度标传递。
  • 全路灰度发布的过程中出现了回滚,有一些灰度消息是否会出现没有灰度消费者的情况?消息一直没有被消费,应该怎么办?有两种选择,一种是筛选没有消费的消息,进行补偿动作;如果消息的消费逻辑没有进行较大变更,也可以容忍让基线环境消费灰度消息。
  • 消息灰度过程中出现重复消费怎么办?可以在消费逻辑中保证幂等,或设置更精准的灰度控制逻辑。
  • 消费者的订阅行为能否支持动态变更?比如没有灰度的消费者是否可以用正式环境消费灰度消息?
  • 如果正式环境可以消费灰度消息,那么正式环境的默认行为时消费所有消息,还是只消费正式环境的消息?
  • 如何实现自定义的消息灰度逻辑?比如流量刚进来的时候,并没有被识别为灰度流量,但是在发送过程中发现消息里有一些特殊的逻辑,正好命中了灰度规则,则需要将其标记为一条灰度规则。此时如何做自定义的灰度逻辑?
  • 假设正式环境能够消费灰度的消息,过程中如果灰度的消费者启动了,正式环境能否自动探测是否需要消费灰度?实现这一点可以避免重复消费,同时又能解决发布上下游有联动的情况。

03 MSE全链路灰度最佳实践

图片

阿里云的微服引擎MSE的全链路灰度最佳实践如上图:name=xiaoming 属于灰度流量,进行规则配置后流量经过 A 的灰度环境再到 B 的基线环境再到 C 的灰度环境,C 产生的灰度消息可以被 RocketMQ Server 接收,同时准确推送到 A 的灰度环境,实现闭环。

图片

只要基于开源标准的方式开发,接入使用MSE就不需要修改任何代码,业务无侵入/无感知,而且零升级成本,完全不需要改变现有的业务架构。主要通过 One Java Agent 方式实现,通过 Java Agent 进行字节码的增强,使消息的生产者和消费者在开源RocketMQ 的发送行为和消费行为自动注入灰度相关字节码,业务部署以后即可自动 attach 上 Java Agent,无需修改任何逻辑就能完成消息灰度。

上图中,消息生产者挂载上 Java Agent 之后,agent 会自动修改发送消息的逻辑。如果识别到消息属于灰度流量,会自动将消息加上灰度标,发送到消息服务端。消息消费者在启动过程中识别到灰度环境,会自动通过 Java Agent 修改 consumer group ,而且发送订阅规则时会自动订阅只消费灰度的消息。消息的服务端会对过滤的规则进行识别和匹配,以保证只有灰度消息会推送给灰度消费者。

接入微服务引擎 MSE 后,所有对于消息灰度的操作可直接在控制台上完成。用户只需在 MSE 控制台进行治理规则的配置,即可自动通过配置中心给消费者推送消费规则,使消费者可以动态变更消费行为而完成消息灰度的完整过程。过程中无需修改任何代码,只需要接入 Java Agent 即可实现全部功能。

图片

使用步骤如下:在 MSE 的应用列表页选择对应的应用,开启消息灰度,配置基线环境和需要忽略的标签,即可使用消息的灰度。

点击链接可查看更多相关信息:

https://help.aliyun.com/document_detail/397318.html

图片

接入MSE后,Demo的效果为:A 应用的灰度环境只会消费灰度消息,基线环境只会消费基线消息。

图片

上图为Demo效果。左边为基线环境,只会消费基线的应用生产出来的消息,后续A 调 B 调 C 的过程中也只会在基线的环境。右边日志是 A 的灰度环境,消费的消息用 C 的灰度环境生产出来。往后调用 A 调 B 调 C 的过程中,A是灰度环境,调用 B 的时候,因B只有基线环境,所以是流量消息只到B的基线环境,最后到达 C 的灰度环境。消费消息的时候,能够继续带上消息的灰度标往下透传并走到正确路径上。

Demo源码下载地址:

https://github.com/aliyun/alibabacloud-microservice-demo/tree/master/mse-simple-demo

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 监控
|
消息中间件 弹性计算 Java
Rocketmq-spring入门与实践
本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。
|
1月前
|
消息中间件 存储 Serverless
【实践】快速学会使用阿里云消息队列RabbitMQ版
云消息队列 RabbitMQ 版是一款基于高可用分布式存储架构实现的 AMQP 0-9-1协议的消息产品。云消息队列 RabbitMQ 版兼容开源 RabbitMQ 客户端,解决开源各种稳定性痛点(例如消息堆积、脑裂等问题),同时具备高并发、分布式、灵活扩缩容等云消息服务优势。
76 2
|
2月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
72 3
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
55 3
|
13天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
23天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
58 6
|
20天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
21天前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版实践解决方案评测
随着企业业务的增长,对消息队列的需求日益提升。阿里云的云消息队列 RabbitMQ 版通过架构优化,解决了消息积压、内存泄漏等问题,并支持弹性伸缩和按量计费,大幅降低资源和运维成本。本文从使用者角度详细评测这一解决方案,涵盖实践原理、部署体验、实际优势及应用场景。
|
27天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
61 4