【Spark】Spark Quick Start(快速入门翻译)

简介: 本文主要是翻译Spark官网Quick Start。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷 目录导航在右上角,感谢两个大佬(孤傲苍狼  JavaScript自动生成博文目录导航 和 juejiang 为博客园添加目录的配置总结)提供的帮助。

 本文主要是翻译Spark官网Quick Start。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷

目录导航在右上角,感谢两个大佬(孤傲苍狼  JavaScript自动生成博文目录导航 和 juejiang 为博客园添加目录的配置总结)提供的帮助。这篇文章还有个问题 scala/python/java 使用 Spark 的介绍不能像官网那样可以通过点击导航来显示不同的内容,很影响阅读。我在想办法改进

 

Quick Start

 

这个指南提供了使用Spark的快速介绍。我们会首先介绍Spark 交互式编程(使用Python或者Scala)的 API, 然后展示如何用Java、Scala 和 Python来编写应用程序。

为了使用这个指南,您需要先从 Spark 网页 下载打包发布的Spark安装包。由于我们将不会(在指南中)使用HDFS, 您可以下载任意版本的Hadoop安装包。

需要注意的是,Spark2.0 之前, Spark的主要编程接口是弹性分布式数据集(Resilient Distributed Dataset (RDD))。Spark2.0 之后, RDD 被 Dataset 取代,Dataset 和 RDD 一样是强类型,但是在底层进行了更多的优化。Spark2.0 之后仍然支持 RDD 接口,并且您可以从RDD编程指南中 获取更详细的参考。当然,我们强烈建议您选择使用Dataset, 因为它的性能比RDD更好。 查看 SQL编程指南 以得到更多关于Dataset的信息。

使用 Spark Shell 交互式编程

基本操作

Spark Shell 提供了一个简单的方式去学习 API,同时也提供了一个强大的交互式数据分析工具。它可以基于 Scala(一种在java 虚拟机上运行并因此可以很好地使用已有的java库的编程语言)或 Python 使用。在 Spark 目录下运行以下内容来开始(Sprk Shell):

  Scala 版

./bin/pyspark

Python 版

./bin/pyspark

如果你当前环境使用pip下载了 PySpark,可以使用如下下方式调用

pyspark

Spark 主要的抽象是一个被叫做 Dataset 的分布式集合。 Dataset 可以通过 Hadoop InputFormat(比如HDFS文件)或者 转换其他 Dataset 中创建。让我们通过 Spark 源目录下的 README 文件内容创建一个新的 Dataset:

Scala 版

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

Python 版

>>> textFile = spark.read.text("README.md")

你可以直接从Dataset中, 通过调用一些操作或者转化Dataset以获得一个新的Dataset来获取它的值。请阅读 API 文档(Scala / Python)  以获取更多细节

Scala 版

scala> textFile.count() // 该Dataset中的成员数量
res0: Long = 126 //  由于README.md 会随着时间的推移不断改变,所以结果可能会有所不同, 其他输出也有类似情况

scala> textFile.first() // 该Dataset的第一个成员
res1: String = # Apache Spark

Python 版

>>> textFile.count()  # 该DataFrame中的行数
126

>>> textFile.first()  # 该DataFrame的第一行
Row(value=u'# Apache Spark')

现在让我们使用该Dataset来转换成一个新的Dataset。 我们调用 filter 来返回一个新的Dataset, 其中包含这个文件内容的子集。

Scala 版

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

Python 版

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

我们可以将数据集转换和数据集操作串接在一起

Scala 版

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

Python 版

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

更多关于Dataset的操作

Dataset操作和转换可以用来做更复杂的计算。假设我们想要找到单词数量最多的那行:

Scala 版

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

这首先将文件中的一行映射成一个整数值,并创建一个新的Dataset。调用该 Dataset 的 reduce 方法以找到最大的单词计数。map 和 reduce 的参数是 Scala 的函数字面量(闭包),并且可以使用任何语言的特性或者 Scala/Java 库。 比如, 我们可以很荣誉地调用任何地方声明地函数(方法)。我们将使用 Math.max() 方法以使这段代码易于理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

MapReduce是一种常见的数据流格式, 这是由Hadoop推广的。Spark 可以很容易地实现MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,我们调用 flatMap 来将一个行级(以文本中的一行为一个成员(Item))的 Dataset 转换成一个 单词 级 的Dataset,然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为(String, Long)数据对形式 的Dateset。 为了在我们的shell中统计出单词的数量, 我们可以调用 collect 方法:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

 

Python 版

>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

这首先将文件中的一行映射成一个整数值 并取一个为 “numWords” 的别名,同时创建一个新的DataFrame。调用该 Dataset 的 agg 方法以找到最大的单词计数。select 和 agg 的参数都是 Colum,我们可以使用 df.colName 方法来从一个DataFrame中获得一个 colum。我们同样可以导入 pyspark.sql.functions, 它提供了很多简易的方法从一个已有的 Colum 构建一个新的 Colum。

MapReduce是一种常见的数据流格式, 这是由Hadoop推广的。Spark 可以很容易地实现MapReduce流:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

这里,我们在 select 方法中使用了 explode 方法来将一个行级(以文本中的一行为一个成员(Item))的 Dataset 转换成一个 单词 级 的Dataset。然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为一个拥有两个Colum:“word” 和 “count” 的DataFrame。 为了在我们的shell中统计出单词的数量, 我们可以调用 collect 方法:

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

 

缓存(Caching)

Spark同样支持将数据集加入到一个集群中的内存缓存中。当数据被重复访问时,这是非常有用的。比如查询一个小的热点数据集 或者 运行像PageRank 这样的迭代算法。让我们标记我们的 linesWithSpark  作为缓存数据 来作为一个例子:

Scala 版

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

Python 版

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

使用Spark来探索和缓存一个100行的文本文件看起来很蠢。有趣的是,这些方法同样可以作用在非常大的数据集中,哪怕它们被分布在数十个或上百个节点中。正如 RDD编程指南 中描述的那样, 您可以通过连接 bin/spark-shell 到一个集群中来进行以上交互式操作。

独立的应用程序

假设我们希望使用 Spark API 编写一个独立的 应用程序。  我们将分别使用Scala(带sbt),Java(带Maven) 和 Python(pip) 编写一个简单的应用程序。

Scala

我们将在 Scala 中创建一个Spark 应用程序——非常简单。 实际上,它被命名为 SimleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

注意,这个应用程序需要定义一个 main() 方法 而不是 继承 scala.App. scala.App 的子类可能无法正常地工作。

这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。

我们调用 SparkSession.builder 来构造一个 【SparkSession】,然后设置应用的名字, 最后调用 getOrCreate 方法获取一个 【SparkSession】实例。

我们的应用程序取决于Spark API, 所以我们同样需要一个 sbt 配置文件, build.sbt, 这表示 Spark 是一个依赖组件。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"

为了使 sbt 能够正常工作, 我们需要根据经典的目录结构布局 SimpleApp.scala 和 build.sbt。一旦完成这些,我们就可以创建一个包含这个应用程序源代码的JAR包, 然后使用 spark-submit 脚本运行我们的程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

 

Java


这个例子将会使用 Maven 编译一个JAR 应用程序,但是很多类似的构建系统都可以完成这些工作。

我们将创建一个简单的Spark应用程序, SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。

为了构建这个程序, 我们同样要编写一个 Maven pom.xml 文件,这个文件将 Spark 列为一个依赖组件。请注意,Spark 构件 被标记为Scala版本

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
  </dependencies>
</project>

我们根据规范的Maven目录结构列出这些文件

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们可以使用 Maven 打包这个应用程序并且 通过 ./bin/spark-submit. 执行

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

 

Python

这里我们将展示如何使用Python API(PySpark)来编写一个应用程序

如果你正构建一个打包的 PySpark应用程序或库,你可以将它添加到你的 setup.py 文件中, 如下:

install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

作为示例,我们将创建一个简单的 Spark 应用程序, SimpleApp.py:

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。和 Scala 和 Java 例子一样, 我们使用 SparkSession 来创建 Dataset 。 对于使用自定义类或者第三方库的应用程序, 我们同样可以通过它的 --py-- files 参数将代码和依赖打包成zip文件(使用 spark-submit --help 查看细节)的形式 添加到 spark-submit。 SimpleApp 足够简单, 所以我们不用指定任何代码依赖组件。

我们使用 bin/spark-submit 脚本运行这个程序

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果您将PySpark通过 pip 安装到了您的环境中(eg. pip install pyspark),根据您的喜好,可以使用常规的Python解释器 或者 使用 spark-submit 来运行您的程序

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

 

下一步

祝贺您运行了您的第一个 Spark 应用程序

  关于API的深入概述,请从 RDD 编程指南SQL 编程指南 开始, 或者 查看编程指南菜单 以了解其他组件

  关于使用集群运行应用程序,请移步 部署概述

  最后, Spark 包含了几个简单的例子, 它们被保存在 example 目录下(Scala, Java, Python, R),你可以按照以下方式运行它们:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

 

相关文章
|
6月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
73 0
|
SQL 分布式计算 大数据
大数据Spark SQL快速入门
大数据Spark SQL快速入门
164 0
|
消息中间件 分布式计算 网络协议
Spark Streaming 快速入门(实操)
Spark Streaming 快速入门(实操)
427 0
Spark Streaming 快速入门(实操)
|
SQL JSON 分布式计算
Spark SQL快速入门(基础)
Spark SQL快速入门(基础)
522 0
Spark SQL快速入门(基础)
|
分布式计算 监控 大数据
大数据Spark快速入门
大数据Spark快速入门
127 0
|
6月前
|
分布式计算 Hadoop Java
Spark_Day01:Spark 框架概述和Spark 快速入门
Spark_Day01:Spark 框架概述和Spark 快速入门
89 0
|
存储 分布式计算 并行计算
Spark GraphX 快速入门
Spark GraphX 快速入门
310 0
Spark GraphX 快速入门
|
存储 分布式计算 Scala
Spark快速入门-3-Spark的算子总结
Transformation 变换/转换算子:这类算子操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。这种变换并不触发提交作业,完成作业中间过程处理。 Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark 系统。
|
存储 分布式计算 Hadoop
Spark快速入门-2-Spark的编程模型
Spark快速入门-2-Spark的编程模型
|
SQL 分布式计算 Spark
Spark SQL快速入门(进阶)(上)
Spark SQL快速入门(进阶)(上)
298 0
Spark SQL快速入门(进阶)(上)