spark与pyspark教程(一)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: spark与pyspark教程(一)

大数据生态圈简介


大数据生态圈可以分为7层,总的可以归纳为数据采集层、数据计算层和数据应用层。


20210611172810408.png

spark


1.简介


spark是一种计算引擎,类似于hadoop架构下mapreduce,与mapreduce不同的是将计算的结果存入hdfs分布式文件系统。spark则是写入内存中,像mysql一样可以实现实时的计算,包括SQL查询。

spark不单单支持传统批量处理应用,更支持交互式查询、流式计算、机器学习、图计算等各种应用,

spark是由scala语言开发,具备python的接口,pyspark。


2.spark组件


spark包含着多个紧密集成的组件,如图所示:


20210609132418685.png

2.1 spark core


实现spark基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。

同时也包含对弹性分布式数据集(RDD),RDD表示分布在多个计算节点上可以并行操作的元素集合。


2.2 spark sql


spark sql用来操作结构化数据的程序包,我们可以使用sql或者hive语言来查询数据。


2.3 spark streaming


spark streaming上对实时数据进行流式计算的组件。例如:在网页服务日志,或者在网络服务中用户提交的状态更新组成的队列。


2.4 mlib


mlib提供机器学习功能程序库,提供多种机器学习算法


2.5 graphx


Graphx用来操作图,可以进行并行的图计算


2.6 集群管理器


Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计

算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器。


搭建spark集群


  • 步骤1:搭建hadoop单机和伪分布式环境
  • 步骤2:构造分布式hadoop集群
  • 步骤3:构造分布式spark集群

3.RDD编程


3.1RDD基础


实例1:读取外部数据集,并调用转化操作filter提取包含“python”的字符串,并调用first()行动,返回第一个包含python的字符串。


#初始化SparkContext
import pyspark
from pyspark import SparkContext,SparkConf
#配置应用
conf=SparkConf().setMaster("local").setAppName("My App")
#基于sparkconf创建一个sparkcontext
sc=SparkContext(conf=conf)
#读取外部数据
lines=sc.textFile("README.md")
pythonlines=lines.filter(lambda line:"python" in line)
pythonlines.first()
out:u'## Interactive Python Shell'


实例2:spark的RDD会对每次行动进行重新计算,如果想复用同一个RDD,使用RDD.persist(),将RDD内容保存到内存中


pythonlines.persist
pythonlines.count()
pythonlines.first()
out:u'## Interactive Python Shell'


3.2创建RDD


实例1:将程序中一个已有集合传递给SparkContext的parallelize()


#内部创建数据
lines=sc.parallelize(["pandas","i like pandas"])
#外部读取数据
lines=sc.textFile("/path/to/README.md")


3.3RDD操作


3.3.1 转化操作


实例1:假定有一个日志文件log.txt,内部含若干信息,希望提取出其中的错误信息


inputRDD=sc.textFile("log.txt")
errorsRDD=inputRDD.filter(lambda x:"error" in x)


实例2:打印包含error或warning的行数


errorsRDD=inputRDD.filter(lambda x:"error" in x)
warningsRDD=inputRDD.filter(lamdba x:"warning" in x)
badlinesRDD=errorsRDD.union(warningsRDD)

20210609163935521.png

3.3.2 行动操作


实例1:输出badlinesRDD的一些信息,count()返回计数结果,take()收集RDD部分元素,collect()获取整个RDD数据


print("Input had"+badlinesRDD.count()+"concerning lines")
print("here are 10 examples:")
for line in badlinesRDD.take(10):
    print line


3.4向spark传递函数


实例1:


#1
word=rdd.filter(lambda s:"error" in s)
#2
def containserrors(s):
    return "error" in s
word=rdd.filter(containserror)


实例2:


class wordfunctions(object):
      def getmatchesnoreference(self,rdd):
      query=self.query
      return rdd.filter(lambda x:query in x)


3.5常见转化操作和行动操作


3.5.1 基本RDD


map()和filter()

实例1:计算RDD中各值的平方


nums=sc.parallelize([1,2,3,4])
squared=nums.map(lambda x:x*x).collect()
for num in squared:
    print "%i "(num)


实例2:使用flatMap()将行数据划分为单词


lines=sc.parallelize(["hello world","hi"])
words=lines.flatMap(lambda line:line.split(" "))
words.first()


其他转化操作:

集合操作

RDD笛卡儿积


转化操作列表


1687188065150.png


行动操作列表


1687188087767.png

4.键值对操作


4.1 创建Pair RDD


集合:(key,value)


pairs = lines.map(lambda x: (x.split(" ")[0], x))
• 1


对键值对集合{(1, 2), (3, 4), (3, 6)}为例


转化操作:


1687188117805.png


针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})


1687188133072.png

20210610162850593.png

import os
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)
# 使用 parallelize方法直接实例化一个RDD
rdd = sc.parallelize(range(1,11),4) # 这里的 4 指的是分区数量
rdd.take(100)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
"""
----------------------------------------------
                Transform算子解析
----------------------------------------------
"""
# 以下的操作由于是Transform操作,因为我们需要在最后加上一个collect算子用来触发计算。
# 1. map: 和python差不多,map转换就是对每一个元素进行一个映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x*2)
print("原始数据:", rdd.collect())
print("扩大2倍:", rdd_map.collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 扩大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# 2. flatMap: 这个相比于map多一个flat(压平)操作,顾名思义就是要把高维的数组变成一维
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print("原始数据:", rdd2.collect())
print("直接split之后的map结果:", rdd2.map(lambda x: x.split(" ")).collect())
print("直接split之后的flatMap结果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
# 直接split之后的map结果: [['hello', 'SamShare'], ['hello', 'PySpark']]
# 直接split之后的flatMap结果: ['hello', 'SamShare', 'hello', 'PySpark']
# 3. filter: 过滤数据
rdd = sc.parallelize(range(1, 11), 4)
print("原始数据:", rdd.collect())
print("过滤奇数:", rdd.filter(lambda x: x % 2 == 0).collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 过滤奇数: [2, 4, 6, 8, 10]
# 4. distinct: 去重元素
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print("原始数据:", rdd.collect())
print("去重数据:", rdd.distinct().collect())
# 原始数据: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# 去重数据: [4, 8, 16, 32, 2]
# 5. reduceByKey: 根据key来映射数据
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print("原始数据:", rdd.collect())
print("原始数据:", rdd.reduceByKey(add).collect())
# 原始数据: [('a', 1), ('b', 1), ('a', 1)]
# 原始数据: [('b', 1), ('a', 2)]
# 6. mapPartitions: 根据分区内的数据进行映射操作
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):
    yield sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
# [1, 2, 3, 4]
# [3, 7]
# 7. sortBy: 根据规则进行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
# 8. subtract: 数据集相减, Return each value in self that is not contained in other.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))
# [('a', 1), ('b', 4), ('b', 5)]
# 9. union: 合并两个RDD
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())
# [1, 1, 2, 3, 1, 1, 2, 3]
# 10. interp: 取两个RDD的交集,同时有去重的功效
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.interp(rdd2).collect())
# [1, 2, 3]
# 11. cartesian: 生成笛卡尔积
rdd = sc.parallelize([1, 2])
print(sorted(rdd.cartesian(rdd).collect()))
# [(1, 1), (1, 2), (2, 1), (2, 2)]
# 12. zip: 拉链合并,需要两个RDD具有相同的长度以及分区数量
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
# 13. zipWithIndex: 将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
# [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
# 14. groupByKey: 按照key来聚合数据
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# [('a', 1), ('b', 1), ('a', 1)]
# [('a', 2), ('b', 1)]
# [('a', [1, 1]), ('b', [1])]
# 15. sortByKey:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# 16. join:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]
# 17. leftOuterJoin/rightOuterJoin
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None))]
"""
----------------------------------------------
                Action算子解析
----------------------------------------------
"""
# 1. collect: 指的是把数据都汇集到driver端,便于后续的操作
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]
# 2. first: 取第一个元素
sc.parallelize([2, 3, 4]).first()
# 2
# 3. collectAsMap: 转换为dict,使用这个要注意了,不要对大数据用,不然全部载入到driver端会爆内存
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m
# {1: 2, 3: 4}
# 4. reduce: 逐步对两个元素进行操作
rdd = sc.parallelize(range(10),5)
print(rdd.reduce(lambda x,y:x+y))
# 45
# 5. countByKey/countByValue:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
# [('a', 2), ('b', 1)]
# [(('a', 1), 2), (('b', 1), 1)]
# 6. take: 相当于取几个数据到driver端
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))
# [('a', 1), ('b', 1), ('a', 1)]
# 7. saveAsTextFile: 保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
# 8. takeSample: 随机取数
rdd = sc.textFile("./test/data/hello_samshare.txt", 4)  # 这里的 4 指的是分区数量
rdd_sample = rdd.takeSample(True, 2, 0)  # withReplacement 参数1:代表是否是有放回抽样
rdd_sample
# 9. foreach: 对每一个元素执行某种操作,不生成新的RDD
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
# 45


5.数据读取与保存


spark支持很多种输入输出源,一部分原因spark本身基于hadoop生态圈而构建,特别说spark可以通过HadoopMapReduce所使用的InputFormat和OutputFormat接口访问。


5.1 文本文件


读取文本文件,保存文件


data=sc.textFile("file://home/README.md")
data.saveAsTextFile(outputFile)


5.2 JSON文件


import json
data=input.map(lambdax:json.loads(x))
data.filter(lambda x:x["lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile)


5.3 逗号分隔值与制表符分隔值


import csv
import StringIO
def loadRecord(line):
    input=StringIO.stringIO(line)
     reader=csv.DictReader(input,fieldnames=["name","favouriteAnimal"])
    return reader.next()
input=sc.textFile(inputFile).map(loadRecord)
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
4月前
|
SQL 分布式计算 Spark
Spark 教程系列
Spark 教程系列
54 0
|
10月前
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
105 1
|
3月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7166 43
EMR Serverless Spark PySpark流任务体验报告
|
2月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
365 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
2月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
238 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
1月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
145 0
|
1月前
|
分布式计算 Java Linux
【Deepin 20系统】Linux 系统安装Spark教程及使用
在Deepin 20系统上安装和使用Apache Spark的详细教程,包括安装Java JDK、下载和解压Spark安装包、配置环境变量和Spark配置文件、启动和关闭Spark集群的步骤,以及使用Spark Shell和PySpark进行简单操作的示例。
23 0
|
3月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
100 1
|
4月前
|
SQL 分布式计算 Hadoop
【Spark】Spark基础教程知识点
【Spark】Spark基础教程知识点
|
4月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
54 0