EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务

简介: Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。

Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。


背景信息

Apache Livy 通过 REST 接口与 Spark 进行交互,极大简化了 Spark 和应用程序服务器之间的通信复杂度。关于 Livy API,请参见REST API


前提条件


操作步骤

步骤一:创建 Gateway 及访问 Token

  1. 创建 Gateway。
  1. 进入 Compute 页面。
  1. 登录E-MapReduce控制台
  2. 在左侧导航栏,选择EMR Serverless > Spark
  3. Spark页面,单击目标工作空间名称。
  4. EMR Serverless Spark页面,单击左侧导航栏中的Compute
  1. Compute页面,单击Gateway
  2. 单击创建Gateway
  3. 在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建


  1. 创建Token。
  1. Gateway页面,单击Livy-gateway操作列的Token管理
  2. 单击创建Token
  3. 创建Token对话框中,输入名称(例如,Livy-token),单击确定
  4. 复制Token信息。


重要
Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。


步骤二:配置 Apache Airflow

  1. 执行以下命令,在Apache Airflow环境中安装Apache Livy。
pip install apache-airflow-providers-apache-livy


  1. 添加Connection。
  • UI 方式

在Airflow中找到默认为livy_default的Connection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection


涉及以下信息:

  • Host:填写为Gateway中的Endpoint信息。
  • Schema:填写为https
  • Extra:填写JSON字符串,x-acs-spark-livy-token为您前一个步骤中复制的Token信息。
{
  "x-acs-spark-livy-token": "6ac**********kfu"
}


  • CLI 方式

通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection

airflow connections add 'livy_default' \
    --conn-json '{
        "conn_type": "livy",
        "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint信息。
        "schema": "https",
        "extra": {
            "x-acs-spark-livy-token": "6ac**********kfu"  # 为您前一个步骤中复制的Token信息。
        }
    }'



步骤三:DAG 示例

Airflow的DAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。


从阿里云OSS获取并执行Python脚本文件。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)
# define livy task with LivyOperator
# 请根据实际情况替换file内容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task


说明

file为您的 Spark 任务对应的文件路径,本文示例为上传至阿里云 OSS 上的 JAR 包spark-examples_2.12-3.3.1.jar的路径,请您根据实际情况替换。上传操作可参见简单上传


步骤四:查看提交至 EMR 的任务

  1. EMR Serverless Spark页面,单击左侧导航栏中的任务历史
  2. 任务历史开发任务页签,您可以查看提交的任务。


相关文档

在Apache Airflow中,您也可以选择使用EMR提供的EmrServerlessSparkStartJobRunOperator接口来提交EMR Serverless Spark任务,提供了一种除了Livy之外的便捷途径。更多详情,请参见通过Apache Airflow向EMR Serverless Spark提交任务


快速跳转

  1. EMR Serverless Spark 版官网:https://www.aliyun.com/product/bigdata/serverlessspark
  2. 产品控制台:https://emr-next.console.aliyun.com/
  3. 产品文档:https://help.aliyun.com/zh/emr/emr-serverless-spark/



EMR Serverless Spark 在 2024年5月正式开启公测,在公测期间可以免费使用最高 100 CU 计算资源,欢迎试用。如果您在使用 EMR Serverless Spark 版的过程中遇到任何疑问,可钉钉扫描以下二维码加入钉钉群(群号:58570004119)咨询。

目录
相关文章
|
4月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
国诚投顾携手阿里云,依托Serverless架构实现技术全面升级,构建高弹性、智能化技术底座,提升业务稳定性与运行效率。通过云原生API网关、微服务治理与智能监控,实现流量精细化管理与系统可观测性增强,打造安全、敏捷的智能投顾平台,助力行业数字化变革。
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
|
8月前
|
SQL 分布式计算 Serverless
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
854 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
|
4月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
|
8月前
|
分布式计算 运维 搜索推荐
立马耀:通过阿里云 Serverless Spark 和 Milvus 构建高效向量检索系统,驱动个性化推荐业务
蝉妈妈旗下蝉选通过迁移到阿里云 Serverless Spark 及 Milvus,解决传统架构性能瓶颈与运维复杂性问题。新方案实现离线任务耗时减少40%、失败率降80%,Milvus 向量检索成本降低75%,支持更大规模数据处理,查询响应提速。
447 57
|
8月前
|
存储 运维 Serverless
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
碧桂园服务通过引入 EMR Serverless StarRocks 存算分离架构,解决了海量数据处理中的资源利用率低、并发能力不足等问题,显著降低了硬件和运维成本。实时查询性能提升8倍,查询出错率减少30倍,集群数据 SLA 达99.99%。此次技术升级不仅优化了用户体验,还结合AI打造了“一看”和“—问”智能场景助力精准决策与风险预测。
745 69
|
6月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
234 4
|
6月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
362 1

推荐镜像

更多