Spark Streaming实时流处理项目实战笔记——Kafka Consumer Java API编程

简介: Spark Streaming实时流处理项目实战笔记——Kafka Consumer Java API编程

1、在控制台创建发送者


kafka-console-producer.sh --broker-list hadoop2:9092 --topic zz
>hello world

2、消费者API


import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", "hadoop2:9092");
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自动确认offset
        props.put("enable.auto.commit", "true");
        // 自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 定义consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 消费者订阅的topic, 可同时订阅多个
        consumer.subscribe(Arrays.asList("zz"));
        while (true) {
            // 读取数据,读取超时时间为100ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}



相关文章
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
59 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
117 0
|
11天前
|
存储 JSON API
小红书获取笔记详情API接口的开发、应用与收益。
小红书笔记详情API采用Python与Django框架开发,使用MySQL数据库存储数据。接口通过HTTP GET请求获取笔记详情,返回JSON格式数据,包含笔记内容、作者信息、图片链接等。该API应用于小红书APP内笔记展示和互动功能,并支持第三方平台的内容整合与数据分析,提升用户体验与活跃度,促进品牌合作推广,优化平台运营效率,为平台带来显著收益。
67 1
|
28天前
|
存储 搜索推荐 API
小红书笔记详情API接口的开发、应用与收益
小红书笔记详情API接口为开发者、企业和内容创作者提供了获取平台丰富资源的通道。通过该接口,用户可以提取笔记的详细信息(如标题、正文、标签等),并应用于市场调研、竞品分析、内容创作、电商推荐等多个领域。这不仅有助于提升品牌影响力和优化用户体验,还能挖掘商业机会,促进内容创新,增强用户互动与社群凝聚力。总之,小红书笔记详情API接口为企业和个人在社交媒体领域探索新增长点提供了重要工具。
116 0
|
2月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
3月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
49 7
|
3月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
49 2
|
3月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
80 3
|
3月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
88 4
|
3月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
87 2

热门文章

最新文章