TensorFlow 与 ApacheFlink 的结合(二)| 学习笔记

简介: 快速学习 TensorFlow 与 ApacheFlink 的结合。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 TensorFlow 与 ApacheFlink 的结合(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10040


TensorFlow 与 ApacheFlink 的结合(二)


三、TensorFlow On Flink(TensorFlow 如何在 Flink 集群上运行)

1.TensorFlow 分布式运行

image.png

TensorFlow 分布式训练一般分为 worker 和 ps 角色。worker 负责机器学习计算,ps 负责参数更新。

2.TensorFlow Batch 训练运行模式 image.png

Batch 模式下,样本数据可以是放在 HDFS 上的,对于 Flink 作业而言,它会起一个source 的 operator,然后 TensorFlow 的 work 角色就会启动。如上图所示,如果 worker 的角色有三个节点,那么 source 的并行度就会设为 3。同理下面 ps 角色有 2 个,所以 ps source 节点就会设为 2。

而 Application Manager 和别的角色并没有数据交换,所以 Application Manager 是单独的一个节点,因此它的 source 节点并行度始终为 1。这样 Flink 作业上启动了三个 worker 和两个 ps 节点,worker 和 ps 之间的通讯是通过原始的 TensorFlow 的 GRPC 通讯来实现的,并不是走 Flink 的通信机制。

image.png

3.TensorFlow stream 训练运行模式

如上图所示,前面有两个 source operator,然后接 join operator,把两份数据合并为一份数据,再加自定义处理的节点,生成样本数据。在 stream 模式下,worker 的角色是通过 UDTF 或者 flatmap 来实现的。

同时,TensorFlow worker node 有3 个,所以 flatmap 和 UDTF 相对应的 operator 的并行度也为 3, 由于ps 角色并不去读取数据,所以是通过 flink source operator 来实现。如果已经训练好的模型,如何去支持实时的预测。

 image.png

4.使用 Python 进行预测

使用 Python 进行预测流程如图所示,如果 TensorFlow 的模型是分布式训练出来的模型,并且这个模型非常大,比如说单机放不下的情况,一般出现在推荐和搜索的场景下。那么实时预测和实时训练原理相同,唯一不同的地方是多了一个加载模型的过程。

在预测的情况下,通过读取模型,将所有的参数加载到 ps 里面去,然后上游的数据还是经过和训练时候一样的处理形式,数据流入到 worker 这样一个角色中去进行处理,将预测的分数再写回到 flink operator,并且发送到下游 operator。

5.使用 Java 进行预测

image.png

如图所示,模型单机进行预测时就没必要再去起 ps 节点,单个 worker 就可以装下整个模型进行预测,尤其是使用 TensorFlow 导出 save model。

同时,因为 saved model 格式包含了整个深度学习预测的全部计算逻辑和输入输出,所以不需要运行 Python 的代码就可以进行预测。此外,还有一种方式可以进行预测。前面 source、join、UDTF 都是对数据进行加工处理变成预测模型可以识别的数据格式,在这种情况下,可以直接在 Java 进程里面通过 TensorFlow Java API,将训练好的模型 load 到内存里,这时会发现并不需要 ps 角色, worker 角色也都是 Java 进程,并不是 Python 的进程,所以我们可以直接在 Java 进程内进行预测,并且可以将预测结果继续发给 Flink 的下游。

6.TensorFlow Example

代码示例:

import tensorflow as tf

cluster = tf.train.ClusterSpec({

"worker"": [

"A_IP:2222",

"B_IP:1234",

"C_IP:2222"]

"ps": [

"D_IP:2222",

]]}'

isps = False

if isps:

server = tf.train.Server(cluster, job_name="ps', task_index=0)

server.join()

else:

server = tf.train.Server(cluster, job_name='worker', task_index=0)

withtf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0', cluster=cluster))

w = tf.get_variable('w',(2,2), tf.float32, initializer=tf.constant_initializer(2))

b = tf.get_variable('b',(2, 2), tf.float32, initializer=tf.constant_initializer(5))

addwb = w + b

mutwb = w  

bdivwb = w / b 

7.tensorflow add example

python code:

import tensorflow as tf

import time

import sys

from flink_ml_tensorflow.tensorflow_context import TFContext

def build_graph():

global a

i = 1

a = tf.placeholder(tf.float32, shape=None, name="a")

b = tf.reduce_mean(a, name="b")

r_list = []

v = tf.Variable(dtype=tf.float32, initial_value=tf.constant(1.0), name="v_" + str(i))

c = tf.add(b, v, name="c_" + str(i))

add = tf.assign(v, c, name="assign_" + str(i))

sum = tf.summary.scalar(name="sum_" + str(i), tensor=c)

r_list.append(add)

global_step = tf.contrib.framework.get_or_create_global_step()

global_step_inc = tf.assign_add(global_step, 1)

r_list.append(global_step_inc)

return r_list

def map_func(context):

tf_context = TFContext(context)

job_name = tf_context.get_role_name()

index = tf_context.get_index()

cluster_json = tf_context.get_tf_cluster()

cluster = tf.train.ClusterSpec(cluster=cluster_json)

server = tf.train.Server(cluster, job_name=job_name, task_index=index)

sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False,

device_filters=["/job:ps", "/job:worker/task:%d" % index])

t = time.time() 

f 'ps' == job_name:

from time import sleep

while True:

sleep(1) 

else:

with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + str(index), cluster=cluster)):

train_ops = build_graph()

hooks = [tf.train.StopAtStepHook(last_step=2)]

with tf.train.MonitoredTrainingSession(master=server.target, config=sess_config,

checkpoint_dir="./target/tmp/s1/" + str(t),

hooks=hooks) as mon_sess:

while not mon_sess.should_stop():

print (mon_sess.run(train_ops, feed_dict={a: [1.0, 2.0, 3.0]}))

sys.stdout.flush()

在分布式运行的时候,需要配置一个地址去协调通讯,启动一个测试的 service。

8. TensorFlow Failover

image.png

如果有些节点失败了,可以重启整个 cluster,然后自动重新运行。如下图所示,

image.png

如果有节点失败以后变成 failed 的状态,就会重启整个 restart cluster 让整个作业重新运行,实现了自动的 failover 技术应用场景:搜索排序和推荐系统去运行

github 中有详细的介绍,包括如何去安装配置

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
缓存 TensorFlow 算法框架/工具
TensorFlow学习笔记(一): tf.Variable() 和tf.get_variable()详解
这篇文章详细介绍了TensorFlow中`tf.Variable()`和`tf.get_variable()`的使用方法、参数含义以及它们之间的区别。
513 0
|
TensorFlow 算法框架/工具
Tensorflow学习笔记(二):各种tf类型的函数用法集合
这篇文章总结了TensorFlow中各种函数的用法,包括创建张量、设备管理、数据类型转换、随机数生成等基础知识。
587 0
|
机器学习/深度学习 TensorFlow 算法框架/工具
Whale 基于 Tensorflow 深度学习分布式训练框架|学习笔记
快速学习 Whale 基于 Tensorflow 深度学习分布式训练框架。
873 0
Whale 基于 Tensorflow 深度学习分布式训练框架|学习笔记
|
人工智能 TensorFlow 算法框架/工具
人工智能|Tensorflow-2.0学习笔记:基础操作篇一
人工智能|Tensorflow-2.0学习笔记:基础操作篇一
330 0
|
机器学习/深度学习 存储 算法
TensorFlow 实现图像分类|学习笔记
快速学习 TensorFlow 实现图像分类。
293 0
TensorFlow 实现图像分类|学习笔记
|
Ubuntu TensorFlow 算法框架/工具
最新 Tensorflow 2.2极简安装教程 | 学习笔记
快速学习最新 Tensorflow 2.2极简安装教程
最新 Tensorflow 2.2极简安装教程 | 学习笔记
|
机器学习/深度学习 TensorFlow 算法框架/工具
逻辑回归的 tensorflow 实现 | 学习笔记
快速学习逻辑回归的 tensorflow 实现
逻辑回归的 tensorflow 实现 | 学习笔记
|
机器学习/深度学习 存储 TensorFlow
《TensorFlow技术解析与实战》学习笔记2
《TensorFlow技术解析与实战》学习笔记2
172 0
|
机器学习/深度学习 算法 BI
《TensorFlow深度学习应用实践》学习笔记1
《TensorFlow深度学习应用实践》学习笔记1
162 0
|
机器学习/深度学习 分布式计算 算法
《TensorFlow技术解析与实战》学习笔记1
《TensorFlow技术解析与实战》学习笔记1
183 0

热门文章

最新文章