Kafka生产者

简介: 生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。

生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。

一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

生产者发送消息的方式

生产者发送消息主要有 2 种方式:同步发送消息、异步发送消息

同步发送消息

同步发送消息:我们调用 KafkaProducer 的 send() 方法发送消息,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,然后调用 Future 的 get() 方法等待 Kafka 响应,通过 Kafka 的响应,我们就可以知道消息是否发送成功。

  • 如果服务器返回错误,Future 的 get() 方法会抛出异常。
  • 如果没有发生错误,我们会得到一个 RecordMetadata 对象,这个对象包含消息的目标主题、分区信息和消息的偏移量等信息。

我们调用 KafkaProducer 的 send() 方法发送 ProducerRecord 对象,消息先是被放进缓冲区,然后使用单独的线程将消息发送到服务器端。


异常处理

如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。在发送消息之前,生产者也是有可能发生异常的。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。

KafkaProducer 一般会发生两类错误。

  • 其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
  • 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
public void send(String topic, String key, String val) {

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, val);

    try {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
        SendResult<String, String> sendResult = future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

异步发送消息

异步发送消息:我们调用 KafkaProducer 的 send() 方法,并指定一个回调方法,在服务器返回响应时调用该方法。

大多数时候,我们并不需要等待响应。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。

为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion() 方法。如果 Kafka 返回一个错误,onCompletion() 方法会抛出一个非空异常。通过 onCompletion() 方法抛出的异常,我们可以对发送失败的消息进行处理。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");

producer.send(record, new DemoProducerCallback());

分区器

介绍分区

ProducerRecord 对象包含目标主题、消息键和值(消息)。

  • 如果消息键为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器使用粘性分区策略(UniformSticky),会随机选择一个分区,并尽可能一直使用该分区,等到该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(保证和上一次的分区不同)。
  • 如果消息键不为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器会对消息键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(散列值 与 主题的分区数进行取余得到 partition 值)。

这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,那么键与分区之间的映射关系就改变了。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

自定义分区策略

生产者可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

通过分区器实现自定义分区策略的步骤:

  1. 定义一个类,该类实现 Partitioner 接口(分区器)
  2. 配置生产者(KafkaProducer),让生产者发送消息时使用自定义的分区器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
public class MyPartitioner implements Partitioner {

    /**
     * 返回信息对应的分区
     *
     * @param topic      主题
     * @param key        消息的 key
     * @param keyBytes   消息的 key 序列化后的字节数组
     * @param value      消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster    集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        if ((keyBytes == null) || (!(key instanceof String))) {
            throw new InvalidRecordException("We expect all messages to have String type as key");
        }

        // 实现自己的分区策略
        // 返回数据写入的分区号
        return 0;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

参考资料

《Kafka 权威指南》第 3 章:Kafka 生产者——向 Kafka 写入数据

相关文章
|
7月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
7月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
80 0
|
7月前
|
消息中间件 缓存 Kafka
探究Kafka原理-5.Kafka设计原理和生产者原理解析(下)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
83 0
|
7月前
|
消息中间件 存储 负载均衡
探究Kafka原理-5.Kafka设计原理和生产者原理解析(上)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
105 0
|
7月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
494 4
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
68 2
|
2月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
41 1
|
3月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
4月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
5月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
92 8