PyFlink 1.11 游乐场 - 开发环境准备

简介: # 开发环境依赖 PyFlink作业的开发和运行需要依赖Python 3.5/3.6/3.7 版本和Java 8或者Java 11,本游乐场所使用的环境是Java 1.8.0_211, Python 3.7.7 还有一些其他基础软件如下; - Java 1.8.0_211 - Python 3.7.7 - PIP 20.0.2 - PyCharm Runtime version: 11.

开发环境依赖

PyFlink作业的开发和运行需要依赖Python 3.5/3.6/3.7 版本和Java 8或者Java 11,本游乐场所使用的环境是Java 1.8.0_211, Python 3.7.7 还有一些其他基础软件如下;

  • Java 1.8.0_211
  • Python 3.7.7
  • PIP 20.0.2
  • PyCharm Runtime version: 11.0.7
  • MocOS 10.14.6

PyCharm 配置 Python interpreter

应用PyCharm进行开发首先要配置一下项目所使用的Python环境,配置路径PyCharm -> Preferences -> Project Interpreter如下:
image.png
点击 Add 配置新的环境,如下:
image.png
一路”OK“,完成配置。

安装PyFlink

我们先利用PyCharm创建一些项目,名为PyFlinkPlayground, 并为项目选择我们刚才创建的Virtualenv环境,如下:
image.png

创建之后,我们会看到External Libraries 里面使用了PlaygroundEnv, 但是初始化并没有PyFlink,所以我们需要进行显示的安装,如下:
image.png

我们可以手工安装PyFlink,直接在PyCharm的Terminal下进行安装,这时候我们自动就是启动的PlaygroundEnv环境,在安装的过程中你也可以看到site-packages内容会不断增加,

(PlaygroundEnv) jincheng:~ jincheng.sunjc$ python --version
Python 3.7.7
(PlaygroundEnv) jincheng:~ jincheng.sunjc$ python -m pip install apache-flink==1.11.1
Collecting apache-flink==1.11.1
  Using cached apache_flink-1.11.1-cp37-cp37m-macosx_10_9_x86_64.whl (206.7 MB)

...
...
Successfully installed apache-beam-2.19.0 apache-flink-1.11.1 avro-python3-1.9.1 certifi-2020.6.20 chardet-3.0.4 cloudpickle-1.2.2 crcmod-1.7 dill-0.3.1.1 docopt-0.6.2 fastavro-0.21.24 future-0.18.2 grpcio-1.30.0 hdfs-2.5.8 httplib2-0.12.0 idna-2.10 jsonpickle-1.2 mock-2.0.0 numpy-1.19.1 oauth2client-3.0.0 pandas-0.25.3 pbr-5.4.5 protobuf-3.12.4 py4j-0.10.8.1 pyarrow-0.15.1 pyasn1-0.4.8 pyasn1-modules-0.2.8 pydot-1.4.1 pymongo-3.11.0 pyparsing-2.4.7 python-dateutil-2.8.0 pytz-2020.1 requests-2.24.0 rsa-4.6 six-1.15.0 typing-3.7.4.3 typing-extensions-3.7.4.2 urllib3-1.25.10
(PlaygroundEnv) jincheng:~ jincheng.sunjc$ 

最终完成之后你可以在 site-packages下面找的 pyflink目录,如下:
image.png
有了这些信息我们就可以进行PyFlink的作业开发了。

HelloWorld 示例

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

def hello_world():
    """
    从随机Source读取数据,然后直接利用PrintSink输出。
    """
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    source_ddl = """
                    CREATE TABLE random_source (
                        f_sequence INT,
                        f_random INT,
                        f_random_str STRING
                    ) WITH (
                        'connector' = 'datagen',
                        'rows-per-second'='5',
                        'fields.f_sequence.kind'='sequence',
                        'fields.f_sequence.start'='1',
                        'fields.f_sequence.end'='1000',
                        'fields.f_random.min'='1',
                        'fields.f_random.max'='1000',
                        'fields.f_random_str.length'='10'
                    )
                    """

    sink_ddl = """
                  CREATE TABLE print_sink (
                    f_sequence INT,
                    f_random INT,
                    f_random_str STRING 
                ) WITH (
                  'connector' = 'print'
                )
        """

    # 注册source和sink
    t_env.execute_sql(source_ddl);
    t_env.execute_sql(sink_ddl);

    # 数据提取
    tab = t_env.from_path("random_source");
    # 这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
    tab.insert_into("print_sink");
    # 执行作业
    t_env.execute("Flink Hello World");

if __name__ == '__main__':
    hello_world()

上面代码在PyCharm里面右键运行就应该打印如下结果了:

image.png

开发日志

正常来讲我们可能开发一些UDF,可能打印一些日志或者特殊情况还可能进行Python代码的调试,怎么解?

  • 首先,我们定义一个UDF,在UDF里面添加调试日志,如下:
# 定义UDF
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def pass_by(str):
    logging.error("Some debugging infomation...")
    return str
  • 然后在SQL里面使用这个UDF,如下:
# 注册 UDF
t_env.register_function('pass_by', pass_by)
# 使用UDF
tab.select("f_sequence, f_random, pass_by(f_random_str) ")
  • 完整的代码
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

import logging

# 定义UDF
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def pass_by(str):
    logging.error("Some debugging infomation...")
    return "pass_by_" + str

def hello_world():
    """
    从随机Source读取数据,然后直接利用PrintSink输出。
    """
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

    source_ddl = """
                    CREATE TABLE random_source (
                        f_sequence INT,
                        f_random INT,
                        f_random_str STRING
                    ) WITH (
                        'connector' = 'datagen',
                        'rows-per-second'='5',
                        'fields.f_sequence.kind'='sequence',
                        'fields.f_sequence.start'='1',
                        'fields.f_sequence.end'='1000',
                        'fields.f_random.min'='1',
                        'fields.f_random.max'='1000',
                        'fields.f_random_str.length'='10'
                    )
                    """

    sink_ddl = """
                  CREATE TABLE print_sink (
                    f_sequence INT,
                    f_random INT,
                    f_random_str STRING 
                ) WITH (
                  'connector' = 'print'
                )
        """

    # 注册source和sink
    t_env.execute_sql(source_ddl);
    t_env.execute_sql(sink_ddl);

    # 注册 UDF
    t_env.register_function('pass_by', pass_by)

    # 数据提取
    tab = t_env.from_path("random_source");
    # 这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
    tab.select("f_sequence, f_random, pass_by(f_random_str) ").insert_into("print_sink")
    # 执行作业
    t_env.execute("Flink Hello World");

if __name__ == '__main__':
    hello_world()

那么运行之后,日志在哪里呢?就是在项目的 PlaygroundEnv -> site-packages -> pyflink -> log 目录 ,如下:

image.png

到这里,简单的 开发环境就OK了,大家可以改改代码,直观体验一下。。。

游乐场设施

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
NoSQL 编译器 Linux
CodeBlocks-20.03下载安装及中文教程
CodeBlocks强大之处 1、跨平台,windows、linux 、mac都可以用 2、轻量化,远不及VS占用空间 3、完全免费
3553 1
CodeBlocks-20.03下载安装及中文教程
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
614 1
|
SQL 机器学习/深度学习 Java
用Python进行实时计算——PyFlink快速入门
Flink 1.9.0及更高版本支持Python,也就是PyFlink。 在最新版本的Flink 1.10中,PyFlink支持Python用户定义的函数,使您能够在Table API和SQL中注册和使用这些函数。但是,听完所有这些后,您可能仍然想知道PyFlink的架构到底是什么?作为PyFlink的快速指南,本文将回答这些问题。
2817 0
用Python进行实时计算——PyFlink快速入门
|
存储 API Docker
只需10分钟!就能用Flask,Docker和Jenkins部署机器学习模型
一杯茶的功夫部署完成机器学习模型!
3712 0
|
3月前
|
数据采集 Web App开发 人工智能
如何让AI“看懂”网页?拆解 Browser-Use 的三大核心技术模块
Browser-Use 是一种基于大语言模型(LLM)的浏览器自动化技术,通过融合视觉理解、DOM解析和动作预测等模块,实现对复杂网页任务的自主操作。它突破了传统固定选择器和流程编排的限制,具备任务规划与语义理解能力,可完成注册、比价、填报等多步骤操作。其核心功能包括视觉与HTML融合解析、多标签管理、元素追踪、自定义动作、自纠错机制,并支持任意LLM模型。Browser-Use标志着浏览器自动化从“规则驱动”向“认知驱动”的跃迁,大幅降低维护成本,提升复杂任务的处理效率与适应性。
2062 29
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
950 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
9月前
|
传感器 算法 Java
基于 pyflink 的算法工作流设计和改造
本文分享了硕橙科技大数据工程师程兴源在Flink Forward Asia 2024上的演讲内容,围绕工业互联网场景下的Flink应用展开。主要内容包括:为何选择Flink、算法工作流设计、性能优化实践、上下游链路协作思考及未来展望。团队通过Flink处理工业设备数据(如温度、振动等),实现故障预测与分析。文章详细探讨了性能优化路径(如批处理、并行度提升)、KeyBy均衡化、内存管理等技术细节,并介绍了数据补全方法和告警规则的设计。最后,对未来基于Flink的编码强化、CEP模式改进及工业数据归因目标进行了展望。
369 7
基于 pyflink 的算法工作流设计和改造
|
并行计算 API C++
Nvidia TensorRT系列01-TensorRT的功能1
NVIDIA TensorRT是一个高性能深度学习推理优化器和运行时,支持C++和Python API。其编程模型分为构建阶段和运行时阶段,前者优化模型,后者执行推理。TensorRT支持多种数据类型和精度,包括FP32、FP16、INT8等,并提供插件机制以扩展支持的操作。
443 0
|
SQL 存储 Linux
从配置源到数据库初始化一步步教你在CentOS 7.9上安装SQL Server 2019
【11月更文挑战第16天】本文介绍了在 CentOS 7.9 上安装 SQL Server 2019 的详细步骤,包括配置系统源、安装 SQL Server 2019 软件包以及数据库初始化,确保 SQL Server 正常运行。
598 4
|
SQL 机器学习/深度学习 大数据
【赵渝强老师】Flink生态圈组件
Flink 是一个大数据计算引擎,支持批处理和流处理。其优势在于流处理引擎 DataStream。Flink 的生态圈分为三层:平台部署层、核心引擎层和 API&Library 层。平台部署层支持多种部署模式,核心引擎层负责任务执行,API&Library 层提供丰富的开发工具。
232 1