Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计

简介: Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计

Flink是什么

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。


Flink的特点

支持事件时间(event-time)和处理时间(processing-time)语义

精确一次(exactly-once)的状态一致性保证

低延迟,每秒处理数百万个事件,毫秒级延迟

与众多常用存储系统的连接

高可用,动态扩展,实现7*24小时全天候运行

Flink的全球热度


image.pngFlink可以实现的目标

低延迟 来一次处理一次


高吞吐


结果的准确性和良好的容错性


基于流的世界观

在Flink的世界观中,一切皆有流组成,就如python中的一切皆对象的概念。对应离线的数据,则规划为有界流;对于实时的数据怎规划为没有界限的流。也就是Flink中的有界流于无界流

有开始也有结束的确定在一定时间范围内的流称为有界流。一旦确定就不会再改变,一般 批处理 用来处理有界数据。

无界流就是持续产生的数据流,数据是无限的,有开始,无结束,一般 流处理 用来处理无界数据


image.pngimage.pngimage.png

package com.yo.wc;
/**
 * created by YO
 */
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
// 批处理word count
public class WordCount {
    public static void main(String[] args) throws Exception{
        // 创建执行环境,类似与spark的创建上下文
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取数据    这里可以随意指定路径,txt文件写入空格隔开的随意单词即可
        String inputPath = "D:\\hello.txt";
        //read读取数据,可以指定读取的文件类型,整套批处理的api在flink里面就叫做dataset
        //dataset是flink针对离线数据的处理模型
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计
        DataSet<Tuple2<String, Integer>> result = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0)    // 按照第一个位置的word分组
                .sum(1);    // 将第二个位置上的数据求和
        result.print();
    }
    // 自定义类,实现FlatMapFunction接口  输出是String  输出是元组Tuple2<String, Integer>>是flink提供的元组类型
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        //value是输入,out就是输出的数据
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // 按空格分词
            String[] words = value.split(" ");
            // 遍历所有word,包成二元组输出
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));  
            }
        }
    }
}
输出:          文本内的单词不同输出也不同
(scala,1)
(flink,1)
(world,1)
(hello,4)

image.pngimage.pngimage.png

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.net.URL;
public class StreamWordCount {
    public static void main(String[] args) throws Exception{
        // 创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 用parameter tool工具从程序启动参数中提取配置项 ,这里就是从main方法中获取参数了args,可以在集群运行,这里再IDEA传参模拟
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        // 从socket文本流读取数据
        DataStream<String> inputDataStream = env.socketTextStream(host, port);
        // 基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);
        resultStream.print();
        // 执行任务
        env.execute();
    }
}

image.pngFlink的第一课入门到这里就完成了,同学们有遇到问题可直接私信,博主会尽力解答!



相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
841 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
308 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
4月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
534 0
|
2月前
|
存储 消息中间件 人工智能
云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
本文根据 2025 云栖大会演讲整理而成,演讲信息如下 演讲人:黄鹏程 阿里云智能集团计算平台事业部实时计算Flink版产品负责人
259 1
云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
|
9月前
|
SQL 算法 调度
Flink批处理自适应执行计划优化
本文整理自阿里集团高级开发工程师孙夏在Flink Forward Asia 2024的分享,聚焦Flink自适应逻辑执行计划与Join算子优化。内容涵盖自适应批处理调度器、动态逻辑执行计划、自适应Broadcast Hash Join及Join倾斜优化等技术细节,并展望未来改进方向,如支持更多场景和智能优化策略。文章还介绍了Flink UI调整及性能优化措施,为批处理任务提供更高效、灵活的解决方案。
401 0
Flink批处理自适应执行计划优化
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
205 2
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
268 0

热门文章

最新文章