大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)

接上篇:https://developer.aliyun.com/article/1622681?spm=a2c6h.13148508.setting.17.27ab4f0ek8nPMY

运行测试

结果数据

查看 word-count/word-count-result.csv 打开即可看到以下内容:

Stateful 1
any 1
common 1
computations 2
on 1
setup 1
state 1
streams. 1
unbounded 1
& 3
Data 2
DataStream 1
High-availability 1
for 1
perform 1
run 1
to 1
Event-time 1
Flexible 1
Sophisticated 1
framework 1
is 1
scale. 1
Exactly-once 1
ProcessFunction 1
Stream 1
a 1
been 1
handling 1
in 1
late 1
processing 2
Batch 1
DataSet 1
at 2
bounded 1
consistency 1
deployment 1
distributed 1
engine 1
has 1
API 2
Apache 1
Flink 2
SQL 1
Streams 1
all 1
designed 1
over 2
Computations 1
Savepoints 1
and 3
data 2
environments, 1
in-memory 1
speed 1
stateful 1
(Time 1
Correctness 1
State) 1
cluster 1
guarantees 1

单词统计(流数据)

需求说明

Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5秒)的数据进行聚合统计,每隔1秒汇总计算一次,并且把时间窗口内计算结果打印出来。

编写代码

Server部分

编写一个Socket服务,提供一定的数据流

package icu.wzk;


import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class WordCountServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        String ip = "localhost";
        int port = 9999;
        Random random = new Random();
        ServerSocket serverSocket = new ServerSocket();
        InetSocketAddress address = new InetSocketAddress(ip, port);
        serverSocket.bind(address);
        Socket socket = serverSocket.accept();
        OutputStream outputStream = socket.getOutputStream();
        PrintWriter writer = new PrintWriter(outputStream, true);
        for (int i = 0; i < 1000; i ++) {
            int number = random.nextInt(100);
            System.out.println(number);
            writer.println(number);
            Thread.sleep((random.nextInt(900) + 100));
        }
        socket.close();
        serverSocket.close();
    }

}

Flink部分

连接到上述的Server部分

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WordCount2 {

    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 9999;

        // 获取 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取 Socket 输入数据
        DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum(1);

        // 输出并运行
        word.print();
        env.execute("Word Count");
    }

}

观察结果

Server部分

35
18
84
72
24
51
15
13
65
98
55
68
22
84
17

Flink部分

3> (35,1)
4> (18,1)
3> (35,1)
5> (84,1)
4> (18,1)
6> (72,1)
3> (35,1)
5> (84,1)
5> (24,1)
3> (35,1)
6> (72,1)
4> (18,1)
7> (51,1)
5> (24,1)
5> (84,1)
4> (15,1)
6> (72,1)
7> (51,1)
3> (35,1)
4> (15,1)
4> (18,1)

运行结果过程截图如下所示:

过程总结

  • 获得一个执行环境
  • 加载、创建 初始化环境
  • 指定数据操作的算子
  • 指定结果数据存放位置
  • 调用Execute触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正的触发执行程序。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
205 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
577 7
|
2月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
71 2
|
16天前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
2月前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
124 1
|
3天前
|
数据采集 存储 分布式计算
解密大数据:从零开始了解数据海洋
解密大数据:从零开始了解数据海洋
38 17
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
224 61
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
189 56
|
2月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
140 4
|
2月前
|
存储 大数据 数据管理
大数据分区简化数据维护
大数据分区简化数据维护
35 4