个推技术实践 | Spark性能调优看这篇,性能提升60%↑ 成本降低50%↓

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 对企业来讲,效率和成本始终是其进行海量数据处理和计算时所必须关注的问题。如何充分发挥Spark的优势,在进行大数据作业时真正实现降本增效呢?个推将多年积累的Spark性能调优妙招进行了总结,与大家分享。

Spark是目前主流的大数据计算引擎,功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。作为一种内存计算框架,Spark运算速度快,并能够满足UDF、大小表Join、多路输出等多样化的数据计算和处理需求。


作为国内专业的数据智能服务商,个推从早期的1.3版本便引入Spark,并基于Spark建设数仓,进行大规模数据的离线和实时计算。由于Spark 在2.x版本之前的优化重心在计算引擎方面,而在元数据管理方面并未做重大改进和升级。因此个推仍然使用Hive进行元数据管理,采用Hive元数据管理+ Spark计算引擎的大数据架构,以支撑自身大数据业务发展。个推还将Spark广泛应用到报表分析、机器学习等场景中,为行业客户和政府部门提供实时人口洞察、群体画像构建等服务。


1.png

▲个推在实际业务场景中,分别使用SparkSQL 和 HiveSQL对一份3T数据进行了计算,上图展示了跑数速度。数据显示:在锁死队列(120G内存,<50core)前提下, SparkSQL2.3 的计算速度是Hive1.2 的5-10倍。


对企业来讲,效率和成本始终是其进行海量数据处理和计算时所必须关注的问题。如何充分发挥Spark的优势,在进行大数据作业时真正实现降本增效呢?个推将多年积累的Spark性能调优妙招进行了总结,与大家分享。


Spark性能调优-基础篇

众所周知,正确的参数配置对提升Spark的使用效率具有极大助力。因此,针对 不了解底层原理的Spark使用者,我们提供了可以直接抄作业的参数配置模板,帮助相关数据开发、分析人员更高效地使用Spark进行离线批处理和SQL报表分析等作业。


推荐参数配置模板如下:


Spark-submit 提交方式脚本

/xxx/spark23/xxx/spark-submit --master yarn-cluster  \--name ${mainClassName} \--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \--conf spark.yarn.maxAppAttempts=2 \--conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC \--driver-memory 2g \--conf spark.sql.shuffle.partitions=1000 \--conf hive.metastore.schema.verification=false \--conf spark.sql.catalogImplementation=hive \--conf spark.sql.warehouse.dir=${warehouse} \--conf spark.sql.hive.manageFilesourcePartitions=false \--conf hive.metastore.try.direct.sql=true \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--executor-cores 2 \--executor-memory 4g \--num-executors 50 \--class 启动类 \${jarPath} \-M ${mainClassName}



spark-sql 提交方式脚本

option=/xxx/spark23/xxx/spark-sqlexport SPARK_MAJOR_VERSION=2${option} --master yarn-client \--driver-memory 1G \--executor-memory 4G \--executor-cores 2 \--num-executors 50 \--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \--conf spark.sql.auto.repartition=true \--conf spark.sql.autoBroadcastJoinThreshold=104857600 \--conf "spark.sql.hive.metastore.try.direct.sql=true" \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.dynamicAllocation.maxExecutors=200 \--conf spark.dynamicAllocation.executorIdleTimeout=10m \--conf spark.port.maxRetries=300 \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--conf spark.sql.shuffle.partitions=10000 \--conf spark.sql.adaptive.enabled=true \--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 \--conf spark.sql.parquet.compression.codec=gzip \--conf spark.sql.orc.compression.codec=zlib \--conf spark.ui.showConsoleProgress=true-f pro.sqlpro.sql 为业务逻辑脚本


Spark性能调优-进阶篇

针对有意愿了解Spark底层原理的读者,本文梳理了standalone、Yarn-client、Yarn-cluster等3种常见任务提交方式的交互图,以帮助相关使用者更直观地理解Spark的核心技术原理、为阅读接下来的进阶篇内容打好基础。


standalone

2.png


1) spark-submit 提交,通过反射的方式构造出1个DriverActor 进程;

2) Driver进程执行编写的application,构造sparkConf,构造sparkContext;

3) SparkContext在初始化时,构造DAGScheduler、TaskScheduler,jetty启动webui;

4) TaskScheduler 有sparkdeployschedulebackend 进程,去和Master通信,请求注册Application;

5) Master 接受通信后,注册Application,使用资源调度算法,通知Worker,让worker启动Executor;

6) worker会为该application 启动executor,executor 启动后,会反向注册到TaskScheduler;

7) 所有Executor 反向注册到TaskScheduler 后,Driver 结束sparkContext 的初始化;

8) Driver继续往下执行编写的application,每执行到1个action,就会创建1个job;

9) job 会被提交给DAGScheduler,DAGScheduler 会对job 划分为多个stage(stage划分算法),每个stage创建1个taskSet;

10) taskScheduler会把taskSet里每1个task 都提交到executor 上执行(task 分配算法);

11) Executor 每接受到1个task,都会用taskRunner来封装task,之后从executor 的线程池中取出1个线程,来执行这个taskRunner。(task runner:把编写的代码/算子/函数拷贝,反序列化,然后执行task)。


Yarn-client

Yarn-client.png

1) 发送请求到ResourceManager(RM),请求启动ApplicationMaster(AM);

2) RM 分配container 在某个NodeManager(NM)上,启动AM,实际是个ExecutorLauncher;

3) AM向RM 申请container;

4) RM给AM 分配container;

5) AM 请求NM 来启动相应的Executor;

6) executor 启动后,反向注册到Driver进程;

7) 后序划分stage,提交taskset 和standalone 模式类似。


Yarn-cluster

Yarn-cluster.png

1) 发送请求到ResourceManager(RM),请求启动ApplicationMaster(AM);

2) RM 分配container 在某个NodeManager(NM)上,启动AM;

3) AM向RM 申请container;

4) RM给AM 分配container;

5) AM 请求NM 来启动相应的Executor;

6) executor 启动后,反向注册到AM;

7) 后序划分stage,提交taskset 和standalone 模式类似。


理解了以上3种常见任务的底层交互后,接下来本文从存储格式、数据倾斜、参数配置等3个方面来展开,为大家分享个推进行Spark性能调优的进阶姿势。


存储格式(文件格式、压缩算法)

众所周知,不同的SQL引擎在不同的存储格式上,其优化方式也不同,比如Hive更倾向于orc,Spark则更倾向于parquet。同时,在进行大数据作业时,点查、宽表查询、大表join操作相对频繁,这就要求文件格式最好采用列式存储,并且可分割。因此我们推荐以parquet、orc 为主的列式存储文件格式和以gzip、snappy、zlib 为主的压缩算法。在组合方式上,我们建议使用parquet+gzip、orc+zlib的组合方式,这样的组合方式兼顾了列式存储与可分割的情况,相比txt+gz 这种行式存储且不可分割的组合方式更能够适应以上大数据场景的需求。

个推以线上500G左右的数据为例,在不同的集群环境与SQL引擎下,对不同的存储文件格式和算法组合进行了性能测试。测试数据表明:相同资源条件下,parquet+gz 存储格式较text+gz存储格式在多值查询、多表join上提速至少在60%以上。


结合测试结果,我们对不同的集群环境与SQL引擎下所推荐使用的存储格式进行了梳理,如下表:

3.png


同时,我们也对parquet+gz、orc+zlib的内存消耗进行了测试。以某表的单个历史分区数据为例,parquet+gz、orc+zlib比txt+gz 分别节省26%和49%的存储空间。


完整测试结果如下表:

4.png


可见,parquet+gz、orc+zlib确实在降本提效方面效果显著。那么,如何使用这两种存储格式呢?步骤如下:


➤hive 与 spark 开启指定文件格式的压缩算法


spark:

set spark.sql.parquet.compression.codec=gzip;set spark.sql.orc.compression.codec=zlib;


hive:

set hive.exec.compress.output=true;set mapreduce.output.fileoutputformat.compress=true;set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;


➤建表时指定文件格式

parquet 文件格式(序列化,输入输出类)

CREATE EXTERNAL TABLE `test`(rand_num double)PARTITIONED BY (`day` int)ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';


orc 文件格式(序列化,输入输出类)

ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.orc.OrcSerde'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';


➤线上表调

ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');


➤ctas 建表

create table tablename stored as parquet as select ……;create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB')  as select ……;



数据倾斜


数据倾斜分为map倾斜和reduce倾斜两种情况。本文着重介绍reduce 倾斜,如SQL 中常见的group by、join 等都可能是其重灾区。数据倾斜发生时,一般表现为:部分task 显著慢于同批task,task 数据量显著大于其他task,部分taskOOM、spark shuffle 文件丢失等。


如下图示例,在duration 列和shuffleReadSize/Records列,我们能明显发现部分task 处理数据量显著升高,耗时变长,造成了数据倾斜:


5.png



如何解决数据倾斜?


我们总结了7种数据倾斜解决方案,能够帮助大家解决常见的数据倾斜问题:


解决方案一:使用 Hive ETL预处理数据

即在数据血缘关系中,把倾斜问题前移处理,从而使下游使用方无需再考虑数据倾斜问题。

⁕该方案适用于下游交互性强的业务,如秒级/分钟级别提数查询。


解决方案二:过滤少数导致倾斜的key

即剔除倾斜的大key,该方案一般结合百分位点使用,如99.99%的id 记录数为100条以内,那么100条以外的id 就可考虑予以剔除。


⁕该方案在统计型场景下较为实用,而在明细场景下,需要看过滤的大key 是否为业务所侧重和关注。


解决方案三:提高shuffle操作的并行度

即对spark.sql.shuffle.partitions参数进行动态调整,通过增加shuffle write task写出的partition数量,来达到key的均匀分配。SparkSQL2.3 在默认情况下,该值为200。开发人员可以在启动脚本增加如下参数,对该值进行动态调整:

conf spark.sql.shuffle.partitions=10000conf spark.sql.adaptive.enabled=true conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728


⁕该方案非常简单,但是对于key的均匀分配却能起到较好的优化作用。比如,原本10个key,每个50条记录,只有1个partition,那么后续的task需要处理500条记录。通过增加partition 数量,可以使每个task 都处理50条记录,10个task 并行跑数,耗时只需要原来1个task 的1/10。但是该方案对于大key较难优化,比如,某个大key记录数有百万条,那么大key 还是会被分配到1个task 中去。


解决方案四:将reducejoin转为mapjoin

指的是在map端join,不走shuffle过程。以Spark为例,可以通过广播变量的形式,将小RDD的数据下发到各个Worker节点上(Yarn 模式下是NM),在各个Worker节点上进行join。

⁕该方案适用于小表join大表场景(百G以上的数据体量)。此处的小表默认阈值为10M,低于此阈值的小表,可分发到worker节点。具体可调整的上限需要小于container分配的内存。


解决方案五:采样倾斜key并分拆join操作

如下图示例:A表 join B表,A表有大key、B表无大key,其中大key的id为1,有3条记录。

6.png

如何进行分拆join操作呢?

首先将A表、B表中id1单独拆分出来,剔除大key的A' 和 B' 先join,达到非倾斜的速度;

针对A表大key添加随机前缀,B表扩容N倍,单独join;join后剔除随机前缀即可;

再对以上2部分union。


⁕该方案的本质还是减少单个task 处理过多数据时所引发的数据倾斜风险,适用于大key较少的情况。


解决方案六:使用随机前缀和扩容RDD进行join

比如,A 表 join B表,以A表有大key、B表无大key为例:

对A表每条记录打上[1,n] 的随机前缀,B表扩容N倍,join。

join完成后剔除随机前缀。


⁕该方案适用于大key较多的情况,但也会增加资源消耗。


解决方案七:combiner

即在map端做combiner操作,减少shuffle 拉取的数据量。

⁕该方案适合累加求和等场景。


在实际场景中,建议相关开发人员具体情况具体分析,针对复杂问题也可将以上方法进行组合使用。


Spark 参数配置


7.png


针对无数据倾斜的情况,我们梳理总结了参数配置参照表帮助大家进行Spark性能调优,这些参数的设置适用于2T左右数据的洞察与应用,基本满足大多数场景下的调优需求。


总结

目前,Spark已经发展到了Spark3.x,最新版本为Spark 3.1.2 released (Jun 01, 2021)。Spark3.x的许多新特性,如动态分区修剪、Pandas API的重大改进、增强嵌套列的裁剪和下推等亮点功能,为进一步实现降本增效提供了好思路。未来,个推也将继续保持对Spark演进的关注,并持续展开实践和分享。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
8月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
771 1
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
153 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
138 1
|
6月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
456 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
4月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
88 0
|
6月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
179 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
5月前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
58 0
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
251 0
|
6月前
|
分布式计算 Hadoop Serverless
数据处理的艺术:EMR Serverless Spark实践及应用体验
阿里云EMR Serverless Spark是基于Spark的全托管大数据处理平台,融合云原生弹性与自动化,提供任务全生命周期管理,让数据工程师专注数据分析。它内置高性能Fusion Engine,性能比开源Spark提升200%,并有成本优化的Celeborn服务。支持计算存储分离、OSS-HDFS兼容、DLF元数据管理,实现一站式的开发体验和Serverless资源管理。适用于数据报表、科学项目等场景,简化开发与运维流程。用户可通过阿里云控制台快速配置和体验EMR Serverless Spark服务。
|
7月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
394 1