PySpark使用
pyspark:
• pyspark = python + spark
• 在pandas、numpy进行数据处理时,一次性将数据读入 内存中,当数据很大时内存溢出,无法处理;此外,很
多执行算法是单线程处理,不能充分利用cpu性能
spark的核心概念之一是shuffle,它将数据集分成数据块, 好处是:
• 在读取数据时,不是将数据一次性全部读入内存中,而
是分片,用时间换空间进行大数据处理
• 极大的利用了CPU资源
• 支持分布式结构,弹性拓展硬件资源。
pyspark:
• 在数据结构上Spark支持dataframe、sql和rdd模型 • 算子和转换是Spark中最重要的两个动作
• 算子好比是盖房子中的画图纸,转换是搬砖盖房子。有 时候我们做一个统计是多个动作结合的组合拳,spark常 将一系列的组合写成算子的组合执行,执行时,spark会
对算子进行简化等优化动作,执行速度更快
pyspark操作: • 对数据进行切片(shuffle)
config(“spark.default.parallelism”, 3000)
假设读取的数据是20G,设置成3000份,每次每个进程
(线程)读取一个shuffle,可以避免内存不足的情况
• 设置程序的名字
appName(“taSpark”)
• 读文件
data = spark.read.csv(cc,header=None, inferSchema=“true”)
• 配置spark context
Spark 2.0版本之后只需要创建一个SparkSession即可
from pyspark.sql import SparkSession spark=SparkSession .builder .appName(‘hotel_rec_app’) .getOrCreate()
# Spark+python 进行wordCount from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .master("local[*]")\ .getOrCreate() # 将文件转换为RDD对象 lines = spark.read.text("input.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda x, y: x + y) output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) spark.stop()
PySpark中的DataFrame
• DataFrame类似于Python中的数据表,允许处理大量结
构化数据
• DataFrame优于RDD,同时包含RDD的功能
# 从集合中创建RDD rdd = spark.sparkContext.parallelize([ (1001, "张飞", 8341, "坦克"), (1002, "关羽", 7107, "战士"), (1003, "刘备", 6900, "战士") ]) # 指定模式, StructField(name,dataType,nullable) # name: 该字段的名字,dataType:该字段的数据类型, nullable: 指示该字段的值是否为空 from pyspark.sql.types import StructType, StructField, LongType, StringType # 导入类型 schema = StructType([ StructField("id", LongType(), True), StructField("name", StringType(), True), StructField("hp", LongType(), True), #生命值 StructField("role_main", StringType(), True) ]) # 对RDD应用该模式并且创建DataFrame heros = spark.createDataFrame(rdd, schema) heros.show() # 利用DataFrame创建一个临时视图 heros.registerTempTable("HeroGames") # 查看DataFrame的行数 print(heros.count()) # 使用自动类型推断的方式创建dataframe data = [(1001, "张飞", 8341, "坦克"), (1002, "关羽", 7107, "战士"), (1003, "刘备", 6900, "战士")] df = spark.createDataFrame(data, schema=['id', 'name', 'hp', 'role_main']) print(df) #只能显示出来是DataFrame的结果 df.show() #需要通过show将内容打印出来 print(df.count()) 3 DataFrame[id: bigint, name: string, hp: bigint, role_main: string] | id|name| hp|role_main| +----+-------+-----+-------------+ |1001|张飞|8341| 坦克| |1002|关羽|7107| 战士| |1003|刘备|6900| 战士| +----+-------+-----+-------------+ 3
从CSV文件中读取 heros = spark.read.csv("./heros.csv", header=True, inferSchema=True) heros.show() • 从MySQL中读取 df = spark.read.format('jdbc').options( url='jdbc:mysql://localhost:3306/wucai?useUnicode=true& useJDBCCompliantTimezoneShift=true&useLegacyDatetim eCode=false&serverTimezone=Asia/Shanghai', dbtable='heros', user='root', password='passw0rdcc4' ).load() print('连接JDBC,调用Heros数据表') df.show()