Flink-Kafka-Connector Flink结合Kafka实战

简介: 简介Flink-kafka-connector用来做什么?Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复Kafka可以作为Flink的source和sink任务失败,通过设置kafka的offset来恢复应用kafka简单介绍关于kafka,我们会有专题文章介绍,这里简单介绍几个必须知道的概念。

简介

Flink-kafka-connector用来做什么?

Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复
Kafka可以作为Flink的source和sink
任务失败,通过设置kafka的offset来恢复应用

kafka简单介绍

关于kafka,我们会有专题文章介绍,这里简单介绍几个必须知道的概念。

1.生产者(Producer)

顾名思义,生产者就是生产消息的组件,它的主要工作就是源源不断地生产出消息,然后发送给消息队列。生产者可以向消息队列发送各种类型的消息,如狭义的字符串消息,也可以发送二进制消息。生产者是消息队列的数据源,只有通过生产者持续不断地向消息队列发送消息,消息队列才能不断处理消息。

2.消费者(Consumer)

所谓消费者,指的是不断消费(获取)消息的组件,它获取消息的来源就是消息队列(即Kafka本身)。换句话说,生产者不断向消息队列发送消息,而消费者则不断从消息队列中获取消息。

3.主题(Topic)

主题是Kafka中一个极为重要的概念。首先,主题是一个逻辑上的概念,它用于从逻辑上来归类与存储消息本身。多个生产者可以向一个Topic发送消息,同时也可以有多个消费者消费一个Topic中的消息。Topic还有分区和副本的概念。Topic与消息这两个概念之间密切相关,Kafka中的每一条消息都归属于某一个Topic,而一个Topic下面可以有任意数量的消息。

kafka简单操作

启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

启动server: nohup bin/kafka-server-start.sh config/server.properties &

创建一个topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181

发送数据:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动一个消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

删除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn

Flink消费Kafka注意事项

  • setStartFromGroupOffsets()【默认消费策略】

    默认读取上次保存的offset信息
    如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据
    
  • setStartFromEarliest()
    从最早的数据开始进行消费,忽略存储的offset信息
  • setStartFromLatest()
    从最新的数据进行消费,忽略存储的offset信息
  • setStartFromSpecificOffsets(Map)
    从指定位置进行消费
  • 当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。
  • 为了能够使用支持容错的kafka Consumer,需要开启checkpoint
    env.enableCheckpointing(5000); // 每5s checkpoint一次

搭建Kafka单机环境

我本地安装了一个kafka_2.11-2.1.0版本的kafka

image

启动Zookeeper和kafka server:

启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &

启动server: nohup bin/kafka-server-start.sh config/server.properties &

创建一个topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

image

实战案例

Kafka作为Flink Sink

首先pom依赖:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>

向kafka写入数据:

public class KafkaProducer {


    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("test",new SimpleStringSchema(),properties);
/*
        //event-timestamp事件的发生时间
        producer.setWriteTimestampToKafka(true);
*/
        text.addSink(producer);
        env.execute();
    }
}//

大家这里特别注意,我们实现了一个并行度为1的MyNoParalleSource来生产数据,代码如下:

//使用并行度为1的source
public class MyNoParalleSource implements SourceFunction<String> {//1

    //private long count = 1L;
    private boolean isRunning = true;
    
    /**
     * 主要的方法
     * 启动一个source
     * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while(isRunning){
            //图书的排行榜
            List<String> books = new ArrayList<>();
            books.add("Pyhton从入门到放弃");//10
            books.add("Java从入门到放弃");//8
            books.add("Php从入门到放弃");//5
            books.add("C++从入门到放弃");//3
            books.add("Scala从入门到放弃");//0-4
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));

            //每2秒产生一条数据
            Thread.sleep(2000);
        }
    }
    //取消一个cancel的时候会调用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}

代码实现了一个发送器,来发送书名等...

然后右键运行我们的程序,控制台输出如下:

image

开始源源不断的生产数据了。

然后我们用命令去查看一下 kafka test这个topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

输出如下:

image

Kafka作为Flink Source

直接上代码:

public class KafkaConsumer {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        //从最早开始消费
        consumer.setStartFromEarliest();
        DataStream<String> stream = env
                .addSource(consumer);
        stream.print();
        //stream.map();
        env.execute();

    }
}//

控制台输出如下:

image
将我们之前发往kafka的消息全部打印出来了。

目录
相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
442 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
消息中间件 存储 传感器
241 0
|
4月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
272 7
|
6月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
235 12
|
11月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
488 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
362 1
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
781 0

热门文章

最新文章