Apache Spark面试问题答案
一, Spark Driver在spark应用程序中的作用是什么?
Spark驱动程序是定义知识RDD的转换和操作并向主服务器提交请求的程序。Spark驱动程序是在机器的主节点上运行的程序,它声明对知识RDD的转换和操作。
简单来说,Spark中的驱动程序创建SparkContext,连接到给定的Spark Master。它将RDD图表联合提供给Master,无论独立集群管理器在哪里运行。
二, Apache Spark集群中的工作节点是什么?
Apache Spark遵循主/从架构,具有一个主或驱动程序进程以及多个从属或工作进程
1. master是运行main()程序的驱动程序,其中创建了spark上下文。然后,它与集群管理器交互以安排作业执行并执行任务。
2.工作程序由可以并行运行的进程组成,以执行驱动程序安排的任务。这些过程称为执行程序。
每当客户端运行应用程序代码时,驱动程序都会实例化Spark Context,将转换和操作转换为执行的逻辑DAG。然后将此逻辑DAG转换为物理执行计划,然后将其细分为较小的物理执行单元。然后,驱动程序与集群管理器交互,以协商执行应用程序代码任务所需的资源。然后,集群管理器与每个工作节点交互,以了解每个节点中运行的执行程序的数量。
工作节点/执行者的角色:
1.执行应用程序代码的数据处理
2.读取数据并将数据写入外部源
3.将计算结果存储在内存或磁盘中。
执行程序在Spark应用程序的整个生命周期中运行。这是执行者的静态分配。用户还可以决定运行任务需要多少执行程序,具体取决于工作负载。这是执行者的动态分配。
在执行任务之前,执行程序通过集群管理器向驱动程序注册,以便驱动程序知道有多少执行程序正在运行以执行计划任务。然后,执行程序通过集群管理器开始执行工作节点调度的任务。
每当任何工作节点发生故障时,需要执行的任务将自动分配给任何其他工作节点
三,为什么变换在Spark中变得懒散?
每当在Apache Spark中执行转换操作时,它都会被懒惰地评估。在执行操作之前不会执行。Apache Spark只是将变换操作的条目添加到计算的DAG(有向无环图),这是一个没有循环的有向有限图。在该DAG中,所有操作都被分类到不同的阶段,在单个阶段中没有数据的混乱。
通过这种方式,Spark可以通过完整地查看DAG来优化执行,并将适当的结果返回给驱动程序。
例如,考虑HDFS中的1TB日志文件,其中包含错误,警告和其他信息。以下是驱动程序中正在执行的操作:
1. 创建此日志文件的RDD
2.对此RDD执行flatmap()操作,以根据制表符分隔符拆分日志文件中的数据。
3.执行filter()操作以提取仅包含错误消息的数据
4.执行first()操作以仅获取第一条错误消息。
如果热切评估上述驱动程序中的所有转换,那么整个日志文件将被加载到内存中,文件中的所有数据将根据选项卡进行拆分,现在要么需要在某处写入FlatMap的输出或将其保存在记忆中。Spark需要等到执行下一个操作,并且资源被阻塞以进行即将进行的操作。除此之外,每个操作spark都需要扫描所有记录,比如FlatMap处理所有记录然后再次在过滤操作中处理它们。
另一方面,如果所有转换都被懒惰地评估,Spark将整体查看DAG并为应用程序准备执行计划,现在该计划将被优化,操作将被合并/合并到阶段然后执行将开始。Spark创建的优化计划可提高作业效率和整体吞吐量。
通过Spark中的这种惰性评估,驱动程序和集群之间的切换次数也减少了,从而节省了内存中的时间和资源,并且计算速度也有所提高。
四,我可以在没有Hadoop的情况下运行Apache Spark吗?
是的,Apache Spark可以在没有Hadoop,独立或在云中运行。Spark不需要Hadoop集群就可以工作。Spark还可以读取并处理来自其他文件系统的数据。HDFS只是Spark支持的文件系统之一。
Spark没有任何存储层,因此它依赖于分布式存储系统之一,用于分布式计算,如HDFS,Cassandra等。
但是,在Hadoop(HDFS(用于存储)+ YARN(资源管理器))上运行Spark有很多优点,但这不是强制性要求。Spark是一种用于分布式计算的。在这种情况下,数据分布在计算机上,Hadoop的分布式文件系统HDFS用于存储不适合内存的数据。
使用Hadoop和Spark的另一个原因是它们都是开源的,并且与其他数据存储系统相比,它们可以相当容易地相互集成。
五,.在Spark中解释累加器。
这个讨论是继续问题,命名Apache Spark中可用的两种类型的共享变量。
累加器介绍:
Accumulator是Apache Spark中的共享变量,用于聚合群集中的信息。换句话说,将来自工作节点的信息/值聚合回驱动程序。(我们将在下面的会议中看到)为什么累加器:当我们在map(),filter()等操作中使用函数时,这些函数可以使用驱动程序中这些函数作用域外定义的变量。当我们将任务提交到集群时,集群上运行的每个任务都会获得这些变量的新副本,并且这些变量的更新不会传播回驱动程序。累加器降低了此限制。用例 :累加器最常见的用途之一是计算作业执行期间发生的事件以进行调试。意思是数不了。输入文件中的空白行,没有。在会话期间来自网络的错误数据包,在奥运会数据分析期间,我们必须找到我们在SQL查询中所说的年龄(年龄!='NA'),简短地查找错误/损坏的记录。例子 :scala> val record=spark.read.textFile("/home/hdadmin/wc-data-blanklines.txt")record: org.apache.spark.sql.Dataset[String]=[value: string]
scala> val emptylines=sc.accumulator(0)warning: there were two deprecation warnings; re-run with -deprecation for detailsemptylines: org.apache.spark.Accumulator[Int]=0
scala> val processdata=record.flatMap(x=> {if(x=="") emptylines +=1 x.split(" ") })
processdata: org.apache.spark.sql.Dataset[String]=[value: string]scala> processdata.collect16/12/02 20:55:15 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes输出:res0:Array [String]=Array(DataFlair,提供,培训,开,切,边缘,技术。,“”,DataFlair,是,领导,培训,提供者,我们,有,训练,1000s, ,候选人,培训,焦点,实践,方面,哪些,工业,需要,而不是理论,知识。,“”,DataFlair,帮助,组织,解决,BigData,问题。, “”,Javadoc,是一个工具,用于生成API,文档,HTML,格式,文档,注释,内容,源代码,代码。,它,可以,只下载,仅作为,部分,Java,2,SDK。,To,see,documentation,generated,by,Javadoc,tool ,, go,to,J2SE,1.5.0,API,Documentation。,“”,Javadoc,常见问题解答, - ,这,常见问题,涵盖,在哪里,到,下载,Javadoc,工具,如何,到,找到,列表,已知,错误和功能,reque ...scala> println(“空行数:”+ emptylines.value)空行数:10程序的解释和结论:在上面的例子中,我们创建了一个Accumulator [Int] 'emptylines'在这里,我们想找到没有。我们处理过程中的空白行。之后,我们应用flatMap()转换来处理我们的数据,但我们想要找出没有。空行(空白行)所以在flatMap()函数中,如果我们遇到任何空行,累加器空行增加1,否则我们按空格分割行。之后,我们检查输出和否。的空白行。我们通过调用sc.accumulator(0)来创建具有初始值的累加器,通过调用sc.accumulator(0)即spark Context.accumulator(初始值),其中返回类型为initalValue {org.apache.spark.Accumulator [T],其中T为initalValue]最后,我们调用累加器上的value()属性来访问它的值。请注意,工作节点上的任务不能访问累加器的value属性,因此在任务的上下文中,累加器是只写变量。accumulator的value()属性仅在驱动程序中可用。我们也可以算不上。在变换/动作的帮助下,我们需要一个额外的操作,但是在累加器的帮助下,我们可以计算一下。我们加载/处理数据时的空行(或更广泛的事件)。
六, Driver程序在Spark Application中的作用是什么?
驱动程序负责在集群上启动各种并行操作。驱动程序包含应用程序的main()函数。它是运行用户代码的过程,用户代码又创建SparkContext对象,创建RDD并在RDD上执行转换和操作操作。驱动程序通过SparkContext对象访问Apache Spark,该对象表示与计算集群的连接(从Spark 2.0开始,我们可以通过SparkSession访问SparkContext对象)。驱动程序负责将用户程序转换为称为任务的物理执行单元。它还定义了集群上的分布式数据集,我们可以对数据集(转换和操作)应用不同的操作。Spark程序创建一个名为Directed Acyclic graph的逻辑计划,当驱动程序运行时,该计划由驱动程序转换为物理执行计划。
七,如何识别给定的操作是程序中的Transformation / Action?
为了识别操作,需要查看操作的返回类型。
如果操作在这种情况下返回一个新的RDD,则操作是“转换”如果操作返回除RDD之外的任何其他类型,则操作为“Action”
因此,Transformation从现有RDD(前一个)构造新的RDD,而Action根据应用的转换计算结果,并将结果返回给驱动程序或将其保存到外部存储
八,命名Apache Spark中可用的两种类型的共享变量。
Apache Spark中有两种类型的共享变量:
(1)累加器:用于聚合信息。
(2)广播变量:有效地分配大值。
当我们将函数传递给Spark时,比如说filter(),这个函数可以使用在函数外部但在Driver程序中定义的变量,但是当我们将任务提交给Cluster时,每个工作节点都获得一个新的变量副本并更新从这些变量不会传播回Driver程序。
累加器和广播变量用于消除上述缺点(即我们可以将更新的值恢复到我们的驱动程序)
九,使用Apache Spark时开发人员常见的错误是什么?
1)DAG的管理- 人们经常在DAG控制中犯错误。始终尝试使用reducebykey而不是groupbykey。ReduceByKey和GroupByKey可以执行几乎相似的功能,但GroupByKey包含大数据。因此,尽量使用ReduceByKey。始终尽量减少地图的侧面。尽量不要在分区中浪费更多时间。尽量不要随便洗牌。尽量远离Skews和分区。
2)保持随机块的所需大小
十,默认情况下,Apache Spark中的RDD中创建了多少个分区?
默认情况下,Spark为文件的每个块创建一个分区(对于HDFS)HDFS块的默认块大小为64 MB(Hadoop版本1)/ 128 MB(Hadoop版本2)。但是,可以明确指定要创建的分区数。例1:没有指定分区val rdd1=sc.textFile("/home/hdadmin/wc-data.txt")例2:以下代码创建了10个分区的RDD,因为我们指定了no。分区。val rdd1=sc.textFile("/home/hdadmin/wc-data.txt", 10)可以通过以下方式查询分区数:rdd1.partitions.lengthORrdd1.getNumPartitions最佳情况是我们应该按照以下方式制作RDD:Cluster中的核心数量=否。分区
十一,.为什么我们需要压缩以及支持的不同压缩格式是什么?
在Big Data中,当我们使用压缩时,它可以节省存储空间并减少网络开销。可以在将数据写入HDFS时指定压缩编码(Hadoop格式)人们也可以读取压缩数据,因为我们也可以使用压缩编解码器。以下是BigData中不同的压缩格式支持: gzip lzo bzip2 Zlib * Snappy
十二,解释过滤器转换。
Apache Spark中的filter()转换将函数作为输入。它返回一个RDD,它只有通过输入函数中提到的条件的元素。示例:val rdd1=sc.parallelize(List(10,20,40,60))val rdd2=rdd2.filter(x=> x !=10)println(rdd2.collect())产量10
十三,如何在交互式shell中启动和停止scala?
在Scala中启动交互式shell的命令:
>>>> bin / spark-shell
首先进入spark目录即
hdadmin@ubuntu:~$ cd spark-1.6.1-bin-hadoop2.6/
hdadmin@ubuntu:~/spark-1.6.1-bin-hadoop2.6$ bin/spark-shell
shisi
-------------------------------------------------- -------------------------------------------------- --------------------------
在Scala中停止交互式shell的命令:
scala>Press (Ctrl+D)
可以看到以下消息
scala> Stopping spark context.
十四,解释sortByKey()操作
> sortByKey()是一种转换。
>它返回按键排序的RDD。
>排序可以在(1)升序OR(2)降序OR(3)自定义排序中完成
从:
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#212_SortByKey
他们将适用于范围内具有隐式排序[K]的任何键类型K. 对于所有标准基元类型,已经存在排序对象。用户还可以为自定义类型定义自己的排序,或覆盖默认排序。将使用最近范围内的隐式排序。
当调用 (K,V)数据集
(其中k为Ordered)时,返回按键按升序或降序排序的(K,V)对数据集,如升序参数中所指定。
示例:
val rdd1=sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))
val rdd2=rdd1.sortByKey()
rdd2.collect();
输出:
数组[(String,Int)]=(数组(巴西,55),(中国,86),(希腊,30),(印度,91),(尼泊尔,977),(瑞典,46),(火鸡,90),(美国,1)
val rdd1=sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))
val rdd2=rdd1.sortByKey(false)
rdd2.collect();
输出:
Array [(String,Int)]=(Array(USA,1),(Turkey,90),(Sweden,46),(Nepal,977),(India,91),(Greece,30),(中国,86),(巴西,55)
十五,解释Spark中的distnct(),union(),intersection()和substract()转换
union()转换
最简单的设定操作。rdd1.union(rdd2),它输出一个包含两个来源数据的RDD。如果输入RDD中存在重复项,则union()转换的输出也将包含重复项,可以使用distinct()进行修复。
例
val u1=sc.parallelize(List("c","c","p","m","t"))
val u2=sc.parallelize(List("c","m","k"))
val result=u1.union(u2)
result.foreach{println}
输出:
c
c
p
m
t
c
m
k
十六,在apache spark中解释foreach()操作
> foreach()操作是一个动作。
>它不会返回任何值。
>它对RDD的每个元素执行输入功能。
来自:http:
//data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#39_Foreach
它在RDD中的每个项目上执行该功能。它适用于编写数据库或发布到Web服务。它为每个数据项执行参数减少功能。
例:
val mydata=Array(1,2,3,4,5,6,7,8,9,10)
val rdd1=sc.parallelize(mydata)
rdd1.foreach{x=>println(x)}
OR
rdd1.foreach{println}
输出:
1
2
3
4
5
6
7
8
9
10
十七,Apache Spark中的groupByKey vs reduceByKey
在对(K,V)对的数据集应用groupByKey()时,数据根据另一个RDD中的键值K进行混洗。在这种转变中,许多不必要的数据通过网络传输。
Spark提供了将数据存储到单个执行程序机器上的数据多于内存中数据时保存到磁盘的功能。
例:
val data=spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group=data.groupByKey().collect()
group.foreach(println)
在对数据集(K,V)应用reduceByKey时,在对数据进行混合之前,组合具有相同密钥的同一机器上的对。
例:
val words=Array("one","two","two","four","five","six","six","eight","nine","ten")
val data=spark.sparkContext.parallelize(words).map(w=> (w,1)).reduceByKey(_+_)
data.foreach(println)
十八,.解释mapPartitions()和mapPartitionsWithIndex()
Mappartitions是一种类似于Map的转换。
在Map中,函数应用于RDD的每个元素,并返回结果RDD的每个其他元素。对于mapPartitions,该函数将应用于RDD的每个分区,而不是每个元素,并返回结果RDD的多个元素。在mapPartitions转换中,性能得到改善,因为在地图转换中消除了每个元素的对象创建。
由于mapPartitions转换适用于每个分区,因此它将字符串或int值的迭代器作为分区的输入。
请考虑以下示例:
val data=sc.parallelize(List(1,2,3,4,5,6,7,8), 2)
Map:
def sumfuncmap(numbers : Int) : Int=
{
var sum=1
return sum + numbers
}
data.map(sumfuncmap).collect
returns Array[Int]=Array(2, 3, 4, 5, 6, 7, 8, 9) //Applied to each and every element
MapPartitions:
def sumfuncpartition(numbers : Iterator[Int]) : Iterator[Int]=
{
var sum=1
while(numbers.hasNext)
{
sum=sum + numbers.next()
}
return Iterator(sum)
}
data.mapPartitions(sumfuncpartition).collect
returns
Array[Int]=Array(11, 27) // Applied to each and every element partition-wise
MapPartitionsWithIndex类似于mapPartitions,除了它还需要一个参数作为输入,它是分区的索引。
十九,Apache Spark中的Map是什么?
Map是应用于RDD中每个元素的转换,它提供了一个新的RDD作为结果。在Map转换中,用户定义的业务逻辑将应用于RDD中的所有元素。它类似于FlatMap,但与可以产生0,1或多个输出的FlatMap不同,Map只能产生一对一的输出。 映射操作将长度为N的RDD转换为另一个长度为N的RDD。
A -------> a
B -------> b
C -------> c
Map Operation
映射转换不会将数据从一个分区变为多个分区。它将使操作保持狭窄
二十,Apache Spark中的FlatMap是什么?
FlatMap是Apache Spark中的转换操作,用于从现有RDD 创建RDD。它需要RDD中的一个元素,并且可以根据业务逻辑生成0,1或多个输出。它类似于Map操作,但Map产生一对一输出。如果我们对长度为N的RDD执行Map操作,则输出RDD的长度也为N.但对于FlatMap操作,输出RDD可以根据业务逻辑的不同长度
X ------ A x ----------- a
Y ------ B y ----------- b,c
Z ----- -C z ----------- d,e,f
地图操作FlatMap操作
我们也可以说flatMap将长度为N的RDD转换为N个集合的集合,然后将其展平为单个RDD结果。
如果我们观察下面的示例data1 RDD,它是Map操作的输出,具有与数据RDD相同的元素,
但是data2 RDD没有相同数量的元素。我们还可以在这里观察data2 RDD是data1 RDD的平坦输出
pranshu @ pranshu-virtual-machine:?$ cat pk.txt
1 2 3 4
5 6 7 8 9
10 11 12
13 14 15 16 17
18 19 20
scala> val data=sc.textFile(“/ home / pranshu / pk.txt”)
17/05/17 07:08:20 WARN SizeEstimator:无法检查是否设置了UseCompressedOops; 假设是
数据:org.apache.spark.rdd.RDD [String]=/home/pranshu/pk.txt MapPartitionsRDD [1] at textFile at :24
scala> data.collect
res0:Array [String]=Array(1 2 3 4,5 6 7 8 9,10 11 12,13 14 15 16 17,18 19 20)
斯卡拉>
scala> val data1=data.map(line=> line.split(“”))
data1:org.apache.spark.rdd.RDD [Array [String]]=MapPartitionsRDD [2] at map at :26
斯卡拉>
scala> val data2=data.flatMap(line=> line.split(“”))
data2:org.apache.spark.rdd.RDD [String]=在mapMap at 的MapPartitionsRDD [3]:26
斯卡拉>
scala> data1.collect
res1:Array [Array [String]]=Array(数组(1,2,3,4),数组(5,6,7,8,9 ),数组(10,11,12),数组(13,14,15,16,17),数组(18,19,20))
斯卡拉>
scala> data2.collect
res2:Array [String]=数组(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18, 19,20)
二十一,在Spark中解释fold()操作。
fold()是一个动作。它是广泛的操作(即跨多个分区的shuffle数据并输出单个值)它将函数作为输入,具有两个相同类型的参数,并输出单个输入类型的值。它类似于reduce,但还有一个参数'ZERO VALUE'(比如初始值),它将在每个分区的初始调用中使用。
def fold(zeroValue:T)(op:(T,T)?T):T
使用给定的关联函数和中性“零值”聚合每个分区的元素,然后聚合所有分区的结果。函数op(t1,t2)允许修改t1并将其作为结果值返回以避免对象分配; 但是,它不应该修改t2。
这与在Scala等函数式语言中为非分布式集合实现的折叠操作有所不同。该折叠操作可以单独地应用于分区,然后将这些结果折叠成最终结果,而不是以某种定义的顺序将折叠顺序地应用于每个元素。对于不可交换的函数,结果可能与应用于非分布式集合的折叠的结果不同。
zeroValue:op运算符的每个分区的累积结果的初始值,以及组合的初始值来自op运算符的不同分区 - 这通常是中性元素(例如,列表连接为Nil或0为总结)
操作:用于在分区内累积结果并组合来自不同分区的结果的运算符
示例:
val rdd1=sc.parallelize(List(1,2,3,4,5),3)
rdd1.fold(5)(_+_)
输出:
Int=35
val rdd1=sc.parallelize(List(1,2,3,4,5))
rdd1.fold(5)(_+_)
输出:
Int=25
val rdd1=sc.parallelize(List(1,2,3,4,5),3)
rdd1.fold(3)(_+_)
Int=27
二十二,解释API createOrReplaceTempView()
它的基本数据集功能。它位于org.apache.spark.sql下def createOrReplaceTempView(viewName:String):Unit使用给定名称创建临时视图。此临时视图的生命周期与用于创建此数据集的SparkSession相关联。
scala> val df=spark.read.csv("/home/hdadmin/titanic_data.txt")
df: org.apache.spark.sql.DataFrame=[_c0: string, _c1: string ... 9 more fields]
scala> df.printSchema
root
|-- _c0: string (nullable=true)
|-- _c1: string (nullable=true)
|-- _c2: string (nullable=true)
|-- _c3: string (nullable=true)
|-- _c4: string (nullable=true)
|-- _c5: string (nullable=true)
|-- _c6: string (nullable=true)
|-- _c7: string (nullable=true)
|-- _c8: string (nullable=true)
|-- _c9: string (nullable=true)
|-- _c10: string (nullable=true)
scala> df.show
+---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+
|_c0|_c1|_c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10|
+---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+
| 1|1st| 1|Allen, Miss Elisa...|29.0000|Southampton| St Louis, MO| B-5| 24160 L221| 2|female|
| 2|1st| 0|Allison, Miss Hel...| 2.0000|Southampton|Montreal, PQ / Ch...| C26| null| null|female|
| 3|1st| 0|Allison, Mr Hudso...|30.0000|Southampton|Montreal, PQ / Ch...| C26| null|(135)| male|
| 4|1st| 0|Allison, Mrs Huds...|25.0000|Southampton|Montreal, PQ / Ch...| C26| null| null|female|
| 5|1st| 1|Allison, Master H...| 0.9167|Southampton|Montreal, PQ / Ch...| C22| null| 11| male|
| 6|1st| 1| Anderson, Mr Harry|47.0000|Southampton| New York, NY| E-12| null| 3| male|
| 7|1st| 1|Andrews, Miss Kor...|63.0000|Southampton| Hudson, NY| D-7| 13502 L77| 10|female|
| 8|1st| 0|Andrews, Mr Thoma...|39.0000|Southampton| Belfast, NI| A-36| null| null| male|
| 9|1st| 1|Appleton, Mrs Edw...|58.0000|Southampton| Bayside, Queens, NY| C-101| null| 2|female|
| 10|1st| 0|Artagaveytia, Mr ...|71.0000| Cherbourg| Montevideo, Uruguay| null| null| (22)| male|
| 11|1st| 0|Astor, Colonel Jo...|47.0000| Cherbourg| New York, NY| null|17754 L224 10s 6d|(124)| male|
| 12|1st| 1|Astor, Mrs John J...|19.0000| Cherbourg| New York, NY| null|17754 L224 10s 6d| 4|female|
| 13|1st| 1|Aubert, Mrs Leont...| NA| Cherbourg| Paris, France| B-35| 17477 L69 6s| 9|female|
| 14|1st| 1|Barkworth, Mr Alg...| NA|Southampton| Hessle, Yorks| A-23| null| B| male|
| 15|1st| 0| Baumann, Mr John D.| NA|Southampton| New York, NY| null| null| null| male|
| 16|1st| 1|Baxter, Mrs James...|50.0000| Cherbourg| Montreal, PQ|B-58/60| null| 6|female|
| 17|1st| 0|Baxter, Mr Quigg ...|24.0000| Cherbourg| Montreal, PQ|B-58/60| null| null| male|
| 18|1st| 0| Beattie, Mr Thomson|36.0000| Cherbourg| Winnipeg, MN| C-6| null| null| male|
| 19|1st| 1|Beckwith, Mr Rich...|37.0000|Southampton| New York, NY| D-35| null| 5| male|
| 20|1st| 1|Beckwith, Mrs Ric...|47.0000|Southampton| New York, NY| D-35| null| 5|female|
+---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+
only showing top 20 rows
scala> df.createOrReplaceTempView("titanicdata")
二十三,解释Apache Spark中的values()操作
values()是一种转换。它仅返回值的RDD。
val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))
val rdd2=rdd1.values
rdd2.collect
输出:
Array[Int]=Array(4, 6, 8, 10, 12, 14, 16, 18, 20)
示例2:数据集中的值重复
val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))
val rdd2=rdd1.keys
rdd2.collect
val rdd3=rdd1.values
rdd3.collect
输出:
Array[Int]=Array(2, 3, 4, 2, 4, 5, 5, 10)
Array[Int]=Array(4, 6, 8, 6, 12, 10, 40, 40)
二十四,解释Apache spark中的keys()操作。
keys()是一种转换。它返回一个密钥的RDD。val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))val rdd2=rdd1.keysrdd2.collect输出:Array[Int]=Array(2, 3, 4, 5, 6, 7, 8, 9, 10)
示例2 :(重复键 - 数据集中存在重复键)
val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))
val rdd2=rdd1.keys
rdd2.collect
输出:
Array[Int]=Array(2, 3, 4, 2, 4, 5, 5, 10)
二十五,在Spark中解释textFile与fullTextFile
两者都是org.apache.spark.SparkContext的方法。文本文件() :def textFile(path:String,minPartitions:Int=defaultMinPartitions):RDD [String]从HDFS读取文本文件,本地文件系统(在所有节点上都可用)或任何Hadoop支持的文件系统URI,并将其作为字符串的RDD返回例如sc.textFile(“/ home / hdadmin / wc-data.txt”)因此它将创建RDD,其中每个单独的行都是一个元素。每个人都知道textFile的用法。wholeTextFiles():def wholeTextFiles(path:String,minPartitions:Int=defaultMinPartitions):RDD [(String,String)]从HDFS读取文本文件目录,本地文件系统(在所有节点上都可用)或任何支持Hadoop的文件系统URI。而不是创建基本RDD,wholeTextFile()返回pairRDD。例如,目录中的文件很少,因此通过使用wholeTextFile()方法,它创建了带有文件名的对RDD,路径为键,值为整个文件为字符串val myfilerdd=sc.wholeTextFiles("/home/hdadmin/MyFiles")val keyrdd=myfilerdd.keyskeyrdd.collectval filerdd=myfilerdd.valuesfilerdd.collect输出:Array [String]=Array(文件:/home/hdadmin/MyFiles/JavaSparkPi.java,文件:/home/hdadmin/MyFiles/sumnumber.txt,文件:/home/hdadmin/MyFiles/JavaHdfsLR.java,文件: /home/hdadmin/MyFiles/JavaPageRank.java,文件:/home/hdadmin/MyFiles/JavaLogQuery.java,文件:/home/hdadmin/MyFiles/wc-data.txt,文件:/ home / hdadmin / MyFiles / nosum。文本)Array [String]=Array(“/ 根据一个或多个贡献者许可协议许可给Apache Software Foundation(ASF)。有关版权所有权的其他信息,请参阅随此工作分发的NOTICE文件。 ASF许可此根据Apache许可证2.0版(“许可证”)向您提交;除非符合许可,否则您不得使用此文件。您可以在以下位置获取许可副本:http://www.apache.org/licenses/LICENSE-2.0除非适用法律要求或书面同意,否则根据许可证分发的软件按“原样”分发,不附带任何明示或暗示的担保或条件。有关权限和 的特定语言,请参阅许可证。
二十六,解释Spark中的cogroup()操作
>这是一个转变。
>它位于
org.apache.spark.rdd.PairRDDFunctions包中
def cogroup [W1,W2,W3](other1:RDD [(K,W1)],other2:RDD [(K,W2)],other3:RDD [(K,W3)]):RDD [(K,( Iterable [V],Iterable [W1],Iterable [W2],Iterable [W3]))]
对于this或other1或other2或other3中的每个键k,返回包含元组的结果RDD,该元组具有该键,other1,other2和other3中该键的值列表。
例:
val myrdd1=sc.parallelize(List((1,"spark"),(2,"HDFS"),(3,"Hive"),(4,"Flink"),(6,"HBase")))
val myrdd2=sc.parallelize(List((4,"RealTime"),(5,"Kafka"),(6,"NOSQL"),(1,"stream"),(1,"MLlib")))
val result=myrdd1.cogroup(myrdd2)
result.collect
输出:
Array [(Int,(Iterable [String],Iterable [String]))]=
Array((4,(CompactBuffer(Flink),CompactBuffer(RealTime))),
(1,(CompactBuffer(spark),CompactBuffer( stream,MLlib))),
(6,(CompactBuffer(HBase),CompactBuffer(NOSQL))),
(3,(CompactBuffer(Hive),CompactBuffer())),
(5,(CompactBuffer(),CompactBuffer(Kafka) )),
(2,(CompactBuffer(HDFS),CompactBuffer())))
二十七,解释Apache Spark中的pipe()操作
这是一种转变。def pipe(command:String):RDD [String] 将由管道元素创建的RDD返回给分叉的外部进程。通常,Spark使用Scala,Java和Python来编写程序。但是,如果这还不够,并且想要管道(注入)用其他语言(如'R')编写的数据,Spark会以pipe()方式的形式提供一般机制Spark在RDD上提供了pipe()方法。使用Spark的pipe()方法,可以编写RDD的转换,可以将RDD中的每个元素从标准输入读取为String。它可以将结果作为String写入标准输出。
二十八,.解释Spark coalesce()操作
>这是一种转变。
>它位于
org.apache.spark.rdd.ShuffledRDD包中
DEF聚结(numPartitions:中等,洗牌:布尔=假,partitionCoalescer:选项[PartitionCoalescer]=Option.empty)(隐式ORD:订购[(K,C)]=NULL):RDD [(K,C)]
返回一个缩减为numPartitions分区的新RDD。
这会导致较窄的依赖性,例如,如果从1000个分区到100个分区,则不会进行随机播放,而是100个新分区中的每个分区将声明10个当前分区。
但是,如果你正在进行激烈的合并,例如对numPartitions=1,这可能导致你的计算发生在比你想要的更少的节点上(例如,在numPartitions=1的情况下,一个节点)。为避免这种情况,您可以传递shuffle=true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
注意:使用shuffle=true,您实际上可以合并到更大数量的分区。如果您有少量分区(例如100),这可能会使一些分区异常大,这很有用。调用coalesce(1000,shuffle=true)将导致1000个分区,并使用散列分区程序分发数据。
来自:http:
//data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce
它会更改存储数据的分区数。它将原始分区与新数量的分区相结合,因此可以减少分区数量。它是重新分区的优化版本,允许数据移动,但前提是您要减少RDD分区的数量。过滤大型数据集后,它可以更有效地运行操作。
示例:
val myrdd1=sc.parallelize(1 to 1000, 15)
myrdd1.partitions.length
val myrdd2=myrdd1.coalesce(5,false)
myrdd2.partitions.length
Int=5
输出:
Int=15
Int=5
二十九,解释Spark中的repartition()操作
repartition()是一种转变。
>此函数更改参数numPartitions(numPartitions:Int)中提到的分区数
>它位于包
org.apache.spark.rdd.ShuffledRDD中
def repartition(numPartitions:Int)(隐式ord:Ordering [(K,C)]=null):RDD [(K,C)]
返回一个具有正好numPartitions分区的新RDD。
可以增加或减少此RDD中的并行度。在内部,它使用shuffle重新分配数据。
如果要减少此RDD中的分区数,请考虑使用coalesce,这可以避免执行shuffle。
来自:
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/
重新分区将重新调整RDD中的数据,以生成您请求的最终分区数。它可以减少或增加整个网络中的分区数量和数据。
示例:
val rdd1=sc.parallelize(1 to 100, 3)
rdd1.getNumPartitions
val rdd2=rdd1.repartition(6)
rdd2.getNumPartitions
输出:
Int=3
Int=6
三十,.解释Apache Spark中的fullOuterJoin()操作
>这是转型。
>它位于
org.apache.spark.rdd.PairRDDFunctions包中
def fullOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(Option [V],Option [W]))]
执行此和其他的完全外部联接。
对于此中的每个元素(k,v),得到的RDD将包含所有对(k,(Some(v),Some(w)))用于其他w,
或对(k,(Some(v)) ,无)))如果其他元素没有密钥k。
类似地,对于其他元素(k,w),得到的RDD将包含所有对(k,(Some(v),Some(w)))中的v,
或者对(k,(None,一些(w)))如果其中没有元素具有密钥k。
使用现有的分区程序/并行级别对生成的RDD进行散列分区。
示例:
val frdd1=sc.parallelize(Seq(("Spark",35),("Hive",23),("Spark",45),("HBase",89)))
val frdd2=sc.parallelize(Seq(("Spark",74),("Flume",12),("Hive",14),("Kafka",25)))
val fullouterjoinrdd=frdd1.fullOuterJoin(frdd2)
fullouterjoinrdd.collect
输出:
Array [(String,(Option [Int],Option [Int]))]=Array((Spark,(Some(35),Some(74))),(Spark,(Some(45),Some( 74))),(Kafka,(无,有些(25))),(Flume,(无,有些(12))),(Hive,(Some(23),Some(14))),(HBase, (一些(89),无)))
三十一. Expain Spark leftOuterJoin()和rightOuterJoin()操作
> leftOuterJoin()和rightOuterJoin()都是转换。
>两者都在
org.apache.spark.rdd.PairRDDFunctions包中
leftOuterJoin():
def leftOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(V,Option [W]))]
执行此和其他的左外连接。对于此中的每个元素(k,v),得到的RDD将包含w中的所有对(k,(v,Some(w))),或者对(k,(v,None))如果不包含其他元素有关键k。使用现有分区程序/并行级别对输出进行散列分区。
leftOuterJoin()在两个RDD之间执行连接,其中键必须存在于第一个RDD中
示例:
val rdd1=sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))
val rdd2=sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))
val leftjoinrdd=rdd1.leftOuterJoin(rdd2)
leftjoinrdd.collect
输出:
Array [(String,(Int,Option [Int]))]=Array((s,(59,Some(61))),(s,(59,Some(62))),(s,( 54,Some(61))),(s,(54,Some(62))),(e,(57,None)),(e,(58,None)),(m,(55,Some( 60))),(m,(55,Some(65))),(m,(56,Some(60))),(m,(56,Some(65))))
rightOuterJoin():
def rightOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(Option [V],W))]
执行此和其他的右外连接。对于其他元素(k,w),得到的RDD将包含所有对(k,(Some(v),w))的v,或者对(k,(None,w))如果没有其中的元素有关键k。使用现有的分区程序/并行级别对生成的RDD进行散列分区。
它执行两个RDD之间的连接,其中密钥必须存在于其他RDD中
例:
val rdd1=sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))
val rdd2=sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))
val rightjoinrdd=rdd1.rightOuterJoin(rdd2)
rightjoinrdd.collect
Array [(String,(Option [Int],Int))]=Array((s,(Some(59),61)),(s,(Some(59),62)),(s,(Some(( 54),61)),(s,(Some(54),62)),(h,(None,63)),(h,(None,64)),(m,(Some(55),60 )),(m,(Some(55),65)),(m,(Some(56),60)),(m,(Some(56),65)))
三十二,解释Spark join()操作
> join()是转型。
>它在包
org.apache.spark.rdd.pairRDDFunction
def join [W](其他:RDD [(K,W)]):RDD [(K,(V,W))]固定链接
返回包含所有元素对的RDD,其中包含匹配键和其他元素。
每对元素将作为(k,(v1,v2))元组返回,其中(k,v1)在此,而(k,v2)在其他元素中。在整个群集中执行散列连接。
它正在连接两个数据集。当调用类型(K,V)和(K,W)的数据集时,返回(K,(V,W))对的数据集以及每个键的所有元素对。通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持外连接。
例1:
val rdd1=sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))
val rdd2=sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))
val joinrdd=rdd1.join(rdd2)
joinrdd.collect
输出:
Array [(String,(Int,Int))]=Array((m,(54,60)),(m,(54,65)),(m,(56,60)),(m,(56 ,65)),(s,(59,61)),(s,(59,62)),(s,(54,61)),(s,(54,62)))
例2:
val myrdd1=sc.parallelize(Seq((1,2),(3,4),(3,6)))
val myrdd2=sc.parallelize(Seq((3,9)))
val myjoinedrdd=myrdd1.join(myrdd2)
myjoinedrdd.collect
输出:
数组[(Int,(Int,Int))]=数组((3,(4,9)),(3,(6,9)))
三十三,解释top()和takeOrdered()操作
top()和takeOrdered()都是动作。两者都返回基于默认排序或基于用户提供的自定义排序的RDD元素。def top(num: Int)(implicit ord: Ordering[T]): Array[T]返回此RDD中的前k个(最大)元素,由指定的隐式Ordering [T]定义并维护排序。这与takeOrdered相反。def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]返回此RDD中的第一个k(最小)元素,由指定的隐式Ordering [T]定义并维护排序。这与top相反。示例:val myrdd1=sc.parallelize(List(5,7,9,13,51,89))myrdd1.top(3)myrdd1.takeOrdered(3)myrdd1.top(3)输出:Array[Int]=Array(89, 51, 13)Array[Int]=Array(5, 7, 9)Array[Int]=Array(89, 51, 13)
三十四,解释Spark中的first()操作
>这是一个动作。
>它返回RDD的第一个元素。
示例:
val rdd1=sc.textFile("/home/hdadmin/wc-data.txt")
rdd1.count
rdd1.first
输出:
长:20
字符串:DataFlair是领先的技术培训提供商
三十五,.解释Apache Spark中的sum(),max(),min()操作
sum():
>它将RDD中的值相加。
>它是一个包
org.apache.spark.rdd.DoubleRDDFunctions。
>它的返回类型是Double
例:
val rdd1=sc.parallelize(1 to 20)
rdd1.sum
输出:
Double=210.0
max():
>它从隐式排序(元素顺序)定义的RDD元素返回一个最大值
>它是一个包org.apache.spark.rdd
例:
val rdd1=sc.parallelize(List(1,5,9,0,23,56,99,87))
rdd1.max
输出:
Int=99
min():
>它从隐式排序(元素顺序)定义的RDD元素返回一个min值
>它是一个包org.apache.spark.rdd
例:
val rdd1=sc.parallelize(List(1,5,9,0,23,56,99,87))
rdd1.min
输出:
Int=0
三十六,.解释Apache Spark RDD中的countByValue()操作
这是一个动作它返回RDD中每个唯一值的计数作为本地Map(作为Map to driver program)(value,countofvalues)对必须小心使用此API,因为它将值返回给驱动程序,因此它仅适用于较小的值。例:val rdd1=sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8)))rdd1.countByValue输出:scala.collection.Map [(String,Int),Long]=Map((HR,5) - > 1,(RD,4) - > 1,(SALES,4) - > 1,(ADMIN,5 ) - > 1,(MAN,8) - > 1,(SER,6) - > 1)val rdd2=sc.parallelize{Seq(10,4,3,3)}rdd2.countByValue输出:scala.collection.Map [Int,Long]=Map(4 - > 1,3 - > 2,10 - > 1)
三十七,.解释Spark中的lookup()操作
>这是一个动作
>它返回二手QQ转让RDD中键值'key'的值列表
val rdd1=sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))
rdd1.lookup("spark")
rdd1.lookup("Hive")
rdd1.lookup("BigData")
输出:
Seq [Int]=WrappedArray(15,39,49)
Seq [Int]=WrappedArray(95)
Seq [Int]=WrappedArray(78)
三十八,解释Spark countByKey()操作
>它是一个动作操作
>返回(key,noofkeycount)对。
它计算RDD的值,该值由每个不同键的两个组件元组组成。它实际上计算每个键的元素数,并将结果作为(键,计数)对的列表返回给主键。
val rdd1=sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))
rdd1.countByKey
输出:
scala.collection.Map [String,Long]=Map(Hive - > 1,BigData - > 1,HBase - > 1,spark - > 3,Spark - > 1)
三十九,解释Spark saveAsTextFile()操作
它将RDD的内容写入文本文件,或使用字符串表示将RDD保存为文件路径目录中的文本文件。
四十,解释reduceByKey()Spark操作
> reduceByKey()是对pairRDD(包含Key / Value)进行转换的转换。
> PairRDD包含元组,因此我们需要传递元组上的运算符而不是每个元素。
>它使用关联reduce函数将值与相同的键合并。
>它是广泛的操作,因为数据混洗可能发生在多个分区上。
>它在跨分区发送数据之前在本地合并数据,以优化数据混洗。
>它将函数作为一个输入,它有两个相同类型的参数(与同一个键相关的值)和一个输入类型的元素输出(值)
>我们可以说它有三个重载函数:
reduceBykey(function)
reduceByKey(功能,分配数量)
reduceBykey(partitioner,function)
它使用关联reduce函数,它合并每个键的值。它只能与键值对中的Rdd一起使用。它是一种广泛的操作,可以从多个分区/分区中混洗数据并创建另一个RDD。它使用关联函数在本地合并数据,以优化数据混洗。组合的结果(例如,和)与值的类型相同,并且当从不同分区组合时的操作也与在分区内组合值时的操作相同。
示例:
val rdd1=sc.parallelize(Seq(5,10),(5,15),(4,8),(4,12),(5,20),(10,50)))
val rdd2=rdd1.reduceByKey((x,y)=>x+y)
OR
rdd2.collect()
输出:
数组[(Int,Int)]=数组((4,20),(10,50),(5,45)
四十一,解释Spark中的reduce()操作
> reduce()是一个动作。它是宽操作(即跨越多个分区的随机数据并输出单个值)
>它将函数作为具有两个相同类型参数的输入,并输出单个输入类型的值。
>即将RDD的元素组合在一起。
示例1:
val rdd1=sc.parallelize(1到100)
val rdd2=rdd1.reduce((x,y)=> x + y)
OR
val rdd2=rdd1.reduce(_ + _)
输出:
rdd2:Int=5050
示例2:
val rdd1=sc.parallelize(1到5)
val rdd2=rdd1.reduce(_ * _)
输出:
rdd2:Int=120
四十二,在Spark RDD中解释动作count()
count()是Apache Spark RDD操作中的一个操作count()返回RDD中的元素数。示例:val rdd1=sc.parallelize(List(10,20,30,40))println(rdd1.count())输出:4它返回RDD中的多个元素或项目。因此,它基本上计算数据集中存在的项目数,并在计数后返回一个数字。
四十三.解释Spark map()转换
> map()转换将函数作为输入,并将该函数应用于RDD中的每个元素。
>函数的输出将是每个输入元素的新元素(值)。
防爆。
val rdd1=sc.parallelize(List(10,20,30,40))
val rdd2=rdd1.map(x=> x * x)
println(rdd2.collect()。mkString(“,”))
四十四,解释Apache Spark中的flatMap()转换
当想要为每个输入元素生成多个元素(值)时,使用flatMap()。与map()一样,flatMap()也将函数作为输入。函数的输出是我们可以迭代的元素的List。(即函数可以为每个输入元素返回0或更多元素)简单地使用flatMap()将输入行(字符串)拆分为单词。
例
val fm1=sc.parallelize(List("Good Morning", "Data Flair", "Spark Batch"))
val fm2=fm1.flatMap(y=> y.split(" "))
fm2.foreach{println}
输出如下:
Good
Morning
Data
Flair
Spark
Batch
四十五,Apache Spark有哪些限制?
在,Apache Spark被认为是行业广泛使用的下一代Gen Big数据工具。但Apache Spark存在一定的局限性。他们是:
Apache Spark的局限性:
1.无文件管理系统
Apache Spark依赖于其他平台,如Hadoop或其他基于云的平台文件管理系统。这是Apache Spark的主要问题之一。
2.延迟
使用Apache Spark时,它具有更高的延迟。
3.不支持实时处理
在Spark Streaming中,到达的实时数据流被分成预定义间隔的批次,每批数据被视为Spark Resilient Distributed Database(RDD)。然后使用map,reduce,join等操作处理这些RDD。这些操作的结果是批量返回的。因此,它不是实时处理,但Spark接近实时数据的实时处理。微批处理在Spark Streaming中进行。
4.手动优化
手动优化是优化Spark作业所必需的。此外,它适用于特定数据集。如果我们想要在Spark中进行分区和缓存是正确的,我们需要手动控制。
少一点。算法
Spark MLlib在Tanimoto距离等许多可用算法方面落后。
6.窗口标准
Spark不支持基于记录的窗口标准。它只有基于时间的窗口标准。
7.迭代处理
在Spark中,数据分批迭代,每次迭代都是单独调度和执行的。
8.
当我们想要经济高效地处理大数据时,昂贵内存容量可能成为瓶颈,因为在内存中保存数据非常昂贵。此时内存消耗非常高,并且不以用户友好的方式处理。Spark的成本非常高,因为Apache Spark需要大量的RAM才能在内存中运行。
四十六,什么是Spark SQL?
Spark SQL是一个Spark接口,用于处理结构化和半结构化数据(定义字段即表格的数据)。它提供了一个名为DataFrame和DataSet的抽象层,我们可以轻松处理数据。可以说DataFrame就像关系数据库中的表。Spark SQL可以以Parquets,JSON,Hive等各种结构化和半结构化格式读写数据。在Spark应用程序中使用SparkSQL是使用它的最佳方式。这使我们能够加载数据并使用SQL进行查询。我们也可以将它与Python,Java或Scala中的 “常规”程序代码结合起来。
四十七,解释Spark SQL缓存和解除
当我们尝试在另一个用户使用该表时解冻Spark SQL中的表时会发生什么?因为我们可以在Spark SQL JDBC服务器中的多个用户之间使用共享缓存表。
四十八,解释Spark流媒体
rk Streaming
数据流定义为以无界序列的形式连续到达的数据。为了进一步处理,Streaming将连续流动的输入数据分离为离散单元。它是一种低延迟处理和分析流数据。
在2013年,Apache Spark Streaming被添加到Apache Spark。通过Streaming,我们可以对实时数据流进行容错,可扩展的流处理。从许多来源,如Kafka,Apache Flume,Amazon Kinesis或TCP套接字,可以进行数据摄取。此外,通过使用复杂算法,可以进行处理。用高级函数表示,例如map,reduce,join和window。最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。
在内部,通过Spark流,接收实时输入数据流并将其分成批次。然后,这些批次由Spark引擎处理,以批量生成最终结果流。
Discretized Stream或简称Spark DStream是它的基本抽象。这也代表了分成小批量的数据流。DStreams构建于Spark的核心数据抽象Spark RDD之上。Streaming可以与Spark MLlib和Spark SQL等任何其他Apache Spark组件集成。
四十九,解释Spark Streaming
Spark Streaming
数据流定义为以无界序列的形式连续到达的数据。为了进一步处理,Streaming将连续流动的输入数据分离为离散单元。它是一种低延迟处理和分析流数据。
在2013年,Apache Spark Streaming被添加到Apache Spark。通过Streaming,我们可以对实时数据流进行容错,可扩展的流处理。从许多来源,如Kafka,Apache Flume,Amazon Kinesis或TCP套接字,可以进行数据摄取。此外,通过使用复杂算法,可以进行处理。用高级函数表示,例如map,reduce,join和window。最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。
在内部,通过Spark流,接收实时输入数据流并将其分成批次。然后,这些批次由Spark引擎处理,以批量生成最终结果流。
Discretized Stream或简称Spark DStream是它的基本抽象。这也代表了分成小批量的数据流。DStreams构建于Spark的核心数据抽象Spark RDD之上。Streaming可以与Spark MLlib和Spark SQL等任何其他Apache Spark组件集成。
五十,在Apache Spark Streaming中解释DStream中的不同转换
Apache Spark Streaming中DStream中的不同转换是:
1- map(func) - 通过函数func传递源DStream的每个元素来返回一个新的DStream。
2- flatMap(func) - 与map类似,但每个输入项可以映射到0个或更多输出项。
3- filter(func) - 通过仅选择func返回true的源DStream的记录来返回新的DStream。
4- repartition(numPartitions) - 通过创建更多或更少的分区来更改此DStream中的并行度级别。
5- union(otherStream) - 返回一个新的DStream,它包含源DStream和
otherDStream中元素的并集。
6- 计数() -返回单元素的一个新的DSTREAM RDDS通过计数在源DSTREAM的每个RDD元件的数量。
7- reduce(func) - 通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream。
8- countByValue() - 当在类型为K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率。
9- reduceByKey(func,[numTasks]) - 当在(K,V)对的DStream上调用时,返回一个(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。
10- join(otherStream,[numTasks]) - 当在(K,V)和(K,W)对的两个DStream上调用时,返回一个新的DStream(K,(V,W))对与所有对每个键的元素。
11- cogroup(otherStream,[numTasks]) - 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream。
12- transform(func) - 通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回一个新的DStream。
13- updateStateByKey(func) - 返回一个新的“状态”DStream,其中通过在密钥的先前状态和密钥的新值上应用给定函数来更新每个密钥的状态。
希望以上这些多大家有所帮助,能够帮得到您说明我的努力是没有白费的,最后,希望大家多多关注下,更多精彩的文章带给大家!