开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Apache Flink Python API 的现状及未来规划(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10042
Apache Flink Python API 的现状及未来规划(二)
三,Apeche Flink Python API 核心算子介绝及应用
1,Python Table API 算子
除了单流之后,还有双流的操作,比如说双流转,双流的这个 minutes,所以说这种算子,在这个 python 的 API 里面也得到了很好的支持。
Windows 跟其他的词有些特殊,第一个,就是在实际当中,会有一个时间的属性,python 的语法和这个 Java 的语法是一模一样的,它的接口是保持一致的,那么 Tom 也是一个 table。window 然后再加一个 window 的一个定义。
2,Python Table API 算子—Watermark 定义
.with_format(
Json()
.fail_on_missing_field(True)
.json_schema(
"{"
“type: 'object',"
“properties:{"
“a:{"
“type:'string”
"},"
“time:{"
“type: 'string',"
“format: 'date-time"
"}"
"}"
"}"
}
.with_schema(
Schema()
.field("rowtime",DataTypes.TIMESTAMP())
.rowtime(
Rowtime()
.timestamps_from field("time")
.watermarks_periodic_bounded(60000))
.field("a",DataTypes.STRING())
)
python 能够和 Java API 进行对其功能进行对齐。但是始终没持续那么流畅,他不是按顺序的,可以说乱序,对于这种乱性,就是一个流畅客观存在的一种状态,在这个福利上,用 word 的机制来解决这种乱序的问题。
假设我有一个节省的数据,有一个 a 字段可以实现字段时间字段 daytime,增加一个 rom 的列,那么必须是一个 times name 的类型,并且点燃之后要围绕 Tom 定义。
但是在表里面用的字段的名称是 rom time,也就是如果写 window 的话,用的应该是 real time 这个字符串。
下面这个 what mark would mark ,有多种定义方式。
这个6万单位其实是毫秒,那其实60秒一分钟他说明如果数据是乱序的,其实是能够处理在一分钟之内的乱序的,所以这个值调的越大数据乱序接受程度越高,但是有一点就是它数据的延迟也会越高。
3,Python Table API -Java UDF
Java UDF
虽然我们在 Flink-1.9中没有支持 Python 的 UDF,但在 Fink 1.9 版木中我们可以使用 lava UDF。
1. 创建 java 项目,并配置 pom 依赖如下:
>
org apache.flink
flink-table-common
1.9-SNAPSHOT
provided
2.编写一个计算字符申长度的函数 UDFLength
package org.apache.flink.udf;
import org.apache.flink.table.functions.ScalarFunction;
public class UDFLength extends ScalarFunction
{
public int eval(String str)(
return strlength();}
}
}
3.注册和使用:
t_envregister
_
java_function("len","org.apache.flink.udf.UDFLength")
....
select("word, lerdword),count(1) as count")
4,Python Table API -Java UDFs
开发 PythonJob,并使用上面自定义的 UDFLength 函数:https:/lgithub.com/sunjincheng121/enjoyment.code/blob/master/mypyFlink/enjoyment/word count_udfpy
提交 Python Job,并上传 UDFJAR 包:
/bin/flink run-py word_count_udf.py -j /flink-udf-1.0.0.jar
.Scalar Function
T_envregister_iava_function("len","org.apache.flink.udf.UDFLength")
....
.select("word, len(word), count(1) as count")
.Table Function
t.env.register..ava..function("split", "com.pyflink.table,Split")
tab.join.laterall"Split(a) as (word, length)").select("a, word, length")
.Aggregate Function
t.env.register.javafunction("wAvg", "com.pyflink.table.WeightedAvg") tab.group by("a").select("a, wAvg(b) as d")
5,Python Table API 常用链接
Python Table API 文档
https://ci.apache.org/projects/flink/flink-docs-master/api/python/
Python Table API IDE 开发环境
https://cwiki.apache.org/confluence/display/FLINK/Setting+up+a+Flink+development+environment
Python Shell
https://ci.apache.org/projects/flink/flink-docs-master/ops/pythen_shell.html
Python Table API Tutorial
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/python table api.html
https://enjoyment.cool/
对那么 spark 本身它其实它指的是一些 udf 的性能上的一个损失,对于1.9来讲,目前的1.9来讲其实没有性能损失在里面,原因是这个,本身我们所有的案算子都是一个 Flink 内部的一个 native 算子,直接利用构建构建甲瓦的这构件加盟的这个 graph。 graph 其实跟目前这样的架构会共用和共享,CAD 里面的优化机制都能享受,在1.9版本里面只是切换了一种语言的入口。
从性能的角度现在没有损失。这个几个有价值的数据,研究方向和机器学习,研究方向论文,最好中文,这些都是机器学习的这个大数据的研究方向,在阿里内部也有很多这种方向。