educoder的spark算子学习

简介: educoder的spark算子学习

一、spark安装

本次是在educoder这个平台上使用的,所以对于spark的安装方式是local本地模式,平台上有完整的安装步骤,在这里就不在继续叙述了,感谢理解

二、pyspark的算子学习

2.1、Transformation - map

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    #********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc =SparkContext("local","Simple App")
    
    # 2.创建一个1到5的列表List
    data=[1,2,3,4,5]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
    """
    使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
    需求:
        偶数转换成该数的平方
        奇数转换成该数的立方
    """
    # 5.使用 map 算子完成以上需求
    rdd_map = rdd.map(lambda x: x**2 if x%2==0 else x**3)
    # 6.使用rdd.collect() 收集完成 map 转换的元素
    print(rdd_map.collect())
    # 7.停止 SparkContext
    sc.stop()
    #********** End **********#
2.2、Transformation - mapPartitions

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
#********** Begin **********#
def f(iterator):
    list = []
    for x in iterator:
        list.append((x,len(x)))
    return list
#********** End **********#
if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    data = ["dog", "salmon", "salmon", "rat", "elephant"]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    partitions = rdd.mapPartitions(f)
    print(partitions.collect())
    
    sc.stop()
2.3、Transformation - filter

filter 函数功能是对元素进行过滤,对每个元素应用f函数,返回值为 true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实现相当于生成。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    #********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
        sc = SparkContext("local", "Simple App")
        data = [1, 2, 3, 4, 5, 6, 7, 8]
        rdd = sc.parallelize(data)
        print(rdd.collect())
        rdd_filter = rdd.filter(lambda x: x % 2 == 0)
        print(rdd_filter.collect())
        sc.stop()
2.4、Transformation - flatMap

flatMap扁平化操作

将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD中每个集合的元素合并为一个集合,内部创建:

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
       #********** Begin **********#
       
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
    data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] 
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
    """
        使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
        需求:
            合并RDD的元素,例如:
                            ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
                            ([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)
        """
    # 5.使用 filter 算子完成以上需求
    faltmap= rdd.flatMap(lambda x: x)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(faltmap.collect())
    # 7.停止 SparkContextsc.stio
    sc.stop()
    #********** End **********#
2.5、Transformation - distinct

distinct 将 RDD 中的元素进行去重操作。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    #********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
    """
       使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素去重,例如:
                        1,2,3,3,2,1  --> 1,2,3
                        1,1,1,1,     --> 1
       """
    # 5.使用 distinct 算子完成以上需求
    distinct = rdd.distinct()
    # 6.使用rdd.collect() 收集完成 distinct 转换的元素
    print(distinct.collect())
    # 7.停止 SparkContext
    sc.stop()
    #********** End **********#
2.6 、Transformation - sortBy

该函数最多可以传三个参数:

  • 第一个参数是一个函数,排序规则;
  • 第二个参数是 ascending ,从字面的意思大家应该可以猜到,是的,这参数决定排序后 RDD 中的元素是升序还是降序,默认是 true ,也就是升序;
  • 第三个参数是 numPartitions ,该参数决定排序后的 RDD 的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    # ********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口}
    sc = SparkContext("local", "Simple App")
    # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
    data =[1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
    """
       使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            5,4,3,1,2  --> 1,2,3,4,5
       """
    # 5.使用 sortBy 算子完成以上需求
    by = rdd.sortBy(lambda x: x)
    # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
    print(by.collect())
    # 7.停止 SparkContext
    sc.stop()
    #********** End **********#
2.7、Transformation - sortByKey

ascending参数是指排序(升序还是降序),默认是升序。numPartitions参数是重新分区,默认与上一个RDD保持一致。keyfunc参数是排序规则。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    # ********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
    data = [('B', 1), ('A', 2), ('C', 3)]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
    """
       使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
       """
    # 5.使用 sortByKey 算子完成以上需求
    key = rdd.sortByKey()
    # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
    print(key.collect())
    # 7.停止 SparkContext
    sc.stop()
    # ********** End **********#
2.8、Transformation - mapValues

mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    # ********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
    # 3.通过 SparkContext 并行化创建 rdd
  
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
    """
           使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
           需求:
               元素(key,value)的value进行以下操作:
                                                偶数转换成该数的平方
                                                奇数转换成该数的立方
    """
    # 5.使用 mapValues 算子完成以上需求
    values = rdd.mapValues(lambda x: x**2 if x%2==0 else x**3)
    # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
    print(values.collect())
    # 7.停止 SparkContext
    sc.stop()
    # ********** End **********#
2.9、Transformations - reduceByKey

reduceByKey 算子,只是两个值合并成一个值,比如叠加。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    # ********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())
    """
          使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
          需求:
              元素(key-value)的value累加操作,例如:
                                                (1,1),(1,1),(1,2)  --> (1,4)
                                                (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """
    # 5.使用 reduceByKey 算子完成以上需求
    print(rdd.reduceByKey(lambda x,y:x+y).collect())
    # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
    sc.stop()
  
2.10、Actions -pyspark常用算子

count

count():返回 RDD 的元素个数。

示例:

sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.count())

输出:

5

first

first():返回 RDD 的第一个元素(类似于take(1))。

示例:

sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.first())

输出:

python

take

take(n):返回一个由数据集的前 n 个元素组成的数组。

示例:

sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.take(2))

输出:

[‘python’, ‘python’]

reduce

reduce():通过func函数聚集 RDD 中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。

示例:

sc = SparkContext("local", "Simple App")
data = [1,1,1,1]
rdd = sc.parallelize(data)
print(rdd.reduce(lambda x,y:x+y))

输出:

4

collect

collect():在驱动程序中,以数组的形式返回数据集的所有元素。

示例:

sc = SparkContext("local", "Simple App")
data = [1,1,1,1]
rdd = sc.parallelize(data)
print(rdd.collect())

输出:

[1,1,1,1]

代码答案:

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
    # ********** Begin **********#
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.收集rdd的所有元素并print输出
    rdd = sc.parallelize(data)
    
    print(rdd.collect())
    print(rdd.count())
    # 5.统计rdd的元素个数并print输出
    print(rdd.first())
    # 6.获取rdd的第一个元素并print输出
    print(rdd.take(3))
    # 7.获取rdd的前3个元素并print输出
    print(rdd.reduce(lambda x,y:x+y))
    # 8.聚合rdd的所有元素并print输出
    # print(rdd.collect())
    # 9.停止 SparkContext
    sc.stop()
    # ********** End **********#

三、Spark RDD编程初级实践(scala)

基于上面的pyspark几个算子基本上已经理解了,在scala中算子的概念是一样的

3.1、数据去重
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object RemDup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RemDup").setMaster("local")
    val sc = new SparkContext(conf)
    //输入文件fileA.txt和fileB.txt已保存在本地文件系统/root/step1_files目录中
    val dataFile = "file:///root/step1_files"
    val data = sc.textFile(dataFile, 2)
    /********** Begin **********/
    //第一步:执行过滤操作,把空行丢弃。
    val rdd1 = data.filter(_.trim().length > 0)
    
    //第二步:执行map操作,取出RDD中每个元素,去除尾部空格并生成一个(key, value)键值对。
    val rdd2 = rdd1.map(line => (line.trim, ""))
    
    //第三步:执行groupByKey操作,把所有key相同的value都组织成一个value-list。
    val rdd3 = rdd2.groupByKey()
    
    //第四步:对RDD进行重新分区,变成一个分区,
    //在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。
    val rdd4 = rdd3.partitionBy(new HashPartitioner(1))
    
    //第五步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。
    val rdd5 = rdd4.sortByKey()
    
    //第六步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。
    val rdd6 = rdd5.keys
    
    //第七步:执行collect操作,以数组的形式返回RDD中所有元素。
    val rdd7 = rdd6.collect()
    
    //第八步:执行foreach操作,并使用println打印出数组中每个元素的值。
    println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
    
    rdd7.foreach(println)
    /********** End **********/
  }
}
3.2、整合排序
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object FileSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileSort").setMaster("local")
    val sc = new SparkContext(conf)
    //输入文件file1.txt、file2.txt和file3.txt已保存在本地文件系统/root/step2_files目录中
    val dataFile = "file:///root/step2_files"
    val data = sc.textFile(dataFile, 3)
    /********** Begin **********/
    //第一步:执行过滤操作,把空行丢弃。
    val rdd1 = data.filter(_.trim().length > 0)
    
    //第二步:执行map操作,取出RDD中每个元素,去除尾部空格并转换成整数,生成一个(key, value)键值对。
    val rdd2 = rdd1.map(line => (line.trim.toInt, ""))
    
    //第三步:对RDD进行重新分区,变成一个分区,
    //在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。
    val rdd3 = rdd2.partitionBy(new HashPartitioner(1))
    
    //第四步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。
    val rdd4 = rdd3.sortByKey()
    
    //第五步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。
    val rdd5 = rdd4.keys
    
    //第六步:执行map操作,取出RDD中每个元素,生成一个(key, value)键值对,
    //其中key是整数的排序位次,value是原待排序的整数。
    var index = 0
    val rdd6 = rdd5.map(t => {
      index = index + 1
      (index, t)
    })
    
    //第七步:执行collect操作,以数组的形式返回RDD中所有元素。
    val rdd7 = rdd6.collect()
    
    //第八步:执行foreach操作,依次遍历数组中每个元素,分别取出(key, value)键值对中key和value,
    //按如下格式输出:key value
    println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
    
    rdd7.foreach(t => println(t._1 + " " + t._2))
    /********** End **********/
  }
}
3.3、求平均值
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object AvgScore {
  def main(args: Array[String]): Unit = {
  
    val conf = new SparkConf().setAppName("FileSort").setMaster("local")
    val sc = new SparkContext(conf)
    
    //输入文件AlgorithmScore.txt、DataBaseScore.txt和PythonScore.txt已保存在本地文件系统/root/step3_files目录中
    val dataFile = "file:///root/step3_files"
    val data = sc.textFile(dataFile)
    
    /********** Begin **********/
    //第一步:执行过滤操作,把空行丢弃。
    val rdd1 = data.filter(_.trim().length > 0)
    
    //第二步:执行map操作,取出RDD中每个元素(即一行文本),以空格作为分隔符将一行文本拆分成两个字符串,
    //拆分后得到的字符串封装在一个数组对象中,成为新的RDD中一个元素。
    var rdd2 = rdd1.map(line => line.split(" "))
    
    //第三步:执行map操作,取出RDD中每个元素(即字符串数组),取字符串数组中第一个元素去除尾部空格,
    //取字符串数组中第二个元素去除尾部空格并转换成整数,并由这两部分构建一个(key, value)键值对。
    val rdd3 = rdd2.map(t => (t(0).trim, t(1).trim.toInt))
    
    //第四步:执行mapValues操作,取出键值对RDD中每个元素的value,使用x=>(x,1)这个匿名函数进行转换。
    val rdd4 = rdd3.mapValues(x => (x, 1))
    
    //第五步:执行reduceByKey操作,计算出每个学生所有课程的总分数和总课程门数。
    val rdd5 = rdd4.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    
    //第六步:执行mapValues操作,计算出每个学生的平均成绩。
    val rdd6 = rdd5.mapValues(x => (x._1.toDouble / x._2))
    
    //第七步:执行collect操作,以数组的形式返回RDD中所有元素。
    val rdd7 = rdd6.collect()
    
    //第八步:执行foreach操作,按如下格式打印出每个学生的平均成绩:姓名 成绩,其中成绩要求保留两位小数。
    println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
    
    rdd7.foreach(t => {
      val x = t._2
      println(t._1 + " " + f"$x%1.2f")
    })
    /********** End **********/
  }
}

最后,感谢阅读,如有帮助,一键三连哈。

相关文章
|
7月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
128 11
|
7月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
363 1
|
7月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
158 2
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
59 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
51 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
68 0
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
45 0
|
3月前
|
分布式计算 Shell Scala
学习使用Spark
学习使用Spark
110 3
|
4月前
|
分布式计算 Shell Scala
如何开始学习使用Spark?
【8月更文挑战第31天】如何开始学习使用Spark?
107 2
|
7月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)