Pulsar Consumer实现介绍

简介: Pulsar-Consumer “Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.” Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

Pulsar-Consumer

“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”

Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。

本片文章简单介绍Pulsar的Consumer,包含以下内容:

  • Consumer的体系
  • 消费逻辑的实现

1. Consumer体系

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Consumer通过订阅关系绑定到Topic(和Producer类似,都是绑定到一个Topic上),并接收消息。

Consumer支持:

  • 同步接收消息:阻塞用户线程等待消息
  • 异步接收消息:异步等待消息(通过Future返回消息)
  • 通过MessageListener返回消息:接收消息后回调用户的MessageListener

Consumer提供了三类获取消息的方式,其中异步的方式包含通过Future异步等待消息和通过MessageListener被动接收消息。MessageListener和另外两种方式是互斥的,一旦Consumer注册了MessageListener接口,则必须通过MessageListener处理消息,主动触发receive获取消息将抛出异常。

Consumer的继承关系:

  • Consumer:定义了消费者相关的接口
  • ConsumerBase:接口中基础方法的实现,抽象类
  • ConsumerImpl:在ConsumerBase基础上的Consumer具体实现
  • MultiTopicsConsumerImpl:组合多个ConsumerImpl完成对多Topic/Partition的消费

Consumer的设计和Producer是一致的,通过接口定义行为,基础类实现基本能力,在通过组合的方式来实现消费多个Topic/Partition(Producer则是像多个Topic发送消息)。

1.1 消费进度提交

Consumer处理消息后需要发送acknowledgement到Broker,这样Broker可以丢弃消息(应该是移动消费offset的操作,类似RocketMQ,并不是真正的删除消息)。支持单挑消息提交或者批量提交,批量提交则以最后一条消息的offset为准。(只是记录一个offset比较某个位置之前的消息都已经被Consumer处理,所以批量提交其实只是把最大的offset提交)

1.2 订阅模型

订阅模型决定了消息时如何被投递给Consumer的。在Pulsar中,订阅模型有: exclusive、shared、 failover。

Exclusive

只能有一个Consumer绑定到订阅关系上,其他Consumer尝试绑定到订阅关系上时会报错(Exclusive是默认的订阅模型)。

Shared

在Shared模型中,多个Consumer可以绑定到一个订阅关系上。消息按照round-robin模式被投递给各个Consumer。若某个Consumer宕机,被投递给该Consumer的未被ACK(没有acknowledgement)的消息将被重新投递给其他的Consumer进行消费。

Shared模式带来的限制:

  1. 消息时按照round-robin模式投递给各个Consumer的,所以消息顺序无法得到保障
  2. 同样因为round-robin模式,无法使用批量提交acknowledgement的功能(如上图Consumer C-3如果提交了m4会导致m3被标记为已经消费,但实际Consumer C1可能还没处理m3)

failover

在Failover模型中,多个Consumer可以绑定到一个订阅关系上,但是只有一个称为Master Consumer的消费者能消费消息。对多个Consumer按照name进行排序,第一个Consumer则为Master Consumer。

在Master Consumer失效(比如断开连接)后,Master Consumer未提交的消息和后续的消息会提交给后续的Consumer。

2. 消费逻辑的实现

Consumer获取消息的核心API有以下两个,分别实现同步获取消息和异步获取消息:

/**
     * Receives a single message.
     * <p>
     * This calls blocks until a message is available.
     *
     * @return the received message
     * @throws PulsarClientException.AlreadyClosedException
     *             if the consumer was already closed
     * @throws PulsarClientException.InvalidConfigurationException
     *             if a message listener was defined in the configuration
     */
    Message<T> receive() throws PulsarClientException;

    /**
     * Receive a single message
     * <p>
     * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
     * </p>
     * <p>
     * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
     * received message. Else it creates <i> backlog of receive requests </i> in the application.
     * </p>
     *
     * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
     */
    CompletableFuture<Message<T>> receiveAsync();

MessageListener则通过ConsumerBuilder接口进行设置并传入到Consumer的构造方法中。

这三个API都由ConsumerImpl#messageReceived触发,即Consumer接收到消息后根据请求的类型来决定:

  • 同步获取消息的,将消息放入内存队列,被挂起的线程会从队列中获取消息
  • 异步获取消息的,执行callback将消息放入future
  • 通过MessageListener处理消息的,通过ListenerExecutor执行逻辑

可见Pulsar在消费模式上处理是统一的,即无论客户端采用何种方式进行消息的接收,消息统一由服务端进行“推送”,而在Consumer内部根据用户请求的类型进行处理。

通过ConsumerImpl#messageReceived的实现可以发现Pulsar的消息消费是一种“推”的模型,这和RocketMQ的“拉”的模型差异是很大的(RocketMQ采用一种Long-Polling的方式,由Consumer主动发起请求从服务端获取数据,若服务端有需要处理的消息,请求立即返回;如果没有消息,这个请求会在服务单阻塞一段时间,直到新消息到达或者请求即将超时,返回给客户端)。

Consumer获取消息的模型

具体看Pulsar-Consumer获取消息的代码实现会发现它也不是一种纯粹的,类似淘宝Notify的推的模式,而是一种推拉结合的方式,示意如下:

  1. Consumer向Broker发送FLOW请求,通知Broker可以推送消息给Consumer
  2. Broker将消息通过MESSAGE请求将消息推送给Consumer

这是一个反复的过程,每次Consumer接收消息处理后都会继续发送FLOW请求给Broker。

这是在RocketMQ或者Kafka的设计中都没有的一种方式,这种方式进行一定的拓展则可以实现类似akka的Dynamic Push/Pull模式(详见公众号历史文章:《Push or Pull?》)。

在阅读Pulsar Consumer部分代码的时候还发现非常有趣的一点,当你搜索“Consumer”时会出现一个Consumer接口和一个Consumer类:

  • 接口: org.apache.pulsar.client.api.Consumer
  • 类: org.apache.pulsar.broker.service.Consumer

Consumer接口是Client模块定义Consumer行为的,为什么在Broker模块会有一个Consumer类?

实际在Broker端会给链接上来的Consumer构造一个对应的Consumer对象,维护远端的Consumer的链接等信息。所有对远端的Consumer的操作会封装在Broker端的Consumer中。这样可以更好的实现代码的可插拔性,降低耦合,提升代码的可测试性。比如在测试Broker端的逻辑时,只需要Mock一个Consumer类来模拟各种正常和网络异常的情况,而不需要真正的启动一个Consumer。

总结

本文主要是介绍一下Pulsar Consumer模块的相关概念和一些模型,没有深入的解读代码实现。Pulsar Consumer的实现方式还是非常有趣的,和大家比较熟悉的RocketMQ的实现方式差异比较大,值得一读。

如果本文对您有帮助,点一下右下角的“推荐”
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 Kubernetes
kafka/pulsar on k8s
kafka/pulsar on k8s
kafka/pulsar on k8s
|
存储 消息中间件 监控
Pulsar 介绍与部署
Pulsar 介绍与部署
4269 0
Pulsar 介绍与部署
|
8月前
|
Kubernetes 固态存储 容器
pulsar on k8s
pulsar on k8s
|
消息中间件 存储 算法
聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group
聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group
1095 0
|
消息中间件 存储 缓存
一文了解Kafka的消息收集器RecordAccumulate
一文了解Kafka的消息收集器RecordAccumulate
178 1
|
网络协议 算法 Java
聊聊 Pulsar: Pulsar 分布式集群搭建
聊聊 Pulsar: Pulsar 分布式集群搭建
1048 0
|
消息中间件 缓存 容灾
Apache Kafka-通过设置Consumer Group实现广播模式
Apache Kafka-通过设置Consumer Group实现广播模式
1957 0
|
消息中间件 存储 运维
系列二:次时代Kafka与Pulsar该如何选择?
感谢大家支持,目前新书已上架各大线上平台!! 多谢开发者社区对此的支持。感谢机械工业出版社编辑老师长期的指导。感谢Tencent同事们的指点与陪伴。
1388 0
|
消息中间件 存储 Kafka
通过 KoP 将 Kafka 应用迁移到 Pulsar
KoP(Pulsar on Kafka)通过在 Pulsar Broker 上引入 Kafka 协议处理程序,为 Apache Pulsar 带来原生 Apache Kafka 协议支持。 通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到 Pulsar,而无需修改代码。 这使 Kafka 应用程序能够利用 Pulsar 的强大功能,
473 0
|
NoSQL API Redis
Pulsar 也会重复消费?
排查了一个问题: 在使用 Pulsar 消费时,发生了同一条消息反复消费的情况。