前言
要想了解PySpark能够干什么可以去看看我之前写的文章,里面很详细介绍了Spark的生态:
Spark框架深度理解一:开发缘由及优缺点
Spark框架深度理解二:生态圈
Spark框架深度理解三:运行架构、核心数据集RDD
PySpark只是通过JVM转换使得Python代码能够在Spark集群上识别运行。故Spark的绝大多数功能都可以被Python程序使用。
上篇文章:一文速学-PySpark数据分析基础:PySpark原理详解
已经把PySpark运行原理讲的很清楚了,现在我们需要了解PySpark语法基础来逐渐编写PySpark程序实现分布式数据计算。
已搭建环境:
Spark:3.3.0
Hadoop:3.3.3
Scala:2.11.12
JDK:1.8.0_201
PySpark:3.1.2
一、PySpark基础功能
PySpark是Python中Apache Spark的接口。它不仅可以使用Python API编写Spark应用程序,还提供了PySpark shell,用于在分布式环境中交互分析数据。PySpark支持Spark的大多数功能,如Spark SQL、DataFrame、Streaming、MLlib(机器学习)和Spark Core。
1.Spark SQL 和DataFrame
Spark SQL是用于结构化数据处理的Spark模块。它提供了一种称为DataFrame的编程抽象,是由SchemaRDD发展而来。不同于SchemaRDD直接继承RDD,DataFrame自己实现了RDD的绝大多数功能。可以把Spark SQL DataFrame理解为一个分布式的Row对象的数据集合。
Spark SQL已经集成在spark-shell中,因此只要启动spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中执行SQL语句,需要使用SQLContext对象来调用sql()方法。Spark SQL对数据的查询分成了两个分支:SQLContext和HiveContext,其中HiveContext继承了SQLContext,因此HiveContext除了拥有SQLContext的特性之外还拥有自身的特性。
Spark SQL允许开发人员直接处理RDD,同时也可查询例如在 Apache Hive上存在的外部数据。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。
2.Pandas API on Spark
Spark上的pandas API可以扩展使用 python pandas库。
轻松切换到pandas API和PySpark API上下文,无需任何开销。
有一个既适用于pandas(测试,较小的数据集)又适用于Spark(分布式数据集)的代码库。
熟练使用pandas的话很快上手
3.Streaming
Apache Spark中的Streaming功能运行在Spark之上,支持跨Streaming和历史数据的强大交互和分析应用程序,同时继承了Spark的易用性和容错特性。Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。
4.MLBase/MLlib
MLlib构建在Spark之上,是一个可扩展的机器学习库,它提供了一组统一的高级API,帮助用户创建和调整实用的机器学习管道。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。
ML Optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果;
MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台;
MLlib是Spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法可以进行可扩充; MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。
5.Spark Core
Spark Core是Spark平台的底层通用执行引擎,所有其他功能都构建在其之上。它提供了RDD(弹性分布式数据集)和内存计算能力。
二、PySpark依赖
Dependencies
Package 最低版本限制 Note
pandas 1.0.5 支撑Spark SQL
Numpy 1.7 满足支撑MLlib基础API
pyarrow 1.0.0 支撑Spark SQL
Py4j 0.10.9.5 要求
pandas 1.0.5 pandas API on Spark需要
pyarrow 1.0.0 pandas API on Spark需要
Numpy 1.14 pandas API on Spark需要
请注意,PySpark需要Java 8或更高版本,并正确设置Java_HOME。如果使用JDK 11,请设置Dio.netty.tryReflectionSetAccessible=true 以获取与箭头相关的功能。
AArch64(ARM64)用户注意:PyArrow是PySpark SQL所必需的,但PyArrow 4.0.0中引入了对AArch64的PyArrow支持。如果由于PyArrow安装错误导致PyArrow安装在AArch64上失败,可以按如下方式安装PyArrow>=4.0.0:
pip install "pyarrow>=4.0.0" --prefer-binary
三、DataFrame
PySpark应用程序从初始化SparkSession开始,SparkSession是PySpark的入口点,如下所示。如果通过PySpark可执行文件在PySpark shell中运行它,shell会自动在变量spark中为用户创建会话。
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
1.创建
PySpark DataFrame能够通过pyspark.sql.SparkSession.createDataFrame创建,通常通过传递列表(list)、元组(tuples)和字典(dictionaries)的列表和pyspark.sql.Rows,Pandas DataFrame,由此类列表组成的RDD转换。pyspark.sql.SparkSession.createDataFrame接收schema参数指定DataFrame的架构(优化可加速)。省略时,PySpark通过从数据中提取样本来推断相应的模式。
创建不输入schema格式的DataFrame
from datetime import datetime, date import pandas as pd from pyspark.sql import Row df = spark.createDataFrame([ Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)), Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)) ]) df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
创建带有schema的DataFrame
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ], schema='a long, b double, c string, d date, e timestamp') df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
从Pandas DataFrame创建
pandas_df = pd.DataFrame({ 'a': [1, 2, 3], 'b': [2., 3., 4.], 'c': ['string1', 'string2', 'string3'], 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)], 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)] }) df = spark.createDataFrame(pandas_df) df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
通过由元组列表组成的RDD创建
rdd = spark.sparkContext.parallelize([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ]) df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e']) df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
以上的DataFrame格式创建的都是一样的。
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
2.查看
DataFrame.show()
使用格式:
df.show(<int>)
df.show(1)
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row
spark.sql.repl.eagerEval.enabled
spark.sql.repl.eagerEval.enabled用于在notebooks(如Jupyter)中快速生成PySpark DataFrame的配置。控制行数可以使用spark.sql.repl.eagerEval.maxNumRows。
spark.conf.set('spark.sql.repl.eagerEval.enabled', True) df
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',1) df
纵向显示
行也可以垂直显示。当行太长而无法水平显示时,纵向显示就很明显。
df.show(1, vertical=True)
-RECORD 0------------------
a | 1
b | 2.0
c | string1
d | 2000-01-01
e | 2000-01-01 12:00:00
only showing top 1 row
查看DataFrame格式和列名
df.columns
['a', 'b', 'c', 'd', 'e']
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
查看统计描述信息
df.select("a", "b", "c").describe().show()
+-------+---+---+-------+
|summary| a| b| c|
+-------+---+---+-------+
| count| 3| 3| 3|
| mean|2.0|3.0| null|
| stddev|1.0|1.0| null|
| min| 1|2.0|string1|
| max| 3|4.0|string3|
+-------+---+---+-------+
DataFrame.collect()将分布式数据收集到驱动程序端,作为Python中的本地数据。请注意,当数据集太大而无法容纳在驱动端时,这可能会引发内存不足错误,因为它将所有数据从执行器收集到驱动端。
df.collect()
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
为了避免引发内存不足异常可以使用DataFrame.take()或者是DataFrame.tail():
df.take(1)
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
df.tail(1)
[Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
PySpark DataFrame转换为Pandas DataFrame
PySpark DataFrame还提供了到pandas DataFrame的转换,以利用pandas API。注意,toPandas还将所有数据收集到driver端,当数据太大而无法放入driver端时,很容易导致内存不足错误。
df.toPandas()
3.查询
PySpark DataFrame是惰性计算的,仅选择一列不会触发计算,但它会返回一个列实例:
df.a
Column<'a'>
大多数按列操作都返回列:
from pyspark.sql import Column from pyspark.sql.functions import upper type(df.c) == type(upper(df.c)) == type(df.c.isNull())
True
上述生成的Column可用于从DataFrame中选择列。例如,DataFrame.select()获取返回另一个DataFrame的列实例:
df.select(df.c).show()
+-------+
| c|
+-------+
|string1|
|string2|
|string3|
+-------+
添加新列实例:
df.withColumn('upper_c', upper(df.c)).show()
+---+---+-------+----------+-------------------+-------+
| a| b| c| d| e|upper_c|
+---+---+-------+----------+-------------------+-------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
条件查询DataFrame.filter()
df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
4.运算
Pandas_udf
PySpark支持各种UDF和API,允许用户执行Python本机函数。另请参阅最新的Pandas UDF( Pandas UDFs)和Pandas Function API( Pandas Function APIs)。例如,下面的示例允许用户在Python本机函数中直接使用pandas Series中的API。
Apache Arrow in PySpark
import pandas as pd from pyspark.sql.functions import pandas_udf @pandas_udf('long') def pandas_plus_one(series: pd.Series) -> pd.Series: # Simply plus one by using pandas Series. return series + 1 df.select(pandas_plus_one(df.a)).show()
+------------------+
|pandas_plus_one(a)|
+------------------+
| 2|
| 3|
| 4|
+------------------+
DataFrame.mapInPandas
DataFrame.mapInPandas允许用户在pandas DataFrame中直接使用API,而不受结果长度等任何限制。
def pandas_filter_func(iterator): for pandas_df in iterator: yield pandas_df[pandas_df.a == 1] df.mapInPandas(pandas_filter_func, schema=df.schema).show()
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
5.分组
PySpark DataFrame还提供了一种使用常见方法,即拆分-应用-合并策略来处理分组数据的方法。它根据特定条件对数据进行分组,对每个组应用一个函数,然后将它们组合回DataFrame。
df = spark.createDataFrame([ ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30], ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60], ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2']) df.show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 1| 10|
| blue|banana| 2| 20|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
|black|carrot| 6| 60|
| red|banana| 7| 70|
| red| grape| 8| 80|
+-----+------+---+---+
分组,然后将avg()函数应用于结果组。
df.groupby('color').avg().show()
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
| red| 4.8| 48.0|
| blue| 3.0| 30.0|
|black| 6.0| 60.0|
+-----+-------+-------+
还可以使用pandas API对每个组应用Python自定义函数。
def plus_mean(pandas_df): return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean()) df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot| 0| 60|
| blue|banana| -1| 20|
| blue| grape| 1| 40|
| red|banana| -3| 10|
| red|carrot| -1| 30|
| red|carrot| 0| 50|
| red|banana| 2| 70|
| red| grape| 3| 80|
+-----+------+---+---+
联合分组和应用函数
df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ('time', 'id', 'v1')) df2 = spark.createDataFrame( [(20000101, 1, 'x'), (20000101, 2, 'y')], ('time', 'id', 'v2')) def asof_join(l, r): return pd.merge_asof(l, r, on='time', by='id') df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas( asof_join, schema='time int, id int, v1 double, v2 string').show()
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
|20000101| 1|1.0| x|
|20000102| 1|3.0| x|
|20000101| 2|2.0| y|
|20000102| 2|4.0| y|
+--------+---+---+---+
6.获取数据输入/输出
CSV简单易用。Parquet和ORC是高效紧凑的文件格式,读写速度更快。
PySpark中还有许多其他可用的数据源,如JDBC、text、binaryFile、Avro等。另请参阅Apache Spark文档中最新的Spark SQL、DataFrames和Datasets指南。Spark SQL, DataFrames and Datasets Guide
CSV
df.write.csv('foo.csv', header=True) spark.read.csv('foo.csv', header=True).show()
这里记录一个报错:
java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0
将Hadoop安装目录下的 bin 文件夹中的 hadoop.dll 和 winutils.exe 这两个文件拷贝到 C:\Windows\System32 下,问题解决。
+---+---+-------+----------+--------------------+
| a| b| c| d| e|
+---+---+-------+----------+--------------------+
| 1|2.0|string1|2000-01-01|2000-01-01T12:00:...|
| 2|3.0|string2|2000-02-01|2000-01-02T12:00:...|
| 3|4.0|string3|2000-03-01|2000-01-03T12:00:...|
+---+---+-------+----------+--------------------+
Parquet
1. df.write.parquet('bar.parquet') 2. spark.read.parquet('bar.parquet').show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot| 6| 60|
| blue|banana| 2| 20|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
| red|banana| 7| 70|
| red|banana| 1| 10|
| red|carrot| 3| 30|
| red| grape| 8| 80|
+-----+------+---+---+
ORC
df.write.orc('zoo.orc') spark.read.orc('zoo.orc').show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 7| 70|
| red| grape| 8| 80|
|black|carrot| 6| 60|
| blue|banana| 2| 20|
| red|banana| 1| 10|
| red|carrot| 5| 50|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
+-----+------+---+---+
四、结合Spark SQL
DataFrame和Spark SQL共享同一个执行引擎,因此可以无缝地互换使用。例如,可以将数据帧注册为表,并按如下方式轻松运行SQL:
df.createOrReplaceTempView("tableA") spark.sql("SELECT count(*) from tableA").show()
+--------+
|count(1)|
+--------+
| 8|
+--------+
此外UDF也可在现成的SQL中注册和调用
@pandas_udf("integer") def add_one(s: pd.Series) -> pd.Series: return s + 1 spark.udf.register("add_one", add_one) spark.sql("SELECT add_one(v1) FROM tableA").show()
这些SQL表达式可以直接混合并用作PySpark列。
from pyspark.sql.functions import expr df.selectExpr('add_one(v1)').show() df.select(expr('count(*)') > 0).show()