在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Flink底层是 以Java编写的,并为开发人员同时提供了完整的Java和Scala API。
- 开发环境:
在开始之前,请确保你的开发环境已经安装了以下软件:
- J
DK 1.8
或更高版本 Maven 3.x
Apache Flink 1.x
在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink
程序了。首先我 们要做的,就是在IDEA
中搭建一个Flink
项目的骨架。我们会使用Java
项目中常见的Maven
来进行依赖管理。
Maven
环境准备:
配置基础的Maven
环境:
在项目的pom文件中,增加标签设置属性,然后增加标签引 入需要的依赖。我们需要添加的依赖最重要的就是 Flink 的相关组件,包括 flink-java、 flink-streaming-java,以及 flink-clients(客户端,也可以省略)。
<flink.version>1.13.0</flink.version> <scala.binary.version>2.12</scala.binary.version>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点 奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink 的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。
搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的 示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的 WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。
单词计数:使用批处理的方式统计词频:
对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了
public class WorldCount { public static void main(String[] args) throws Exception { // 1.创建执行环境: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.从文件中读取数据: DataSource<String> source = env.readTextFile("E:\\back_end\\demo\\src\\main\\resources\\data\\data_txt.txt"); //3.将每个单词提取出来,进行分词,转换成为一个二元组: FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { // 将一行文本进行拆分,转换为二元组: String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } //指定数据返回值类型: }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //4.按照word进行分组: UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndTuple.groupBy(0);//给定元组索引位置 //5.分组内进行聚合统计: AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1); //6.打印输出: sum.print(); } }
单词计数:使用流处理的方式统计词频:
我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。
对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。 下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
public class StreamWordCount { public static void main(String[] args) throws Exception { //1.创建一个流式的执行环境: StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); //2.读取文件: DataStreamSource<String> source = executionEnvironment.readTextFile("src/main/resources/data/data_txt.txt"); //3.转换计算: SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //4.分组操作: KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0); //5.求和: SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1); //6.输出: sum.print(); //7.启动执行: executionEnvironment.execute(); } }
时时监听端口输入:
public class StreamWordCountTrue { public static void main(String[] args) throws Exception { //1.创建流式执行环境: StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); //从参数中提取主机名: ParameterTool fromArgs = ParameterTool.fromArgs(args); String hostname = fromArgs.get("host"); Integer port = fromArgs.getInt("port"); //2.监听端口,时时获取数据: DataStreamSource<String> source = executionEnvironment.socketTextStream(hostname, port); //3.对获取到的文本流进行处理: SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //4.分组操作: KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0); //5.求和: SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1); //6.输出: sum.print(); //7.启动执行: executionEnvironment.execute(); } }
IDEA配置输入参数: