生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。
一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
生产者发送消息的方式
生产者发送消息主要有 2 种方式:同步发送消息、异步发送消息
同步发送消息
同步发送消息:我们调用 KafkaProducer 的 send() 方法发送消息,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,然后调用 Future 的 get() 方法等待 Kafka 响应,通过 Kafka 的响应,我们就可以知道消息是否发送成功。
- 如果服务器返回错误,Future 的 get() 方法会抛出异常。
- 如果没有发生错误,我们会得到一个 RecordMetadata 对象,这个对象包含消息的目标主题、分区信息和消息的偏移量等信息。
我们调用 KafkaProducer 的 send() 方法发送 ProducerRecord 对象,消息先是被放进缓冲区,然后使用单独的线程将消息发送到服务器端。
异常处理
如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。在发送消息之前,生产者也是有可能发生异常的。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。
KafkaProducer 一般会发生两类错误。
- 其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
- 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
public void send(String topic, String key, String val) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, val);
try {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
SendResult<String, String> sendResult = future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
异步发送消息
异步发送消息:我们调用 KafkaProducer 的 send() 方法,并指定一个回调方法,在服务器返回响应时调用该方法。
大多数时候,我们并不需要等待响应。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。
为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion() 方法。如果 Kafka 返回一个错误,onCompletion() 方法会抛出一个非空异常。通过 onCompletion() 方法抛出的异常,我们可以对发送失败的消息进行处理。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
分区器
介绍分区
ProducerRecord 对象包含目标主题、消息键和值(消息)。
- 如果消息键为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器使用粘性分区策略(UniformSticky),会随机选择一个分区,并尽可能一直使用该分区,等到该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(保证和上一次的分区不同)。
- 如果消息键不为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器会对消息键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(散列值 与 主题的分区数进行取余得到 partition 值)。
这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,那么键与分区之间的映射关系就改变了。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
自定义分区策略
生产者可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
通过分区器实现自定义分区策略的步骤:
- 定义一个类,该类实现 Partitioner 接口(分区器)
- 配置生产者(KafkaProducer),让生产者发送消息时使用自定义的分区器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
public class MyPartitioner implements Partitioner {
/**
* 返回信息对应的分区
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
if ((keyBytes == null) || (!(key instanceof String))) {
throw new InvalidRecordException("We expect all messages to have String type as key");
}
// 实现自己的分区策略
// 返回数据写入的分区号
return 0;
}
// 关闭资源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {
}
}
参考资料
《Kafka 权威指南》第 3 章:Kafka 生产者——向 Kafka 写入数据