Kafka生产者同步和异步的JavaAPI代码演示

简介: Kafka生产者同步和异步的JavaAPI代码演示

生产者API文档

http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html.


版本说明

Kafka 0.10.0.0 及以后的版本,对生产者代码的底层实现进行了重构。

katka.producer.Producer类被org.apache.kafka.clients.producer.KafkaProducer类替换。

注意:在开发中,直接使用kafka的新版本API: org.apache.kafka.clients.producer.KafkaProducer作为生产者即可! 千万不要使用以前的katka.producer.Producer

同步和异步

Kafka 系统支持两种不同的发送方式–同步模式(Sync)和异步模式(ASync)

  • 同步模式

  • 异步模式

导入Maven的pom依赖

 <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * 演示使用kafka生产者向topic中发送消息
 * kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning  group.id=test_topic_group --topic test_topic
 *
 * @author Zsorrain
 */
public class MyKafkaProducer {
    public static void main(String[] args) throws Exception {
        //TODO 1.准备连接参数
        Properties props = new Properties();
        //指定kafka的broker地址

        /*
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");   ==   props.put("bootstrap.servers","node1:9092");这两个语句用哪个都行

        因为ProducerConfig.BOOTSTRAP_SERVERS_CONFIG    会自动调用bootstrap.servers
        底层执行代码:public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
        */
        props.put("bootstrap.servers", "node1:9092");  //"node1:9092"需要修改为自己连接的服务器地址以及对应的Kafka端口号
        //消息确认机制
        //acks=0,意思就是我的KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,我就不管他了,直接就认为这个消息发送成功了
        //acks=1,意思就是说只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。
        //acks=all/-1,意思就是说Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。
        //all即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失
        props.put("acks", "all");
        //retries和retries.backoff.ms决定了重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒
        props.put("retries", 0);
        props.put("retries.backoff.ms", 20);
        //buffer.memory
        //Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,
        //也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。
        //buffer.memory的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB,满了就会阻塞用户线程,不让继续往Kafka写消息了。
        props.put("buffer.memory", 33554432);
        // batch.size是producer批量发送的基本单位,默认是16384Bytes,即16kB;KafkaProducer的Sender线程会把Batch打包成Request发送到Kafka服务器上去
        props.put("batch.size", 16384);
        // lingger.ms是指一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。
        //举个例子,首先假设你的Batch是16KB,那么你得估算一下,正常情况下,一般多久会凑够16KB(1个Batch),比如正常来说可能20ms就会凑够一个Batch。
        //那么你的linger.ms可以设置为25ms,也就是说,正常来说,大部分的Batch在20ms内都会凑满,但是你的linger.ms可以保证,哪怕遇到低峰时期,20ms凑不满一个Batch,还是会在25ms之后强制Batch发送出去。
        //那么producer是按照batch.size大小批量发送消息呢,还是按照linger.ms的时间间隔批量发送消息呢?--满足batch.size和ling.ms之一,producer便开始发送消息
        props.put("linger.ms", 25);
        //这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值
        props.put("max.request.size", 163840);
        //k-v序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //TODO 2.根据参数创建producer生产者连接对象
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        //TODO 3.异步发送
        for (int i = 0; i < 10; i++) {//循环发送10条消息到Kafka
            //将需要发送到Kafka的消息封装为record对象
            ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key_" + i, "value_" + i);
            //异步发送消息,传入需要发送的record,和该record真正发送成功后的需要执行回调函数
            producer.send(record, new Callback() {
                //Callback是一个接口,使用匿名内部类重写Callback类中的onCompletion方法
                //onCompletion方法会在record真正发送成功后执行
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null) {
                        //异步发送,record真正发送成功后才会执行该方法,所以可以在该方法里面获取到metadata
                        System.out.println("异步发送后获得分区为 :" + metadata.partition() + " ,同步发送后获得offset为 :" + metadata.offset());
                    }
                }
            });
        }
        System.out.println("这条消息在异步发送代码最后,但是最先被打印,说明异步消息在会返回结果后在执行发送信息");

        //TODO 3.同步发送
        for (int i = 10; i < 20; i++) {//循环发送10条消息到Kafka
            //将需要发送到kafka的消息封装到record对象
            ProducerRecord<String, String> record = new ProducerRecord<>("order", "key_" + i, "value_" + i);
            //同步发送消息,并返回消息的元数据,如消息发送到哪个partition(分区)了,offset(偏移量)是多少
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("同步发送后获得分区为 :" + metadata.partition() + " ,同步发送后获得offset为 :" + metadata.offset());
        }
        System.out.println("这条消息在同步发送代码最后,最后被打印,说明同步同步消息执行发送消息完返回成功结果");

        //TODO 4.关闭资源
        producer.close();
    }
}

目录
相关文章
|
3月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
161 2
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
109 2
|
3月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
47 2
|
3月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
80 3
|
3月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
60 1
|
3月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
61 1
|
3月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
87 2
|
3月前
|
消息中间件 大数据 Java
大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients
大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients
45 2
|
4月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
76 0

热门文章

最新文章