【译】深入分析Spark UDF的性能

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 这篇博客会阐述一份关于Apache Spark的在Scala UDF、 PySpark UDF 和PySpark Pandas UDF之间的性能评测报告。

原文链接 https://medium.com/@QuantumBlack/spark-udf-deep-insights-in-performance-f0a95a4d8c62


编译:抚月,阿里巴巴计算平台事业部 EMR 高级工程师,Apache HDFS Committer,目前从事开源大数据存储和优化方面的工作。


这篇博客会阐述一份关于Apache Spark的在Scala UDF、 PySpark UDF 和PySpark Pandas UDF之间的性能评测报告。


Spark提供了多种解决方案来应对复杂挑战, 但是我们面临了很多场景, 原生的函数不足以解决问题。因此,Spark允许我们注册自定义函数(User-Defined Functions, 或者叫 UDFs)

在这片文章, 我们会探索Spark的UDF的性能特点。

Spark支持多种语言,比如Python, Scala, Java, R, SQL. 但是通常数据操作都是用PySpark或者Spark Scala写的。 我们认为Pyspark被大多数用户采用, 是因为以下原因:

  1. 更快的学习曲线  -- Python比Scala更简单。
  2. 更广的社区支持  -- 程序员对Pyspark性能等建议,反馈到社区,形成更好的生态。
  3. 丰富的可用的类库 -- Python有很多机器学习、时序分析、数理统计的类库。
  4. 很小的性能差异 -- Spark DataFrames引入之后,意味着Scala和Python的性能几乎相同。 Datafarme现在是按照带名字的列(named columns)来组织的, 这样Spark可以更好地理解Schema。而那些用来构建dataframe的操作,会被Catalyst Optimizer编译成物理执行计划(physical execution plan)来加速计算。
  5. 数据工程师和数据科学家,交接代码也更简单。有一些dataframe的操作需要UDFs, PySpark可能会有性能问题。有一些解决办法,就是将PySpark和Scala UDF, UDF Wrapper一起使用。

PySpark作业提交的时候, driver端跑在Python上, driver会创建一个SparkSession对象以及Dataframes/RDDs. 这些Python对象是一些wrapper对象, 本质是JVM(Java)对象。 为了简化,PySpark提供了一个wrapper来跑原生Scala代码。

Spark UDF 函数

通过Scala, Python 或者 Java 注册自定义函数,是非常通用的方法, 来扩展SQL用户的能力, 是的用户可以调用这些函数而不需要再写代码。

例如, 将一个100w行的集合乘以1000:

def times1000(field):
  return field * 1000.00

或者, 对经纬度数据集进行反向地理编码(reverse geocode):

import geohash
def geohash_pyspark(lat, lon):
  return geohash.encode(lat, lon)

Spark SQL提供了一种方法, 你可以用自己的编程语言来传入1个函数,从而注册UDF。Scala和Python可以用原生的函数或者lamdba语法,除了Java繁琐一些,需要扩展这个UDF类。
UDF可以作用于多种不同的数据类型,并返回一种不同的类型。在Python和Java里,我们需要指定发返回类型。

UDF可以通过以下方式进行注册:

spark.udf.register("UDF_Name", function_name, returnType())

*returnType() 在Python和Java里是强制的。

多种Spark UDF和执行方式

在分布式模式下,Spark使用master/worker架构来执行。调度器(driver)来跟大量的workers(或者叫executors)进行通信。 driver和worker跑在自己的Java进程里。
driver端通过main()方法,创建了SparkContext, RDDs并执行一些变换操作。 Executors负责跑一个个的任务。

Screen Shot 2019-12-14 at 20.40.53.png
1576224390592-89e2031e-a0f6-4acd-9b4b-58d5d667f264.png

性能基准测试

我们创建了一个随机的经纬度数据集, 包含100w条记录, 共1.2GB,来测试3种Spark UDF类型的性能。我们创建了2个UDF:一个简单的乘以1000的函数, 一个复杂的geohash函数。 (所以总共有2 * 3 = 6组测试)

集群配置: 8个节点
Driver节点: 16核 122GB内存
Worker节点: 4核 30.5GB内存,开启自动扩容
Notebook代码:  https://bit.ly/2YxiVp4 使用 QuantumBlack’s的方法来跑 Scala UDF, PySpark UDF and PySpark Pandas UDF 的测试。

除了上面3种类型的UDF,我们还创建了Python wrapper, 从而在Pyspark中调用Scala UDF。我们发现这种方式, 既可以使用简单的python编程,又能兼顾Scala UDF的性能。

用Pyspark代码来创建一个Python wrapper:

from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import col

def udfGeohashScalaWrapper(lat, lon):
    _geohash = sc._jvm.sparkudfperformance.UDFs.udfGeohash()
    return Column(_geohash.apply(_to_seq(sc, [lat, lon], _to_java_column)))
def udfTimes1000ScalaWrapper(field):
    _times1000 = sc._jvm.sparkudfperformance.UDFs.udfTimes1000()
    return Column(_times1000.apply(_to_seq(sc, [field], _to_java_column)))

Databricks对 Pandas UDF 做过一份性能报告  https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

重要结论

下面是测试结果

1576224428174-0d814c3d-160e-4ffa-8551-b4b5705c149b.png


测试结果中, Scala UDF的性能是最好的。 前面提到, Scala和Python之间的转换步骤, 使得Python UDF需要处理更多东西。
我们同时发现,PySpark Pandas UDF在小数据集或者简单函数上,性能好于PySpark UDF。而如果是一个复杂的函数,比如引入了geohash,这种场景下 PySpark UDF的性能会比PySpark Pandas UDF好10倍。
我们还发现了,在PySpark代码里, 创建一个Python wrapper去调用Scala UDF,性能比这两种PySpark UDFs好15倍。


综合考虑了上面的一些性能特征, QuantumBlack公司现在采用的方式是:

  • 使用 PySpark UDF, 如果数据集不大,并且需要用简单函数进行快速的数据洞察。
  • 构架一个可复用的Scala UDF的内置库。
  • 创建Python wrapper来调用Scala UDF
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
211 1
Spark快速大数据分析PDF下载读书分享推荐
|
8月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
163 0
|
3月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
217 2
|
3月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
136 0
|
6月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23749 42
|
5月前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
58 0
|
8月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56624 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
7月前
|
机器学习/深度学习 数据采集 分布式计算
基于spark的大数据分析预测地震受灾情况的系统设计
基于spark的大数据分析预测地震受灾情况的系统设计
214 1
|
8月前
|
SQL 分布式计算 关系型数据库
Spark 分析计算连续三周登录的用户数
本文介绍了如何使用窗口函数`range between`来查询`login_time`为2022-03-10的用户最近连续三周的登录数。首先在MySQL中创建`log_data`表并插入数据,接着定义需求为找出该日期前连续三周活跃的用户数。通过Spark SQL,分步骤实现:1)确定统计周期,2)筛选符合条件的数据,3)计算用户连续登录状态。在初始实现中出现错误,因未考虑日期在周中的位置,修正后正确计算出活跃用户数。
138 6
|
8月前
|
分布式计算 Java 关系型数据库