在spark设置并行度一般通过两种方式来设置:
1.spark.default.parrallelism
2.textFile()传入第二个参数,指定partition数量
使用spark sql的时候会出现什么问题?
但是如果使用来spark sql,用spark sql的那个stage的并行度,你没办法自己指定,因为spark sql 自己会默认根据hive表对应的hdfs的block,自动设置spark sql查询所在的那个stage的并行度。
你自己通过spark.default.parallelism参数指定的并行度,只会在没有spark sql的stage中生效。
比如:你的第一个stage,用spark sql从hive表中查询了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(例如groupByKey);下一个stage,在stage之后,做了一些transformation操作。
hive表,对应了一个hdfs文件,有20个block;你自己设置了 spark.default.parallelish参数为100;
你的第一个stage的并行度,是不受你设置的参数控制的,就只有20task;第二个stage的并行度,才是你自己设置的100;
这样会产生的问题就是:在第一个stage中,可能有非常复杂的业务逻辑或者算法,如果只有默认的20个并行度的话,每个task要处理很大的数据量,这就会导致第一个stage执行的速度特别慢。而第二个就很快。
解决方法
直接对spark sql查询出来的rdd使用repartition,进行重新分区。
三种设置方式:
- 直接设置分区数量
dataFrame.repartition(10)
根据字段进行分区,分区数量由 spark.sql.shuffle.partition 决定
dataFrame.repartition($"name")
根据字段进行分区,将获得100个分区的DataFrame,这种方式可以在join的时候极大的提高效率,但是同时得注意出现数据倾斜的问题
dataFrame.repartition(100,$"name")
首先遵循sql规范,然后可以提高你的并行度,最后,聚合的sql肯定会遇到shuffle,这就需要你解决好shuffle的问题,下面是我这你的一些技巧,希望对你有帮助
/**
* @author BlueCat丶懒猫
* @title: SparkShuffleSolutions
* @date 2019/11/18 12:37
* @desc:
* 2.1 数据倾斜原理
* 在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜
* 2.2 数据倾斜问题发现与定位
* 通过Spark Web UI来查看当前运行的stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
* 知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,
* 这部分代码中肯定会有一个shuffle类算子。通过countByKey查看各个key的分布。
* 2.3 数据倾斜解决方案
* 2.3.1 过滤少数导致倾斜的key
* 2.3.2 提高shuffle操作的并行度
* 2.3.3 局部聚合和全局聚合 => solution1
* 2.3.4 将reduce join转为map join((小表几百M或者一两G)) => solution2
* 2.3.5 采样倾斜key并分拆join操作(join的两表都很大,但仅一个RDD的几个key的数据量过大) => solution3
* 2.3.6 使用随机前缀和扩容RDD进行join(RDD中有大量的key导致数据倾斜) => solution4
* 4 spark shuffle参数调优
* spark.shuffle.file.buffer
* 默认值:32k
* 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
* 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
* spark.reducer.maxSizeInFlight
* 默认值:48m
* 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
* 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 * spark.shuffle.io.maxRetries
* 默认值:3
* 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
* 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
* spark.shuffle.io.retryWait
* 默认值:5s
* 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
* 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
* spark.shuffle.memoryFraction
* 默认值:0.2
* 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
* 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
* spark.shuffle.manager
* 默认值:sort
* 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 * 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。 * spark.shuffle.sort.bypassMergeThreshold * 默认值:200 * 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。 * 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。 * spark.shuffle.consolidateFiles * 默认值:false * 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。 * 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
*/
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。