作者 | 柳下
背景介绍
消息队列服务(下文均以 Message Service 命名)作为云计算 PaaS 领域的基础设施之一,其高并发、削峰填谷的特性愈发受到开发者关注。Message Service 对上承接消息生产者服务的请求,对下连接消费者服务。提到消费:那就不得不引入两个问题?
- 如何以低成本、高吞吐、低延时的方式将消息数据从 Message Service 输送给下游消费服务?
- 如何快速构建免运维、按需弹性伸缩算力的消息消费服务?
今天就来聊聊如何在阿里云上基于 Serverless 计算服务 + Message Service 构建这样一套系统。
名词解释
函数计算(Function Compute)
阿里云函数计算是事件驱动的全托管 Serverless 计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,更多产品细节可阅读官方文档[1]。
连接器(Connector)
Connector 实现了大量数据的导入和导出。例如将 KAFKA topic 中数据导出到 stdout,或将本地文件中数据导入到 RocketMQ。Connector 简化了数据在不同系统间复制和传输的复杂度,本文探讨的消息服务和计算服务的连接同样依赖 Connector 实现。
事件总线(EventBridge)
事件总线是 Connector 的产品化服务,支持阿里云服务、自定义应用、SaaS 应用等以标准化、中心化的方式接入,并能够以标准化协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构,更多产品细节可阅读官方文档[2]。
架构演进
传统的数据消费架构如下图左:
1)数据源将产生的数据写入到消息系统;
2)开发者借助 Message Service 提供的 OpenAPI/SDK 或 Proxy 服务客户端从 Message Service 读取数据;
3)根据消息数据处理业务逻辑,也就是我们所谓的消费消息,将消息消费的业务结果写入到目标服务;如此架构开发者会面临以下几个问题:
1、如何并发安全的从 Message Service 读取数据?
2、数据消费能力小于生产能力时,如何快速提升消费吞吐?
3、目标服务资源成为瓶颈时,如何快速扩容?当流量波峰过后,面对空闲的机器成本,您又如何处理?
4、如何保证消费实时性、顺序性?
5、如何实现容错、缓存、降级、限流等高可用保护手段?
6、如何监控链路状态或异常?
面对上面多个琐碎又复杂的问题,相信总有几个会击中您的痛点。为了同时解决提到的所有问题,阿里云开发 Connector Service(如上图右)打通 Message Service 和 Serverless 计算服务的数据链路,您只需声明上游的消息服务实例和下游的消费算子,便可一键部署上线,连接器同时提供了丰富的流计算框架具备的数据处理能力和监控能力,总结如下:
Transform:以 UDF 方式自定义数据清洗逻辑,同时支持 JsonPath 语法简单提取数据;
- Filter:减少无用消息的后续处理,提供多种过滤匹配规则,如:前后缀匹配、数值匹配、IP 地址匹配等;
Window:提供窗口能力,可按照消息数量和间隔时间对消息做聚合推送。可提升消息处理吞吐,降低消息处理成本;
Real Time:从 Message Service 拉取消息到推送目标服务延时毫秒级别;
自定义并发消费能力:并发安全的消费消息,提升吞吐能力;
弹性计算资源:下游计算服务根据负载自动扩缩容,无需关心服务器资源水位问题;
Monitoring + Logging + Tracing:提供了丰富的监控指标和日志分析助力开发者监控系统状态、定位异常;
完备的异常保障机制:自定义重试策略 + 容错机制 + 死信队列 + 限流 + 反压;
为让大家对功能有更深入的了解,下面我们详细介绍各个功能的益处和应用场景。
降本提效功能
Window
在大规模数据场景中,One Message Per Request 早已无法满足开发者需求。Window 本质是提供了一种消息攒批处理的能力,Connector 在产品层面提供两个可调配参数:
- 批量推送条数:单次聚合的最大消息条数,当积压的消息数量到达设定值时才会将消息推送到下游。
- 批量推送间隔:系统每到间隔时间点会将积压的消息聚合后发给下游,如果设置 0 秒表示无等待时间,接收即投递。
两个参数结合使用可极大提升数据传输效率,进而提升数据吞吐,同时可以解锁多种用户场景,例:
- 流模式实时消费:将推送间隔设为 0s,推送条数设置最大值,这样可以保证从上游拉到的数据实时推送到下游目标服务。
- 请求稀疏且延时不敏感场景下,希望消息被攒批处理,可以接受消费滞后但不希望滞后时间过长:如果仅设置批量推送条数一个参数,则可能在低谷期由于消息稀疏长时间无法达到预设的攒批条数而滞后过久,此时可引入批量推送间隔参数解决此问题。
Transform
消息消费离不开数据处理,所谓数据处理,就是通过某个过程将原始数据转为目标数据,转换的过程即为 transform。通常原始数据是一个大而全的信息集合,而目标数据只是一个结构化的子集,关键在于如何嵌入数据的清洗和提取能力。对此 Connector 提供了多种转换能力:
- Template:对于原数据和目标数据都是确定结构的数据,且数据提取组装规则简单,可以借助模版完成 transform,模版同时支持 JsonPath 数据提取规则,如下图:
- UDF(User Define Function 用户自定义函数):对原数据结构复杂,且数据转换过程复杂的场景,可以借助 UDF 实现。UDF 模式中,服务提供方仅约定了函数的入参协议、参数的数据结构,至于函数中如何对数据做清洗?返回的数据结构如何?全部交由开发者实现,极大提升了消息处理的灵活度,一个简单的 UDF demo 如下:
# -*- coding: utf-8 -*- # handle_message 为函数执行入口 # 服务提供方约定了入参 event 和 context 的数据格式 # 只需从 event 中解析消息体并做处理即可 def handle_message(event, context): try: new_message = transform(event) except Exception as e: raise e return new_message def transform(old_message): # 自定义对数据的清洗和处理逻辑,并返回处理后的消息 return new_message
Filter
Filter 减少无用消息的后续处理,提升消息处理的效率,尤其和 Serverless 计算结合时,可减少调用次数,例如以下场景:
- 对敏感字、非法文字、关键字进行过滤;
- 对某些具有攻击性的 IP 进行消息拦截;
- ......
为覆盖足够多的业务场景,Connector 提供了前缀匹配、后缀匹配、数值匹配、IP 地址匹配等多种匹配模式,您可以根据业务需求选择适合的模式。
Real Time
在流计算场景中,低延时消费是开发者比较关注的一个问题,Connector 在提供批处理能力的同时也兼顾了流处理场景,当时间攒批窗口设置为 0 时,系统将演变为实时消费行为。
自定义并发消费能力
以 KAFKA 为例,当 KAFKA 数据量增大时,用户通常借助 Topic Partition 的水平扩展能力提升投递和消费的速率,随着 Topic Partition 分区数的不断增加,Consumer 端仍沿用单线程消费所有 partition 数据的方案一定会遇到瓶颈,进而导致消息积压。为了解决此问题,Connector 开放了自定义并发消费线程数配置,您可以指定多个 consumer threads,多个 consumer threads 会均分 kafka 的多个 partition,避免消息积压问题。当 Topic Partition 数量和 Consumer 线程数相等时可达到最大吞吐(如下图3),同时可做到 Partition 粒度保序。
高可用保护策略
- 重试:由于网络异常、系统 crash 等原因导致消息消费异常时,系统会按配置的 Retry Policy 进行重试,目前支持退避重试、指数衰减重试;
- 死信队列:当消息超过重试次数后仍未消费成功时,就变成了死信消息,如果不希望死信消息被丢弃,可以配置死信队列,所有的死信消息会被系统投递到死信队列中,目前系统支持 KAFKA、RocketMQ、MNS 作为死信队列的目标端;
- 容错策略:当消息消费发生错误时,系统提供以下两种处理方式:
- 允许容错:允许异常容错,当异常发生时不会阻塞执行,超出重试策略后会根据配置将消息投递至死信队列或直接丢弃,继续消费下一条消息;
- 禁止容错:不允许错误,当异常发生并超过重试策略配置时会阻塞执行;
- 反压:当系统接收消息的速率远高于它的处理速率时,出于对系统的保护会触发反压机制,避免系统崩溃,反压在系统中体现在两方面:
- 从上游拉消息的速率大于下游消费速率:积压的消息逐渐增多,如果不控制上游的拉取速率,会导致 Connector 内存不足造成 OOM;
- 下游目标服务限流:当目标服务受连接数、网络带宽等资源限制无法服务更多请求时,会返回给 Connector 大量限流错误,如果 Connector 不控制消息消费速率,可能引发系统雪崩;
针对上面两种场景,系统均通过技术手段做了保护,技术细节暂不描述。
弹性计算资源
Connector 打通了消息服务和 Serverless 函数计算服务,您可能会担心一个问题:函数计算服务的算力能否实时适配上游消息规模的不断增长?答案是可以的。函数计算作为 Serverless 计算服务,底层的计算资源可以做到毫秒级伸缩,不论您的 consumer 端并发消费能力如何调整,投递消息的频率有多高,函数计算均可在 quota 范围内快速伸缩计算实例。
计算实例 Quota 是函数计算出于对业务方服务保护设置的最大并发运行实例数,如果实际业务规模大于此默认值,可以给函数计算团队提工单调高此值。
Connector 结构
Connector 定义了数据从哪里复制到哪里,通过协调调度一系列 task 完成数据的传输工作,Task 根据职责不同可划分为以下几类:
- Poller Task:从上游消息服务中拉取消息;
- Transform Task:对消息做清洗、加工、过滤、聚合等操作;
- Sink Task:将消息推送到下游服务;
Task 均可水平扩展,并发消费上游多 partition 数据,且并发将消息投递到下游处理。
当前 Connector 依赖阿里云 EventBridge 实现,更多能力可参考官方文档[3]
客户需求
某广告平台每天将浏览的用户信息(个人信息、时间、登录设备等)投递至 kafka 中,从业务角度投递的数据格式并不完全相同,客户需将不同格式的数据清洗为相同格式的数据,并将清洗后的数据投递到 ClickHouse 服务,随着用户业务日益增长,预计未来几个月有几倍增长,且客户对实时性和成本都有要求,总结客户的几点关键需求如下:
- 具备数据清洗能力;
- 低成本;
- 系统不受业务增长因素影响;
解决方案
函数计算恰好可以完美解决上述问题,下面结合如下数据链路介绍如何解决客户的几个需求:
- 如何实现数据清洗?
Transform Task 中提供了 Data Cleaning 功能,客户可以以 UDF 方式自定义数据清洗逻辑,平台规定了入参协议,出参可以为任意格式的清洗后数据; - 如何做到低成本?
- 整条链路主要费用源于函数计算的计算资源消耗和调用次数,可通过以下两个手段降低成本:
- Window:将多条消息聚合为一条批量消息发送至函数计算,减少调用次数,避免重复执行公共计算逻辑;
- Filter:减少无用消息的后续处理,减少调用函数计算的次数;
- 如何保证系统不受业务增长因素影响?
通过下图可发现,kafka topic 的 partition 分区数、Poller 数量、Sink Task 的 worker 数量、函数计算的计算实例数都可实现任意水平扩展,且均可通过配置调整,因此当客户预判到业务增长时,只需修改相应的配置项即可实现水平扩容。
客户业务现状
目前客户已将业务全量迁移到函数计算,迁移后的几个月内仅通过简单修改扩容配置轻松应对业务规模的数倍增长。
最佳实践
下文通过演示一个将 kafka 数据导入到函数计算的 demo,快速搭建一套消息消费系统:
- 创建上游服务登录 kafka 控制台[4]创建 kakfa 实例,并在该实例下创建 topic 和 groupID,可以参考 kakfa 快速入门[5]快速完成此操作。
- 创建下游服务 + 配置数据处理规则
- 创建函数计算的服务,并为服务命名,如下图:
- 在创建的服务下创建一个函数,函数是执行代码的最小单元,如下图:
- 在创建函数页面,为函数命名,并点击触发器配置,其中触发器类型选择 kakfa,将 step1 创建的资源(kakfa 实例、Topic、Group ID )填写到下图中,其他值可使用默认值。
- (可选) 如需要验证攒批功能,可点击批量推送,并配置批量推送条数和批量推送间隔,此 demo 设置批量推送条数为 2 条,批量推送间隔为 10s,如下图:
- 上面流程完成后点击确定即部署成功。
- 编写函数,函数内的逻辑为输出接收到的消息数量和消息内容:
# -*- coding: utf-8 -*- import logging import json def handler(event, context): evt = json.loads(event) logger = logging.getLogger() logger.info(len(evt)) // 输出消息列表的长度 logger.info(evt)。 // 输出消息内容 return 'succ'
- 测试验证
- 到 kafka 控制台的 topic 中快速发送 3 条消息,如下图:
- 预期函数计算会收到 2 次请求,第 1 次请求由于触发推送条数条件包含 2 条消息,第 2 次请求在等待 10s 后触发推送间隔条件包含 1 条消息,如下图:
- 可通过函数日志查看所有请求日志,可以发现一共接收到 3 条消息,如下图:
总结&展望
基于 Serverless 函数计算,您可以快速搭建一套安全可靠的数据消费系统,总结系统优势如下:
- 降本
- Filter:减少无效的消息处理和对函数计算的调用;
- Window:提供消息攒批处理能力,帮助更好处理一些非实时和离散场景下的消息,也减少了对函数计算的调用次数;
- 按需付费:计算资源按需付费的特性避免了波峰波谷场景下为峰值预留机器产生的无用开销;
- 持续降价:函数计算在 11 月份下调全地域全计费项价格,下调幅度达 12%-47%,并对内存和 cpu 做精细化计费;
- 提效
- 研发效率:Transform、UDF、Template、JsonPath 等能力解锁更多业务场景,避免二次开发,助您快速构建系统,未来也会内嵌更丰富的算子,甚至可以编排算子;
- 数据分析效率:提供数值检索、可视化分析等能力,您可以通过简单的引导式交互,即可快速实现基于事件的流式查询与分析;
- 问题排查效率:系统提供丰富的可观测能力,如事件轨迹、事件大盘等助您对业务进行监控和整体状态分析,未来也会从指标探索、运维监控、故障定位等多个维度完善能力,实现更全面的系统可观测性;
- 运维效率:Serverless 计算实例毫秒级自动弹性伸缩的特性让你彻底摆脱资源运维的负担;
随着云计算逐渐走向全面 Serverless 化,Message Service 和 Serverless 计算的连接会更加紧密,如今 Connector 的成熟更加降低了复杂系统的开发门槛,让您真正实现端到端全链路深度上云。