Spark笔记(pyspark)2

简介: Spark笔记(pyspark)

6.SparkSQL 数据清洗API

1.去重方法 dropDuplicates

功能:对DF的数据进行去重,如果重复数据有多条,取第一条

1698845430563.jpg2.删除有缺失值的行方法 dropna

功能:如果数据中包含null,通过dropna来进行判断,符合条件就删除这一行数据

1698845440814.jpg


3.填充缺失值数据 fillna

功能:根据参数的规则,来进行null的替换

1698845450981.jpg

7.DataFrame数据写出

spark.read.format()和df.write.format() 是DataFrame读取和写出的统一化标准API

SparkSQL 统一API写出DataFrame数据

1698845487342.jpg1698845500010.jpg

DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取 JDBC等方法构建

10、SparkSQL

1.定义UDF函数

方式1语法:

udf对象 = sparksession.udf.register(参数1,参数2,参数3)

  • 参数1:UDF名称,可用于SQL风格
  • 参数2:被注册成UDF的方法名
  • 参数3:声明UDF的返回值类型

udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

方式2语法:

udf对象 = F.udf(参数1, 参数2)

  • 参数1:被注册成UDF的方法名
  • 参数2:声明UDF的返回值类型

udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

其中F是:from pyspark.sql import functions as F 其中,被注册成UDF的方法名是指具体的计算方法,如: def add(x, y): x + y

add就是将要被注册成UDF的方法名

2.使用窗口函数

开窗函数

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

聚合函数和开窗函数

聚合函数是将多行变成一行, count,avg…

开窗函数是将一行变成多行;

聚合函数如果要显示其他的列必须将列加入到group by中

开窗函数可以不使用group by,直接将所有信息显示出来

开窗函数分类

1.聚合开窗函数

聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY子句、但不可以是ORDER BY子句。

2.排序开窗函数

排序函数(列) OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。

3.分区类型NTILE的窗口函数

1698845512517.jpg

11、PySpark参数

1.spark启动参数

spark启动任务一般通过下边这种方式:

/usr/bin/spark-submit
        --master yarn \
        --deploy-mode cluster \
        --driver-memory ${driver_memory} \
        --num-executors ${executor_num} \
        --executor-cores ${executor_cores} \
        --executor-memory ${executor_memory} \
        --conf spark.dynamicAllocation.maxExecutors=${executor_max} \
        --conf spark.driver.maxResultSize=${driver_memory} \
        --conf spark.yarn.maxAppAttempts=1 \
        --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
        --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
        --conf spark.ui.showConsoleProgress=true \
        --conf spark.executor.memoryOverhead=1g \
        --conf spark.yarn.nodemanager.localizer.cache.target-size-mb=4g \
        --conf spark.yarn.nodemanager.localizer.cache.cleanup.interval-ms=300000 \
        --files s3://learning/spark/log4j.properties \
        --py-files ../config/*.py,../util/*.py \
        --name "${WARN_SUB} => ${script} ${params}" \
        ${script} ${params}

2.参数设置

在spark中指定Python版本运行:conf spark.pyspark.python=/usr/bin/python2.7

1.2.1 --driver-memory:

一般设置1g-2g即可,如果程序中需要collect相对比较大的数据,这个参数可以适当增大

1.2.2 --num-executors | --executor-cores | --executor-memory

这三个参数是控制spark任务实际使用资源情况。其中

num-exectors*executor-memory

就是程序运行时需要的内存量(根据实际处理的数据量以及程序的复杂程度,需要针对不同的任务设置不同的参数)

一般情况下executor-cores可以设置1或者2就行了。设置的特别高,容易造成物理内存或者虚拟内存超限,最终导致任务失败。

需要注意的是,executor-memory设置最好控制在在4g以内(甚至2g),最好不要设置的特别大。(根据实际集群资源来配置)如果设置的特别大,可能会卡住整个集群,导致后续任务都无法启动。

num-executors是执行器数量,执行器越多,并行度越高,相对执行速度也会快。但是如果申请数量太多,也会造成资源的大量浪费。

一般数据量较小的任务,可以配置num-executors == 200,同时executor-memory==4g;这样申请资源大概在1TB左右。大型的任务可以根据实际情况调整num-executors即可。

num-executors

参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

参数调优建议:每个Executor进程的内存设置4G ~ 8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3 ~ 1/2,避免你自己的Spark作业占用了队列所有的资源,导致别人的作业无法运行。

executor-cores

参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他人的作业运行。

1.2.3 --conf spark.dynamicAllocation.maxExecutors

集群任务是由yarn来管理的,启动任务之后,yarn会倾向于给每个任务分配尽可能多的executor数量,num-executors的设置并不是最大的executors数量,最大executors数量通过这个参数来控制。也就是说,一个任务最大的资源占用量 = spark.dynamicAllocation.maxExecutors * executor-memory。

1.2.4 日志级别设置

--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties
--files s3://learning/spark/log4j.properties

这三个配置是控制spark运行的日志输出级别的

1.2.5 spark.shuffle.memoryFraction

参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

1.2.6 spark.storage.memoryFraction

参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

3.spark调试

GC time太长,代表用于任务的内存太低,导致频繁GC,可以调小storage、shuffle的内存,增加任务内存

Peak Execution memory应该是任务用的峰值内存

shuffle read是任务读取的数据量,如果有的任务这个值明显特别高,说明出现数据倾斜

shuffle write是任务写出的数据量,同样可以表示数据倾斜

如果shuffle出现spill disk,说明shuffle内存不够,开始往硬盘写了。可以调大shuffle的内存,或者增大shuffle的partition数量。往硬盘写的数据如果不大,问题也不大。如果往硬盘溢写超过60G左右,节点可能就要崩了。

4.错误及解决方法

3.4G物理内存已经使用了3.4G(说明物理内存不够);16.9G虚拟内存已经使用了7.5G。


1698845525610.jpg

物理内存通常表示driver-memory;虚拟内存通常表示executor-memory?

Python运行spark时出现版本不同的错误

Exception: Python in worker has different version 3.9 than that in driver 3.7, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
import os
# 此处指定自己的python路径
os.environ["PYSPARK_PYTHON"] = "/miniconda3/envs/py37/bin/python"


相关文章
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
118 1
|
5月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7179 43
EMR Serverless Spark PySpark流任务体验报告
|
4月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
262 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
5月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
209 1
|
5月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
38 0
|
6月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
239 0
|
SQL 存储 分布式计算
pyspark笔记(RDD,DataFrame和Spark SQL)2
pyspark笔记(RDD,DataFrame和Spark SQL)
87 2
|
存储 分布式计算 资源调度
Spark笔记(pyspark)1
Spark笔记(pyspark)
107 0
|
SQL 机器学习/深度学习 分布式计算
spark与pyspark教程(一)
spark与pyspark教程(一)
401 0
|
分布式计算 Scala Spark
Scala写Spark笔记
Scala写Spark笔记
83 0