Pulsar-Producer实现简介

简介: “Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.” Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”

Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。

本片文章简单介绍Pulsar的Producer,包含以下内容:

  • Producer的设计
  • 消息发送的实现

1. Producer设计

1.1 创建Producer

以上是Pulsar官网上创建一个Producer的示例代码。

创建的过程如下:

  1. 指定serviceUrl创建PulsarClient
  2. 指定Producer发送消息的Topic,通过PulsarClient创建Producer

通过上述的创建代码可以推测:

  1. serviceUrl应该是用于做服务发现的,通过serviceUrl查找Broker的信息
  2. Producer指定了Topic,那么一个Producer只能往特定的Topic发送消息

1.2 Producer API

Pulsar中,发送相关的接口为Producer,如上图所示:

  • Producer定义了发送接口
  • ProducerBase作为抽象类,提供了基础实现
  • ProducerImpl则是真正的实现类
  • PartitionedProducerImpl看着和分区相关,这个之后再看

Producer 接口具体如下:

public interface Producer<T> extends Closeable {
    /**
     * 返回Producer发送消息的Topic
     */
    String getTopic();
    /**
     * Producer的名称
     */
    String getProducerName();
    /**
     * 同步发送消息
     */
    MessageId send(T message) throws PulsarClientException;
    /**
     * 有发送消息
     */
    CompletableFuture<MessageId> sendAsync(T message);
    /**
     * Flush客户端完成中的消息并等待所有消息成功持久化
     * @since 2.1.0
     * @see #flushAsync()
     */
    void flush() throws PulsarClientException;
    /**
     * 异步Flush客户端完成中的消息并等待所有消息成功持久化
     * @since 2.1.0
     * @see #flush()
     */
    CompletableFuture<Void> flushAsync();
    /**
     * 创建TypedMessageBuilder,用于构建消息
     */
    TypedMessageBuilder<T> newMessage();
    /**
     * 同步发送消息,已经被弃用
     */
    @Deprecated
    MessageId send(Message<T> message) throws PulsarClientException;
    /**
     * 异步发送消息,已经被弃用
     */
    @Deprecated
    CompletableFuture<MessageId> sendAsync(Message<T> message);
    /**
     * 获取Producer发送的最后一个序列号
     */
    long getLastSequenceId();
    /**
     * 获取Producer的统计信息
     */
    ProducerStats getStats();
    /**
     * 异步关闭Producer并且释放资源
     */
    CompletableFuture<Void> closeAsync();
    /**
     * 返回Producer是否连接到Broker上
     */
    boolean isConnected();
}

通过Producer接口可以看出Pulsar Producer提供的能力:

  • 同步发送消息
  • 异步发送消息
  • 一个Producer只能向一个特定的Topic发送消息(Producer#topic()返回了一个Topic,说明Producer会绑定到一个Topic上)
  • 批量发送(flush方法说明了应该是支持批量的,消息会在客户端内存中保存)
  • 包含了sequenceId是否可以做幂等之类的事情?
  • 统计能力

1.3 ProducerBase

ProducerBase作为抽象类,实现了Producer接口。

ProducerBase包含四个属性:

  • producerCreatedFuture:异步创建Producer的Future
  • conf:Producer的配置
  • schema:消息相关的Schema信息
  • interceptors:Producer的拦截器,在发送前后插入一些操作

producerCreatedFuture

重命名上看这个属性是用于异步创建Producer。

但是在一个基类中提供异步创建实体的Future显得比较难理解。一般的编程思路会在基类中定义一些基础的公共的属性,用于保存状态或者配置,比如conf。这里的producerCreatedFuture实际用于PartitionedProducerImpl异步创建多个Producer,这个后续再看。

conf

ProducerConfigurationData提供了Producer相关的配置信息,包含是否批量发送、内存缓存消息的大小、发送的Timeout等。

schema

Schema指明了消息的格式,通过Schema完成对消息的encode和decode。

interceptors

ProducerInterceptor是Producer提供的拦截器,包含两个方法:beforeSend、onSendAcknowledgement,分别用于在发送前和发送后插入行为。

1.4 ProducerImpl

ProducerImpl继承了ProducerBase,是Producer接口的核心实现。

ProducerImpl在ProducerBase的基础上增加了大量的属性,包含:

  • producerId:通过AtomicLong生成的进程内唯一的Producer ID
  • msgIdGenerator:消息ID
  • pendingMessages:内存中缓存的消息
  • pendingCallbacks:内存中缓存的消息对应的Callback
  • timeout:发送的超时配置
  • batchMessageContainer:批量消息的容器
  • producerName:全局唯一的Producer名称
  • 等等...(在后续发送实现中介绍相关的属性)

ProducerImpl实现了具体的发送行为,比如同步发送、异步发送(后续在消息发送的实现部分介绍)。

1.5 PartitionedProducerImpl

Producer提供的发送相关的API定义,ProducerBase提供了基础实现,ProducerImpl提供了具体的实现,那么PartitionedProducerImpl做什么?

通过PartitionedProducerImpl的属性可以看到内部包含了一个ProducerImpl列表,那么可以PartitionedProducerImpl和ProducerImpl是一个组合的关系。

通过start方法可以看出,PartitionedProducerImpl根据对应的topicMetadata的分区数创建了对应数量的ProducerImpl实例(这里也说明了ProducerBase中producerCreatedFuture的用途)。

为什么在PartitionedProducerImpl中需要创建一组ProducerImpl实例?

PartitionedProducerImpl另外增加了一个routerPolicy属性,其接口为:

public interface MessageRouter extends Serializable {

    @Deprecated
    default int choosePartition(Message<?> msg) {
        throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
    }

    default int choosePartition(Message<?> msg, TopicMetadata metadata) {
        return choosePartition(msg);
    }

}

通过接口的定义不难理解MessageRouter接口实现Message和Partition的映射。

通过internalSendAsync方法的实现可以看出,发送消息时通过routerPolicy将消息映射到Partition,通过Partition选择对应的Producer执行发送,那么久解释了为什么在PartitionedProducerImpl会创建和对应Topic的分区数相同的ProducerImpl。

通过以上内容,能总结出Producer模块的各个类的职责:

  • Producer:定义发送接口,用户使用的核心API
  • ProducerBase:Producer接口的基础实现
  • ProducerImpl:实现具体的发送行为,一个ProducerImpl只能向一个Topic写入消息
  • PartitionedProducerImpl:整合多个ProducerImpl,用于向多分区发送消息的场景

2. 消息发送的实现

在对Producer模块有个整体的认识后,后续内容具体阐述一条消息的发送流程。

在消息系统中,从Producer的视角看,一条消息写入过程一般包含:

  1. 消息校验
  2. 消息属性增强(添加一些必要的系统属性)
  3. 消息路由(选择目标分区)
  4. 消息序列化
  5. 消息数据写入网络
  6. 等待写入结果响应
  7. 返回写入结果

下面将通过ProducerImpl的实现来了解Pulsar的Producer发送消息的过程。

2.1 寻址

要发送一条消息,除了校验消息是否合法,首先要这条消息的写入目标(通过路由找到消息目标的Partition)。

在ProducerImpl的构造方法最后一行调用了grabCnx()方法创建了链接(构建了链接的上下文)。

grabCnx方法通过PulsarClient创建Connection,而PubsarClient内部则通过LookupService接口来完成Topic到Broker的映射并建立链接。

LookupService接口提供了BinaryProtoLookupService和HttpLookupService实现,通过LookupService用户也可以实现自己的服务发现模块。

2.2 消息发送

发送消息的调用链如上图所示,最终通过ProducerImpl的internalSendAsync将消息发送出去。无论同步发送还是异步发送,最终都会通过异步的方式执行发送(同时只是在异步的基础上等待发送结果),这里可以看到Pulsar Producer在API实现上比较注重代码的复用性即API的最小功能原则。

以单挑消息发送为例,sendAsync的具体实现如下:

  1. 在必要的校验后,将消息包装成OpSendMsg对象(包含异步发送完成后的Callback)
  2. 将消息添加到pendingMessages
  3. 通过Connection的EventLoop执行发送操作

ProducerImpl将在ackReceived方法中处理服务端对写入消息的响应,通过消息的sequenceId来识别对应的OpSendMsg,并调用对应Callback来执行回调逻辑。实际在Callback完成了响应用户的操作及发送行为的一些统计。

ProducerImpl只会建立一个链接,且发送和ACK都是通过synchronized执行的,所以中间通过pendingMessages来完成消息发送和响应的对应,以及超时的处理。这块具体可以看一下代码实现。

总结

本文介绍了Pulsar Producer模块的设计,包含各个类的职责,并简单介绍了消息的发送过程。Puslar Producer在设计上和RocketMQ的思想差异还是比较大的,比如Puslar Producer会将Producer对应到分区上,每个分区有自己的Producer,这样可以比较容易完成一些幂等之类的操作。

如果本文对您有帮助,点一下右下角的“推荐”
目录
相关文章
|
消息中间件 存储 Kubernetes
kafka/pulsar on k8s
kafka/pulsar on k8s
kafka/pulsar on k8s
|
存储 消息中间件 监控
Pulsar 介绍与部署
Pulsar 介绍与部署
4153 0
Pulsar 介绍与部署
|
消息中间件 存储 缓存
一文了解Kafka的消息收集器RecordAccumulate
一文了解Kafka的消息收集器RecordAccumulate
140 1
|
消息中间件 关系型数据库 MySQL
Logstash接收Kafka数据写入至ES
Logstash接收Kafka数据写入至ES
300 0
|
网络协议 算法 Java
聊聊 Pulsar: Pulsar 分布式集群搭建
聊聊 Pulsar: Pulsar 分布式集群搭建
731 0
|
存储 消息中间件 Cloud Native
分布式消息队列Pulsar
分布式消息队列Pulsar
238 0
|
消息中间件 存储 运维
系列二:次时代Kafka与Pulsar该如何选择?
感谢大家支持,目前新书已上架各大线上平台!! 多谢开发者社区对此的支持。感谢机械工业出版社编辑老师长期的指导。感谢Tencent同事们的指点与陪伴。
1106 0
|
消息中间件 存储 Kafka
通过 KoP 将 Kafka 应用迁移到 Pulsar
KoP(Pulsar on Kafka)通过在 Pulsar Broker 上引入 Kafka 协议处理程序,为 Apache Pulsar 带来原生 Apache Kafka 协议支持。 通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到 Pulsar,而无需修改代码。 这使 Kafka 应用程序能够利用 Pulsar 的强大功能,
411 0
|
消息中间件 存储 缓存
CTO问我Pulsar到底能不能取代Kafka
CTO问我Pulsar到底能不能取代Kafka
219 0
CTO问我Pulsar到底能不能取代Kafka
|
消息中间件 存储 数据采集
从Kafka到Pulsar:数据流演进之路
从Kafka到Pulsar:数据流演进之路
213 0
从Kafka到Pulsar:数据流演进之路