Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

pom.xml

修改pom.xml,需要加入 kafka相关的包,和适配器。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo-01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>

编写代码

设置 Kafka 配置

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "0.0.0.0:9092");

创建Kafka消费者


FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

添加数据源


DataStreamSource<String> data = env.addSource(consumer);


FlatMap


SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
        for (String word : s.split(" ")) {
            collector.collect(Tuple2.of(word, 1L));
        }
    }
});


计算求和


SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOne
    .keyBy(new KeySelector<Tuple2<String, Long>, Object>() {
        @Override
        public Object getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    .sum(1);


全部代码

package icu.wzk.demo04;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StartApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","0.0.0.0:9092");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> data = env.addSource(consumer);
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Long>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute();
    }

}

KafkaProducer

package icu.wzk.demo04;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestKafkaProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.0.0.0:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 500; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        producer.close();
    }

}

目录
相关文章
|
18天前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。HashSet基于哈希表实现,提供高效的元素操作;TreeSet则通过红黑树实现元素的自然排序,适合需要有序访问的场景。本文通过示例代码详细介绍了两者的特性和应用场景。
34 6
|
16天前
|
存储 Java API
深入剖析Java Map:不只是存储数据,更是设计艺术的体现!
【10月更文挑战第17天】在Java编程中,Map是一种重要的数据结构,用于存储键值对,并展现了设计艺术的精髓。本文深入剖析了Map的设计原理和使用技巧,包括基本概念、设计艺术(如哈希表与红黑树的空间时间权衡)、以及使用技巧(如选择合适的实现类、避免空指针异常等),帮助读者更好地理解和应用Map。
53 3
|
2天前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
13 2
|
4天前
|
消息中间件 资源调度 Java
用Java实现samza转换成flink
【10月更文挑战第20天】
|
8天前
|
SQL Java OLAP
java实现“数据平滑升级”
java实现“数据平滑升级”
26 2
|
13天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
18天前
|
Java
Java Set以其“不重复”的特性,为我们提供了一个高效、简洁的处理唯一性约束数据的方式。
【10月更文挑战第16天】在Java编程中,Set接口确保集合中没有重复元素,每个元素都是独一无二的。HashSet基于哈希表实现,提供高效的添加、删除和查找操作;TreeSet则基于红黑树实现,不仅去重还能自动排序。通过这两个实现类,我们可以轻松处理需要唯一性约束的数据,提升代码质量和效率。
28 2
|
20天前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其独特的“不重复性”要求,彻底改变了处理唯一性约束数据的方式。
【10月更文挑战第14天】从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其独特的“不重复性”要求,彻底改变了处理唯一性约束数据的方式。本文深入探讨Set的核心理念,并通过示例代码展示了HashSet和TreeSet的特点和应用场景。
16 2
|
9天前
|
SQL Java OLAP
java实现“数据平滑升级”
java实现“数据平滑升级”
7 0
|
18天前
|
缓存 Java 数据处理
java查询大量数据优化
通过结合的高性能云服务,如其提供的弹性计算资源与全球加速网络,可以进一步增强这些优化策略的效果,确保数据处理环节更加迅速、可靠。蓝易云不仅提供稳定的基础架构,还拥有强大的安全防护和灵活的服务选项,是优化大型数据处理项目不可或缺的合作伙伴。
26 0