开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问一下,有没有人用过zeppelin的,都是怎么支持flink 1.15.0。。

请问一下,有没有人用过zeppelin的,都是怎么支持flink 1.15.0。。

展开
收起
游客3oewgrzrf6o5c 2022-08-17 14:35:33 521 0
1 条回答
写回答
取消 提交回答
  • 全栈JAVA领域创作者

    Zeppelin是一个基于Web的交互式计算环境,支持多种语言,包括Python、R、SQL、Java等。它可以在浏览器中运行,提供了类似于命令行的交互式界面,并支持可视化、自动补全、调试等功能。 Zeppelin支持使用Flink作为数据处理引擎,可以通过在zeppelin notebook中直接调用Flink SQL语句来实现。以下是一个使用Zeppelin支持Flink 1.15.0的示例:

    scala Copy code from pyspark.sql import SparkSession from pyspark.sql.functions import from_json, col

    创建SparkSession对象

    spark = SparkSession.builder \n .appName("Flink SQL with Zeppelin") \n .enableHiveSupport() \n .getOrCreate()

    从json文件读取数据

    json_df = spark \n .read \n .option("header", "true") \n .json("input.json") \n .createOrReplaceTempView("json_table")

    将数据转换为Flink表格

    flink_table = spark \n .sql("FROM JsonTableSource") \n .option("header", "true") \n .option("path", "jars/flink-1.15.0-bin-scala_2.11/lib/flink-streaming-java_2.11-1.15.0.jar") \n .getOrCreateExternalTable("flink_table")

    将数据写入Flink表格

    flink_table \n .writeStream \n .outputMode("append") \n .format("csv") \n .option("path", "jars/flink-1.15.0-bin-scala_2.11/lib/flink-streaming-java_2.11-1.15.0.jar") \n .option("checkpointLocation", "chkpt/") \n .saveAsTable("output_table") 上述示例中,我们首先创建了一个SparkSession对象,然后从一个JSON文件中读取数据,将其转换为Flink表格,并将数据写入到一个Flink表格中。其中,from_json和col函数是针对Python语言的支持,可以在zeppelin notebook中使用。 需要注意的是,Zeppelin支持的Flink版本需要与Zeppelin的Flink版本相匹配。如果使用的Flink版本不同,需要先安装相应版本的Flink和Zeppelin,并将Zeppelin的配置文件中的相关参数进行更改。

    2023-06-19 19:04:53
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载