开发者学堂课程【消息队列 RocketMQ 5.0 云原生架构升级课程:RocketMQ 5.0 中的消费者类型和最佳实践】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1234/detail/18403
Rocket MQ5.0中消费者类型和最佳实践
内容介绍
一、课前介绍
二、消费者类型介绍
三、Push consumer
四、Simple consumer
五、Pull Consumer
六、三种消费者类型的异处
一、课前介绍
这里是阿里云消息队列公开课,然后今天的议题是Rocket MQ的消费者类型以及最佳实践。
首先会向大家去简单的介绍一下就是Rocket MQ5.0中现存的三种不同的 consumer 类型,对此有一个抽象的大概的了解,然后再具体而言,每个版块再具体介绍三个不同的consumer的具体的使用方法以及最佳实践,那接下来就开始讲具体的内容。
二、消费者类型介绍
1、总述
在Rocket MQ5.0中,去强化了三种不同的消费者类型,其实在现在的Rocket MQ4.0中也是有消费者类型这样的一个概念的。比如很熟悉的这个 Push Consumer 和 Pull Consumer 都是已经存在的消费者类型。在Rocket MQ5.0就更加去强化这个消费者类型的一个概念。 Rocket MQ本身设计是面向一种多样的复杂的业务场景,不同的customer类型其实是用来满足不同的业务场景的。
2、Push Consumer
Push Consumer是一种全托管的消费类型,用户其实只需要去注册对应的消息监听器就可以了,它也是与业务集成最为普遍的一个消费者类型。在使用Push Consumer的时候,用户只需要去注册一个 Listener 就可以了,在 listener 接口中去完成自己的消费逻辑。对于从服务端到达客户端的具体的消息的情况用户不需要关注,所有的逻辑都已经在客户端本身完成封装了。
3、Simple consumer
Simple Consumer 其实是 Rocket MQ5.0中新提出来的一种 customer 类型,它去解耦了消息的消费以及消费进度的同步,用户需要自己去主动接受来自于服务端的消息,然后手动去对消息进行确认。然后它与 Push Consumer 比较类似的一点是它也是由服务端来托管消费进度的,即同一个 consumer group 下这个 Simple Consumer 的消息消费到何处也是由服务端去进行记录的。然后它适用于用户需要自主去控制消费速率的这种业务场景。举例而言,其实消息在进入 Listener 的时候是逐条进行投递的,而在 Simple Consumer 中用户主动 receive 的时候,它可以去批量的进行 receive 的操作。因此在用户需要自主去控制速率这种场景下,Simple Consumer 是非常有用处的。
4、Pull Consumer
Pull Consumer 在welcomes中也是有的,在 Pull Consumer 中用户是以队列的维度去操作这些消息的。在Rocket MQ中队列是最小的一个逻辑组成单位,在 Pull Consumer 中所有的消息都是按照队列进行接收的,这样同时又需要用户去关心整个消息的位点和进度的同步,当然具体而言也可以选择自动或者手动的方式来进行提交。相对而言 Pull Consumer 是一个需要用户自己去管理的消费者类型,它是在流处理框架中更加常用的一种消费者类型。
接下来会具体深入到每一个Push Consumer,给大家展示一下不同的消费者类型,包括它是如何使用的以及它最佳的实践方式。
三、Push Consumer
1、基本介绍
Push Consumer 可能是大家最熟悉的一个 consumer 类型,此前也提到它是一种全托管的消费者,全托管的意思就是用户其实是不需要去关心整个消息的接收的,只需要关心本身消息的接收过程和消费过程就可以了。
2、使用简介
除此之外,所有逻辑都已经在消费者实践中完成封装,用户只需要根据每一条收到的消息返回不同的消费结果就可以了。所以它是一种最为朴实的消费者类型,用户可以根据自己业务逻辑处理结果的不同,进而返回 success 或者 filer。前者就表现为消费成功,后者表现为消费失败。其实每一个 Consumer Group 会有最大消费次数的概念。如果消费失败了,按照正常逻辑来说,它会有一个重投的逻辑,会隔一段时间之后重新来到这个消费队列。如果消费次数超过了最大限度的话,它就不会再投到 Listener 中,而是相反就会进入死信队列。在 Push Consumer 中用户只需要实现 Listener 的消费逻辑就可以了,具体而言其实就是实现这个 Message Listener 中的这个 consumer 接口就可以了。
3、代码事例
这里是一个 Push Consumer 的一个代码事例,大家可以看到首先只需要去设置对应的订阅关系,其中的 topic 以及订阅的是否为 tag 订阅,然后去注册 Message Listener 这样的接口,启动 Push Consumer 其实就可以了。
实际上在 Push Consumer 的实现中,用户其实不需要关心消息的拉取,但是这并不代表拉取的逻辑是不存在的。实际上对于无法确定如何实现这一方面,它是将这个逻辑给删除了,同时为了保证消息到达客户端的及时性。其实本身客户端也做了一些服务端消息的缓存,比如消息它可能是先从服务端拉到客户端,在客户端进行缓存,缓存完成之后再进入 Listener 进行消费的。
这个时候为了避免缓存的消息过多而导致客户端内存泄露,也提供了一些参数,用户可以自己去调节。比如可以是消息本身在客户端里面缓存最多的条数,也可以是最多缓存的字节数量,这个都是用户可以自行调整的。然后可以看到 Push Consumer Build 中有对应的方法。比如Set Max Cached 的大小、数量以及 Bytes,一个是最多能缓存多少条消息,一个是最多能缓存多少自己的消息。只要获得其中一个条件,客户端就不会再去缓存消息了,直到这个消息被消费成功,缓存有所减少,它才会重新的去缓存来自于服务端的消息。
4、最佳实践
(1)首先用户只需要实现这个 Listener 中的逻辑就可以了。在绝大多数情况下,Message Listener 其实是应该返回消费成功的,而如果有大量的消费失败,这是一个不会被 Push Consumer 所推崇的做法。
(2)另外一点就是尽量去捕获任何可能在消息消费过程中发生的一个异常,让一系列行为变得可预期。
(3)不宜长期 block 消费线程,因为实际上在 Push Consumer 中消费线程的数目也是有限的。对于一些消费耗时长的业务场景,比如有的用户可能得到一条消息之后,可能就会经历一个比较长的消费逻辑去操作,这时候可以先行去提交消费成功,再一步的进行业务处理。即在拿到一条消息之后,可以把这条消息丢到一个单独的业务形成池里面进行处理,而对 Push Consumer Listener返回一个消费成功。本身这条消息不会占用这个消费线程,同时这个消息也能得到比较好的处理。
(4)最后一点就是在 Push Consumer 中,其实消费线程也是可以进行调整的。对于一些 IO 密集型的应用,比如可能它没办法去做,也有可能没有办法去做一个很好的一步化处理。这个时候可以适当的去调大一些消费线程数,比如消费时间就是比较长,在调大了消费线程数之后,就可以很大程度上提高消费速度,但这个要根据用户自己的业务需求以及机器的配置来确定具体的消费线程数。
四、Simple consumer
1、基本介绍
Simple consumer 是在 Rocket MQ5.0中新引入的一种消费者类型,它是一种半托管的消费者,需要使用者自行去调用 Simple consumer#receive 方法获取来自于服务端的消息,并且根据自己的业务需求来选择是调用 ack 方法还是 change Invisible Duration 的方法。对于ack 的方法和change Invisible Duration 的方法,会进行介绍进而帮助大家理解。
2、方法介绍
receive方法是通过长轮询接收来自于服务端的消息,具体的长轮询时间可以使用 simple computer builder#set Await Duration来进行设置。
而且用户在收到对应的消息之后,需要根据自己业务处理结果的不同对消息进行不同的处理,也就是对应上面的 ack 和进行change Invisible Duration 的方法。相比较于Push Consumer 而言,Simple Consumer 用户可以自主的去控制整个消息接收的节奏。比如用户 receive 的时候可以选择对应的 batch 大小,用户可以批量的进行处理选择。
Simple Comer 实际上每次消息的接收都是按照 topic 的虚拟分区来发起请求的,比如用户群体发起一次 receive 的请求,它可能是针对一个分区的,具体分区可能会是有一个算法。如果实际的 topic 分区会比较多,这个时候其实比较建议使用 Simple Comer 的时候去并发的调用这个 receive 方法,来保证消息的及时性。有的消息可能分布在这个分区中,有的消息可能分布在另一个分区中,如果这个时候去并发调用的话,可以保证比较好的及时性。另外的话就是在并发 Simple consume 的 receive 方法的时候,也需要考虑到整个业务逻辑的处理能力,所以具体到底开多少的并发度,需要综合自己的业务处理能力。
在这里对 simple consumer 的ack 方法和 change Invisible Duration 方法做一个大致的讲解。比如有一个simple consumer,它的 Consumer Group 的对于消息最大的消费次数是三条,用户在使用receive 方法收到第一条消息之后,这个消息的消费次数当然是一了。receive 方法其实本质上会有一个 Invisible Duration 的设置,在调研 receive 方法的时候其实是可以看到的。它实际上就是一个消息的一个不可见时间,如果用户在不可见时间之内没有做任何操作的话,这个消息就会进行二次的重投。
重投之后消息的消费次数就变成了二,用户这个时候可以选择 ack 消息,或者是选择调用 change Invisible Duration 的方法。如果用户这个时候选择了ack的话,就代表这条消息被消费成功了,那它的位点也就会被同步到服务端。如果这个时候用户选择了 change Invisible Duration 的话,相当于这个消息的这个不可见时间又给用户修改,直到这个时间过完之后,消息才会进行下一次的重投。
假如用户这个时候使用了 change Invisible Duration 的方法,然后消息已经重投到了第三次,这时候的消费次数是三的话,可以注意到消息的最大的消费次数已经是三条了,这个时候用户也可以选择去调用 ack 方法或者是调用 change Invisible Duration 的方法,如果用户这时候选择用ack 方法的话,相当于被消费成功了,这条消息的生命周期也就结束了。如果用户这个时候选择调用change Invisible Duration 的方法,这时候其实它的效果PushConsumer里面的返回消费失败是类似的,这时候相当于它就要进入死信队列了。因为这个时候其实已经是消息的第三次投递了,它与Consumer Group 设计的最大消费次数已经是相同的了,所以可以看到用户使用 change Invisible Duration 接口里面的Invisible Duration 其实和 receive 里面的 Invisible Duration 是一个概念,本质上是消息一个不可见的时间窗口。超过了这个时间窗口,如果消息还没有到达它的最大的消费次数的话,这条消息就会进行再一次的重投,否则的话它就会进入死信队列。
这就是simple consumer 里面的ack 方法和 change Invisible Duration方法的一个简单的介绍。那接下来总结一下simple consumer 的最佳实践。
3、最佳实践
(1)Simple Consumer 需要对收到的消息进行逐条的确认或者修改下次可见时间。关于这里的逐条确认或者修改下次可见时间可以把它认为是 Push Consumer 的消费成功或者失败来进行处理。
(2)关于每条消息的 ack 方法和change Invisible Duration,需要在消息最新的可见时间到来之前进行调用。此前说到这个Invisible Duration,其实它相当于一个不可见的时间窗口,ack 方法和 Invisible Duration,如果是针对当前收到这一次消息进行调用的话,需要在最新的可见时间到来之前进行调用。
(3)第三点就是前面提到的simple consumer,其实在 receive 的时候它是会按照分区来进行调用的。如果想要保证消息接收的及时性,需要去有针性的去增大调用 receive 方法的并发度,但具体而言要根据这个消息的处理能力进行并发调用。
(4)最后一点,就是 Simple Consumer 对于一些用户需要自主去控制消息获取速率或者批量处理消息业务场景里面,它会是一个比 Push Consumer 更好的选择。因为这个Push Consumer里面的 Listener 是面向于单调消息去做的,而Simple Consumer 用户可以去控制bytes,而且是自主的去进行消息接受的。
五、Pull Consumer
1、基本介绍
Pull Consumer 其实也是在Rocket MQ4.0里面就已经存在的一种消费者类型,也是一直以来都支持的一种消费者类型。Rocket MQ5.0全新的 Pull Consumer API还在演进中,敬情期待。接下来的内容会Rocket MQ4.0里面的 Pull Consumer API来进行讲述。
2、使用原理
在 Rocket MQ 中无论是消息的发送和接收,都是通过队列来进行的。一个 topic由若干个队列组成,消息本身也是按照队列的形式来进行组织的。队列天然它会有这种先进先出的特性,同一个队列中的消息会拥有不同的位点,先到达这个队列的消息就会拥有比后到达这个队列的消息更小的位点。
所以说位点的大小是跟随消息到达服务的时间逐次递增的,而且本质上不同 Consumer Group 在服务端的消费进度就是一个个对立中的位点信息,客户端将自己的消费进度同步给服务端,本质上就是在提交一个个消息的位点。
3、对应接口方法
接下来来看一下就是Rocket MQ4.0中 pull consumer 对应的接口中主要的方法。
(1)注册消息路由
首先它会有一个注册消息路由监听器的接口,在这个接口里面,用户需要填对应的 topic,然后以及实现对应的这样的一个界定级方法。
这个方法就是说用户如果想要去订阅某一个 topic 的消息,这个时候因为它所有的这种请求都是针对于队列进行的,用户需要去感知这个队列的变化,这个时候用户就可以通过使用这个接口来达到目的。
(2)assign 方法
另外的话它还会有 assign 方法,就相当于说拿到队列之后,将这个队列绑定到目前的 consumer。如果要消费哪个队的消息,就将这个队列绑定到当前的 consumer,然后再使用这个 pull方法来进行消息详细的过序。因为这些消息需要去同步位点,如果想去查询服务端现在队列对应的消费位点已经到了哪一步,这时候就可以调用对应的 Commit 方法来获取每一个队列提交到服务端的位点。位点的同步也未必是全部手动的,也可以将它设置成自动的模式,这个时候可以使用 set Auto Commit 方法去选择自动提交位点。所谓的自动提交位点就是说消息一旦到达客户端就认为位点就被提交了,被客户端消费了,也可以去选择去自己手动的提交位点,这时候也就是个Commit Sync 的方法。
所以 Pull Consumer 是根据队列来获取消息,然后再通过位点去更新消费进度的消费者类型。在 Pull Consumer 里面,将队列这个概念完整地暴露给客户,用户可以根据自己关系的topic设置鉴定器感知路由的变化,然后去将它绑定给当前的消费者。当用户使用pull这个方法的时候,它其实就是在从已经 assign好的队列里面去获取消息。如果设置Auto Commit 的话,客户端就会自动进行位点的提交和消费进度这样的同步,否则用户也可以选择去手动进行提交。
4、总结
可以看到整个 Pull Consumer 它是面向于队列去设计的,也利用到了队列先进先出的特性,整个复杂度相来说比 Push Consumer 和 Simple Consumer还是更高的,所以 Pull Consumer相对来说,它会在这个流计算框架里面是应用的比较广泛的。
Pull Consumer是用户自己进行管理的一种消费类型,需要手动的去获取消息并提交消费进度。而且 Pull Consumer 是以流计算框架进行集成的一个首选的消费类型,获取消息以及为您提交更多的实际上是由流计算框架来进行决定的。
六、三种消费者类型的异处
通过今天的内容了解到 Rocket MQ 三种不同的消费者类型,分别是Push Consumer,Pull Consumer 以及 Simple Consumer,每一种消费者类型的设计其实都是为了满足不同的业务场景。
比如Push Consumer 它就是最朴实的一种业务处理场景,Simple Consumer更面向于用户需要自己去控制消费进度和消费数据的业务场景,而Pull Consumer它更适用于一种与流计算相结合的这样的业务场景,每一种不同的 Consumer 都有自己独到的业务场景。