一、为什么有了flink的安装攻略还要再专门写1.12.0版本的安装博客
上次我发布了flink 1.7.2的安装方法,但是有同学反馈1.7已经比较老版本了,2020年12月10最新发布的Flink1.12.0版本,我就再写了一篇新版本的安装部署!
Flink1.12.0可以称得上是一个里程碑版本,由近 300 位开发者参与贡献者,提交了超过 1000 多个修复或优化。这些修改极大地提高了 Flink 的可用性,并且简化(且统一)了 Flink 的整个 API 栈。
Flink1.12.0其中一些比较重要的修改包括:
在 DataStream API 上添加了高效的批执行模式的支持。这是批处理和流处理实现真正统一的运行时的一个重要里程碑。
实现了基于Kubernetes的高可用性(HA)方案,作为生产环境中,ZooKeeper方案之外的另外一种选择。
官网地址:Apache Flink: Stateful Computations over Data Streams
二、flink-1.12的Yarn安装
1.下载安装包
Index of /dist/flink
2.上传flink-1.12.0-bin-scala_2.12.tgz到node1的指定目录
3.解压
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
4.如果出现权限问题,需要修改权限
chown -R root:root /usr/apps/flink-1.12.0
5.改名或创建软链接
#1. 改名 mv flink-1.12.0 flink #2.创建软连接,我们不想改名,又不想输入flink-1.12.0这么长的名字,就可以创建软连接 ln -s /usr/apps/flink-1.12.0 /usr/apps/flink
6、启动flink
bin/start-cluster.sh
7、使用jps可以查看到下面两个进程
- TaskManagerRunner
- StandaloneSessionClusterEntrypoint
8、访问Flink的Web UI
http://node1:8081/#/overview
slot在Flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分配到slot来并行执行程序。
9、停止Flink
/usr/apps/flink/bin/stop-cluster.sh
10、集群规划
服务器: node1(Master + Slave): JobManager + TaskManager
服务器: node2(Slave): TaskManager
服务器: node3(Slave): TaskManager
11、修改flink-conf.yaml
vim /usr/apps/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node1 taskmanager.numberOfTaskSlots: 2 web.submit.enable: true #历史服务器 jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/ historyserver.web.address: node1 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/ #开启HA,使用文件系统作为快照存储 state.backend: filesystem #启用检查点,可以将快照保存到HDFS state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints #使用zookeeper搭建高可用 high-availability: zookeeper # 存储JobManager的元数据到HDFS high-availability.storageDir: hdfs://node1:8020/flink/ha/ # 配置ZK集群地址 high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
12、修改master
1. #vim /usr/apps/flink/conf/masters 2. node1:8081 3. node2:8081
13、修改slaves
#vim /usr/apps/flink/conf/slaves node1 node2 node3
14、添加HADOOP_CONF_DIR环境变量
1. #vim /etc/profile 2. export hadoop_conf_dir=/usr/apps/hadoop/etc/hadoop
15、分发到node2,node3节点
scp -r /usr/apps/flink/ node2:/usr/apps/flink scp -r /usr/apps/flink node3:/usr/apps/flink scp /etc/profile node2:/etc/profile scp /etc/profile node3:/etc/profile 或 for i in {2..3}; do scp -r flink node$i:$PWD; done
16、在各个节点加载一下资源
source /etc/profile
17、修改node2上的flink-conf.yaml
vim /usr/apps/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2
16、启动集群测试,在node1上执行如下命令
#1.启动hadoop /usr/apps/hadoop/sbin/start-dfs.sh #2.启动Zookeeper /usr/apps/zookeeper/bin/zkServer.sh start #查看zookeeper是否启动:/usr/apps/zookeeper/bin/zkServer.sh status #3.启动Flink /usr/apps/flink/bin/start-cluster.sh #查看flink是否启动 jps
17、报错
Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see //nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/overview/.
因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar
18、下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
放入lib目录:cd /usr/apps/flink/lib
分发
scp -r /usr/apps/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node2:/usr/apps/flink/lib scp -r /usr/apps/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node3:/usr/apps/flink/lib
19、关闭yarn的内存检查
vim /usr/apps/hadoop/etc/hadoop/yarn-site.xml
<!-- 关闭yarn内存检查 --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
分发
scp -r /usr/apps/hadoop/etc/hadoop/yarn-site.xml node2:/usr/apps/hadoop/etc/hadoop/yarn-site.xml scp -r /usr/apps/hadoop/etc/hadoop/yarn-site.xml node3:/usr/apps/hadoop/etc/hadoop/yarn-site.xml
重启yarn
/usr/apps/hadoop/sbin/stop-all.sh /usr/apps/hadoop/sbin/start-all.sh
20、yarn中的会话模式
- 在yarn上启动一个Flink集群/会话,node1上执行以下命令
/usr/apps/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
申请2个CPU、1600M内存 # -n 表示申请2个容器,这里指的就是多少个taskmanager # -tm 表示每个TaskManager的内存大小 # -s 表示每个TaskManager的slots数量 # -d 表示以后台程序方式运行
注意:该警告不用管 WARN org.apache.hadoop.hdfs.DFSClient - Caught exception java.lang.InterruptedException
访问UI
- 使用flink run提交任务:
/usr/apps/flink/bin/flink run /usr/apps/flink/examples/batch/WordCount.jar
- 关闭yarn-session:
yarn application -kill application_1650268924251_0001
21、yarn中的Job分离模式--企业用的更多
针对每个Flink任务在Yarn上启动一个独立的Flink集群并运行,结束后自动关闭并释放资源,----适用于大任务
- 直接提交job
1. /usr/apps/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 2. /usr/apps/flink/examples/batch/WordCount.jar
参数解释:
# -m jobmanager的地址 # -yjm 1024 指定jobmanager的内存信息 # -ytm 1024 指定taskmanager的内存信息
- 查看UI界面