如何构建、部署运行Flink程序。

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 如何构建、部署运行Flink程序

一、构建Flink程序


构建一个Flink程序有两种方式


方式一:构建 maven 工程,导入流式应用依赖包


<!-- 基础依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
 <!-- DataStream -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>

方式二:基础环境构建直接使用快捷命令【推荐在Mac或者Linux上使用】


curl https://flink.apache.org/q/quickstart.sh | bash -s 1.11.3
* -s 构建 flink 版本

Flink程序一般的开发步骤

构建完成Flink程序之后就可以开发程序了,开发一个Flink程序的一般步骤:


Obtain an execution environment,(构建流执行环境)

Load/create the initial data,(加载初始化的数据)

Specify transformations on this data,(指定此数据的转换)

Specify where to put the results of your computations,(指定计算结果的放置位置)

Trigger the program execution(触发程序执行)


二、快速上手Flink程序


批处理案例:


//批处理 (DataSet) 支持离线数据
public class WordCount {
    public static void main(String[] args)  throws Exception{
        //创建执行环境
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //从文件中读取数据
        String inputPath="text.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        //对数据集进行处理
       DataSet<Tuple2<String,Integer>>  resultSet = inputDataSet.flatMap(new MyflatMapper())
                //按照第一个位置对word分组
                .groupBy(0)
                //将第二个位置上对数据求和
                .sum(1);
        resultSet.print();
    }
    //自定义类实现FlatMapFunction
    public static class MyflatMapper implements FlatMapFunction<String,Tuple2<String,Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            //按空格分词
            String[] words=value.split(" ");
            //遍历所有ord,包成二元组
            for(String word:words){
                out.collect(new Tuple2<>(word,1));
            }
        }
    }
}

本地运行结果展示:

11.png


流处理案例:

//流处理 (DataStream)支持实时数据
public class StreamWordCount {
    /**
     * @author ZhaoPan
     * @createTime 2022/3/2
     * @description
     */
    public static void main(String[] args) throws Exception {
        //创建流处理环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度 相当于8个线程
        //env.setParallelism(2);
        //从文件中读取数据
    String inputPath="text.txt";
        DataStream<String> inputDataSream = env.readTextFile(inputPath);
        //基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = inputDataSream.flatMap(new WordCount.MyflatMapper())
                .keyBy(0)
                .sum(1);
        resultStream.print();
        //执行任务
        env.execute();
    }
}

本地运行结果:


22.png


三、运行部署Flink程序


此处介绍两种部署Flink程序的方式:


方式一:Standalone 模式 单机【本地测试推荐】【重点】

1、官网下载 flink 包:https://flink.apache.org/downloads.html#update-policy-for-old-releases


33.png


2、解压 flink-1.10.2-bin-scala_2.12 进入到 conf 目录,修改配置

44.png


# jobmanager节点可用的内存大小。
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
# taskmanager节点可用的内存代大小。
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# 每台机器可用的cpu数量
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
# 默认情况下任务的并行度
parallelism.default: 1
slot 和 parallelism 总结:
1、slot 是静态的概念,是指 taskmanager 具有的并发执行能力
2、parallelism 是动态的概念,是指程序运行实际使用的并发能力
3、设置合适的 parallelism 来提高运算效率(kafka 应用一般和 partition 一一对应或成倍数关系配置)

flink从 1.8.0 版本开始,移除了对 hadoop 版本的依赖,在客户端包中需要提前将 hadoop 依赖添加到 flink 客户端 lib/ 目录下

注意:此处下载完flink对应的tar包后,还需要下载hadoop的jar包,最后将jar包放入lib目录


55.png

3、启动


进入bin目录 键入 ./start-cluster.sh

66.png

4、访问


注:我这里是将服务部署在自己的服务器上,访问的时候通过IP+端口访问,本地的话就是localhost:8081


http://IP地址:8081


至此就可以访问到如下前端页面,可以对 flink 集群和任务进行监控管理。

77.png

5、提交任务


后台命令方式提交:bin/flink run -h

打成jar包,前端提交

88.png

方式二:Yarn


以 Yarn 模式部署 Flink 任务时,要求 Flink 是有 Hadoop 支持的版本,Hadoop环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。


Flink 提供了两种在 yarn 上运行的模式,分别为 Session-Cluster 和 Per-Job-Cluster模式。


模式一:yarn-session


原理:在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个 flink 集群会常驻在 yarn集群中,除非手工停止。当资源不足时,后提交的任务会进入等待,直到有任务结束释放资源

适用场景:适合规模小执行时间短的作业

99.png

部署运行步骤:


1、启动 yarn-session


bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
参数解读:
-n(--container):TaskManager的数量。
-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-tm:每个taskmanager的内存(单位MB)。
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行。

2、启动任务

./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

模式二:yarn-cluster【日常使用频次最高方式】


原理:提交任务的时候创建新的 Application,用来运行程序,如果没有任务就不用创建

适用场景:大型批任务,复杂性高、数据量大流式任务

00.png

启动任务


./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
68 0
|
3月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
3天前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
253 22
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
3月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
76 0
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
336 2
|
20天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
3月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
96 1
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
112 3
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
69 0
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
148 0

热门文章

最新文章