【Flink】Flink Java 统计词频 开发

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【1月更文挑战第26天】【Flink】Flink Java 统计词频 开发

在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Flink底层是 以Java编写的,并为开发人员同时提供了完整的Java和Scala API。  

  1. 开发环境:

在开始之前,请确保你的开发环境已经安装了以下软件:

  • JDK 1.8或更高版本
  • Maven 3.x
  • Apache Flink 1.x

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我 们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。

  1. Maven环境准备:

配置基础的Maven环境:

在项目的pom文件中,增加标签设置属性,然后增加标签引 入需要的依赖。我们需要添加的依赖最重要的就是 Flink 的相关组件,包括 flink-java、 flink-streaming-java,以及 flink-clients(客户端,也可以省略)。

<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点 奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink 的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。

搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的 示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的 WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。  

单词计数:使用批处理的方式统计词频:

对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了

public class WorldCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境:
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件中读取数据:
        DataSource<String> source = env.readTextFile("E:\\back_end\\demo\\src\\main\\resources\\data\\data_txt.txt");
        //3.将每个单词提取出来,进行分词,转换成为一个二元组:
        FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行拆分,转换为二元组:
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            //指定数据返回值类型:
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.按照word进行分组:
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndTuple.groupBy(0);//给定元组索引位置
        //5.分组内进行聚合统计:
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        //6.打印输出:
        sum.print();
    }
}

单词计数:使用流处理的方式统计词频:

我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。  下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建一个流式的执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取文件:
        DataStreamSource<String> source = executionEnvironment.readTextFile("src/main/resources/data/data_txt.txt");
        //3.转换计算:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

时时监听端口输入:

public class StreamWordCountTrue {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从参数中提取主机名:
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String hostname = fromArgs.get("host");
        Integer port = fromArgs.getInt("port");
        //2.监听端口,时时获取数据:
        DataStreamSource<String> source = executionEnvironment.socketTextStream(hostname, port);
        //3.对获取到的文本流进行处理:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

IDEA配置输入参数:

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
23天前
|
Java API Maven
如何使用Java开发抖音API接口?
在数字化时代,社交媒体平台如抖音成为生活的重要部分。本文详细介绍了如何用Java开发抖音API接口,从创建开发者账号、申请API权限、准备开发环境,到编写代码、测试运行及注意事项,全面覆盖了整个开发流程。
69 10
|
21天前
|
监控 Java API
如何使用Java语言快速开发一套智慧工地系统
使用Java开发智慧工地系统,采用Spring Cloud微服务架构和前后端分离设计,结合MySQL、MongoDB数据库及RESTful API,集成人脸识别、视频监控、设备与环境监测等功能模块,运用Spark/Flink处理大数据,ECharts/AntV G2实现数据可视化,确保系统安全与性能,采用敏捷开发模式,提供详尽文档与用户培训,支持云部署与容器化管理,快速构建高效、灵活的智慧工地解决方案。
|
11天前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
27 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
3天前
|
算法 Java API
如何使用Java开发获得淘宝商品描述API接口?
本文详细介绍如何使用Java开发调用淘宝商品描述API接口,涵盖从注册淘宝开放平台账号、阅读平台规则、创建应用并申请接口权限,到安装开发工具、配置开发环境、获取访问令牌,以及具体的Java代码实现和注意事项。通过遵循这些步骤,开发者可以高效地获取商品详情、描述及图片等信息,为项目和业务增添价值。
32 10
|
6天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
22天前
|
开发框架 Java 关系型数据库
Java哪个框架适合开发API接口?
在快速发展的软件开发领域,API接口连接了不同的系统和服务。Java作为成熟的编程语言,其生态系统中出现了许多API开发框架。Magic-API因其独特优势和强大功能,成为Java开发者优选的API开发框架。本文将从核心优势、实际应用价值及未来展望等方面,深入探讨Magic-API为何值得选择。
29 2
|
23天前
|
监控 前端开发 Java
【技术开发】接口管理平台要用什么技术栈?推荐:Java+Vue3+Docker+MySQL
该文档介绍了基于Java后端和Vue3前端构建的管理系统的技术栈及功能模块,涵盖管理后台的访问、登录、首页概览、API接口管理、接口权限设置、接口监控、计费管理、账号管理、应用管理、数据库配置、站点配置及管理员个人设置等内容,并提供了访问地址及操作指南。
|
22天前
|
IDE Java 编译器
开发 Java 程序一定要安装 JDK 吗
开发Java程序通常需要安装JDK(Java Development Kit),因为它包含了编译、运行和调试Java程序所需的各种工具和环境。不过,某些集成开发环境(IDE)可能内置了JDK,或可使用在线Java编辑器,无需单独安装。
48 1
|
29天前
|
SQL 安全 Java
安全问题已经成为软件开发中不可忽视的重要议题。对于使用Java语言开发的应用程序来说,安全性更是至关重要
在当今网络环境下,Java应用的安全性至关重要。本文深入探讨了Java安全编程的最佳实践,包括代码审查、输入验证、输出编码、访问控制和加密技术等,帮助开发者构建安全可靠的应用。通过掌握相关技术和工具,开发者可以有效防范安全威胁,确保应用的安全性。
48 4
|
1月前
|
安全 Java 测试技术
Java开发必读,谈谈对Spring IOC与AOP的理解
Spring的IOC和AOP机制通过依赖注入和横切关注点的分离,大大提高了代码的模块化和可维护性。IOC使得对象的创建和管理变得灵活可控,降低了对象之间的耦合度;AOP则通过动态代理机制实现了横切关注点的集中管理,减少了重复代码。理解和掌握这两个核心概念,是高效使用Spring框架的关键。希望本文对你深入理解Spring的IOC和AOP有所帮助。
35 0