Spark 介绍
核心概念
Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。
最大的优化是让计算任务的中间结果可以存储在内存中,不需要每次都写入 HDFS,更适用于需要迭代的 MapReduce 算法场景中,可以获得更好的性能提升。
例如一次排序测试中,对 100TB 数据进行排序,Spark 比 Hadoop 快三倍,并且只需要十分之一的机器。
Spark 集群目前最大的可以达到 8000 节点,处理的数据达到 PB 级别,在互联网企业中应用非常广泛。
Spark 的特性
Hadoop 的核心是分布式文件系统 HDFS 和计算框架 MapReduces。Spark 可以替代 MapReduce,并且兼容 HDFS、Hive 等分布式存储层,良好的融入 Hadoop 的生态系统。
Spark 执行的特点
- 中间结果输出:Spark 将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多 Stage 的任务串联或者并行执行。
- 数据格式和内存布局:Spark 抽象出分布式内存存储结构弹性分布式数据集 RDD,能够控制数据在不同节点的分区,用户可以自定义分区策略。
- 任务调度的开销:Spark 采用了事件驱动的类库 AKKA 来启动任务,通过线程池的复用线程来避免系统启动和切换开销。
Spark 的优势
- 速度快,运行工作负载快 100 倍。Apache Spark 使用最先进的 DAG 调度器、查询优化器和物理执行引擎,实现了批处理和流数据的高性能。
- 易于使用,支持用 Java、Scala、Python、R 和 SQL 快速编写应用程序。Spark 提供了超过 80 个算子,可以轻松构建并行应用程序。您可以从 Scala、Python、R 和 SQL shell 中交互式地使用它。
- 普遍性,结合 SQL、流处理和复杂分析。Spark 提供了大量的库,包括 SQL 和 DataFrames、用于机器学习的 MLlib、GraphX 和 Spark 流。您可以在同一个应用程序中无缝地组合这些库。
- 各种环境都可以运行,Spark 在 Hadoop、Apache Mesos、Kubernetes、单机或云主机中运行。它可以访问不同的数据源。您可以使用它的独立集群模式在 EC2、Hadoop YARN、Mesos 或 Kubernetes 上运行 Spark。访问 HDFS、Apache Cassandra、Apache HBase、Apache Hive 和数百个其他数据源中的数据。
哪些公司在使用 Spark
日常为我们所熟知的,在国外就有 IBM Almaden(IBM 研究实验室)、Amazon(亚马逊)等,而在国内有 baidu(百度)、Tencent(腾讯)等等,包括一些其它的公司大部分都使用 Spark 来处理生产过程中产生的大量数据。更多详情可以参考链接: 谁在使用 Spark?
2.3 Spark 生态系统 BDAS
目前,Spark 已经发展成为包含众多子项目的大数据计算平台。
BDAS 是伯克利大学提出的基于 Spark 的数据分析栈(BDAS)。
其核心框架是 Spark,同时涵盖支持结构化数据 SQL 查询与分析的查询引擎 Spark SQL,提供机器学习功能的系统 MLBase 及底层的分布式机器学习库 MLlib,并行图计算框架 GraphX,流计算框架 Spark Streaming,近似查询引擎 BlinkDB,内存分布式文件系统 Tachyon,资源管理框架 Mesos 等子项目。这些子项目在 Spark 上层提供了更高层、更丰富的计算范式。
部署前准备
Spark 安装非常简单,简单到只需要下载 binary 包解压即可
安装 Spark 之前需要先安装 Java,Scala 及 Python。
- java 1.8.0
- scala 2.11.8
- python 2.7
安装Java
下载
参考清华软件源:https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/
wget wget https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/OpenJDK8U-jdk_x64_linux_openj9_linuxXL_8u282b08_openj9-0.24.0.tar.gz --no-check-certificate
按需求下载之后记得修改环境变量
记得配置环境变量,添加 bin 目录即可
安装Scala
下载
wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
记得配置环境变量,添加 bin 目录即可
安装Python
一般系统自带 Python2
python --version
如果需要 Python3 可以自行下载
yum -y install python3
Spark 下载
此处使用的是:Spark 2.4.8
官网上下载已经预编译好的 Spark binary,直接解压即可。
Spark 官方下载链接:http://spark.apache.org/downloads.html
下载
参考
- 清华软件源:https://mirrors.tuna.tsinghua.edu.cn/apache/spark/
- 华为软件源:https://mirrors.huaweicloud.com/apache/spark/
wget https://mirrors.huaweicloud.com/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
记得配置环境变量,添加 bin 目录即可
部署模式介绍
Spark on Mesos未尝试过,大家可以自行尝试
本文仅介绍 Standalone 模式和 Spark on Yarn模式
按照自己需求配合!
本地模式
Spark单机运行,直接解压执行start-all.sh即可,一般用于开发测试
Standalone 模式
构建一个由Master+Slave构成的Spark集群,Spark运行在集群中。
即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。
从一定程度上说,该模式是其他两种的基础。借鉴 Spark 开发模式,我们可以得到一种开发新型计算框架的一般思路:先设计出它的 standalone 模式,为了快速开发,起初不需要考虑服务(比如 master/slave)的容错性,之后再开发相应的 wrapper,将 stanlone 模式下的服务原封不动的部署到资源管理系统 yarn 或者 mesos 上,由资源管理系统负责服务本身的容错。目前 Spark 在 standalone 模式下是没有任何单点故障问题的,这是借助 zookeeper 实现的,思想类似于 Hbase master 单点故障解决方案。将 Spark standalone 与 MapReduce 比较,会发现它们两个在架构上是完全一致的:
- 都是由 master/slaves 服务组成的,且起初 master 均存在单点故障,后来均通过 zookeeper 解决(Apache MRv1 的 JobTracker 仍存在单点问题,但 CDH 版本得到了解决);
- 各个节点上的资源被抽象成粗粒度的 slot,有多少 slot 就能同时运行多少 task。不同的是,MapReduce 将 slot 分为 map slot 和 reduce slot,它们分别只能供 Map Task 和 Reduce Task 使用,而不能共享,这是 MapReduce 资源利率低效的原因之一,而 Spark 则更优化一些,它不区分 slot 类型,只有一种 slot,可以供各种类型的 Task 使用,这种方式可以提高资源利用率,但是不够灵活,不能为不同类型的 Task 定制 slot 资源。总之,这两种方式各有优缺点。
Spark on Yarn 模式
Spark客户端直接连接Yarn。不需要额外构建Spark集群。
这是一种很有前景的部署模式。但限于 YARN 自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode)。
这是由于 YARN 上的 Container 资源是不可以动态伸缩的,一旦 Container 启动之后,可使用的资源不能再发生变化,不过这个已经在 YARN 计划中了。
spark on yarn 的支持两种模式:
- yarn-cluster:适用于生产环境;
- yarn-client:适用于交互、调试,希望立即看到 app 的输出
yarn-cluster 和 yarn-client 的区别在于 yarn appMaster,每个 yarn app 实例有一个 appMaster 进程,是为 app 启动的第一个 container;负责从 ResourceManager 请求资源,获取到资源后,告诉 NodeManager 为其启动 container。
yarn-cluster 和 yarn-client 模式内部实现还是有很大的区别。
如果你需要用于生产环境,那么请选择 yarn-cluster;而如果你仅仅是 Debug 程序,可以选择 yarn-client。
Spark on Mesos 模式
Spark客户端直接连接Mesos。不需要额外构建Spark集群。
这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。
正是由于 Spark 开发之初就考虑到支持 Mesos,因此,目前而言,Spark 运行在 Mesos 上会比运行在 YARN 上更加灵活,更加自然。
目前在 Spark On Mesos 环境中,用户可选择两种调度模式之一运行自己的应用程序(可参考 Andrew Xia 的“Mesos Scheduling Mode on Spark”):
1.粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个 Dirver 和若干个 Executor 组成,其中,每个 Executor 占用若干资源,内部可运行多个 Task(对应多少个“slot”)。
应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用 5 个 executor 运行你的应用程序,每个 executor 占用 5GB 内存和 5 个 CPU,每个 executor 内部设置了 5 个 slot,则 Mesos 需要先为 executor 分配资源并启动它们,之后开始调度任务。
另外,在程序运行过程中,mesos 的 master 和 slave 并不知道 executor 内部各个 task 的运行情况,executor 直接将任务状态通过内部的通信机制汇报给 Driver,从一定程度上可以认为,每个应用程序利用 mesos 搭建了一个虚拟集群自己使用。
2.细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos 还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。
与粗粒度模式一样,应用程序启动时,先会启动 executor,但每个 executor 占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos 会为每个 executor 动态分配资源,每分配一些,便可以运行一个新任务,单个 Task 运行完之后可以马上释放对应的资源。
每个 Task 会汇报状态给 Mesos slave 和 Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于 MapReduce 调度模式,每个 Task 完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
总结
这三种分布式部署方式各有利弊,通常需要根据实际情况决定采用哪种方案。
进行方案选择时,往往要考虑公司的技术路线(采用 Hadoop 生态系统还是其他生态系统)、相关技术人才储备等。上面涉及到 Spark 的许多部署模式,究竟哪种模式好这个很难说,需要根据你的需求,如果你只是测试 Spark Application,你可以选择 local 模式。而如果你数据量不是很多,Standalone 是个不错的选择。当你需要统一管理集群资源(Hadoop、Spark 等),那么你可以选择 Yarn 或者 mesos,但是这样维护成本就会变高。
- 从对比上看,mesos 似乎是 Spark 更好的选择,也是被官方推荐的
- 但如果你同时运行 hadoop 和 Spark,从兼容性上考虑,Yarn 是更好的选择。
- 如果你不仅运行了 hadoop,spark。还在资源管理上运行了 docker,Mesos 更加通用。
- Standalone 对于小规模计算集群更适合!
Standalone 模式
Standalone 另可分两种子模式:
- 单机
- 集群
当然,集群部署的前提是单机的部署完成,根据自己的需求调整即可
单机部署
Spark 虽然是大规模的计算框架,但也支持在单机上运行
修改配置文件
进入 Spark 配置目录
cd $SPARK_HOME/conf
日志配置
创建/复制
cp log4j.properties.template log4j.properties
我们修改 log4j.rootCategory 的 「INFO」修改为「WARN」,这一步是修改日志等级,可避免测试中输出太多信息
spark-env.sh
创建/复制
cp spark-env.sh.template spark-env.sh
添加HOME 变量:JAVA_HOME、SPARK_HOME、SCALA_HOME
考虑我们已经添加至环境变量文件里了,所以我们刷新配置的环境文件即可
spark-env.sh脚本会在启动 Spark 时加载,内容包含很多配置选项及说明,在以后会用到少部分,感兴趣可以仔细阅读这个文件的注释内容。
启动Spark 服务
这一节将启动 Spark 的 master 主节点和 slave 从节点
也会介绍 spark 单机模式和集群模式的部署区别
启动主节点
前往sbin 目录
cd $SPARK_HOME/sbin
启动
./start-master.sh
没有报错的话表示 master 已经启动成功
master 默认可以通过 web 访问http://localhost:8080
图中所示,master 中暂时还没有一个 worker ,我们启动 worker 时需要 master 的参数,该参数已经在上图中标志出来:spark://master:7077,请在执行后续命令时替换成你自己的参数。
启动从节点
启动 slave
./start-slave.sh spark://master:7077
没有报错表示启动成功,再次刷新浏览器页面可以看到下图所示新的 worker 已经添加
也可以用jps命令查看启动的服务,应该会列出Master和Worker。
测试实例
使用 spark-shell 连接 master ,注意把 MASTER 参数替换成你实验环境中的实际参数
MASTER=spark://master:7077 spark-shell
刷新 master 的 web 页面,可以看到新的Running Applications,如下图所示:
当退出 spark-shell 时,这个 application 会移动到Completed Applications一栏。
可以自己点击页面中的 Application 和 Workers 的链接查看并了解相关信息。
停止服务
停止服务的脚本为./sbin/stop-all.sh。
./stop-all.sh
但我建议依次关闭
./stop-master.sh ./stop-slave.sh
通过 jps 可以看到,master 与 worker 进程都已经停止
集群部署
修改配置
在 「单机模式」小节下的「修改配置文件」的基础上进行添加/修改
进入 Spark 配置目录
cd $SPARK_HOME/conf
spark-env.sh
参数解读:
- SPARK_MASTER_HOST = Master的主机名
- SPARK_MASTER_PORT = 提交Application的端口,默认7077,可更改
- SPARK_WORKER_CORES = 每一个Worker最多可以使用的cpu核个数
- SPARK_WORKER_MEMORY = 每个Worker最多可以使用的内存
其实你完全可以参考spark-env.sh内的注释
编辑
vim spark-env.sh
做出如下修改(位置非固定):
slaves
修改 slaves 配置文件,添加 Worker 的主机列表
复制/创建
cp slaves.template slaves
修改localhost 为你需要的机器的 HostName
你可以参考我的:
注意需要先把所有主机名输入到 /etc/hosts 避免无法解析
同步
此处不进行赘述,具体操作大家自行搜索,或者查看我关于 「HADOOP部署」的相关文章
大致操作如下:
- ssh-keygen 命令配合 ssh-copy-id 命令实现ssh免密
- scp 命令同步所有设置(指Spark 下的 conf 文件下,或者同步 Spark 文件)
启动集群
前往 master 机器下执行
前往sbin 目录
cd $SPARK_HOME/sbin
在这台机启动集群
./start-all.sh
启动的步骤和「单机部署」下的「启动 Spark 服务」一致,关闭也一致
start-all.sh 和 start-master.sh、start-slave.sh 和 Hadoop 里的 start-all.sh、start-yarn.sh、start-dfs.sh 的关系大致一样
Web:
Spark 交互式执行
Spark-Shell
Spark-Shell是 Spark 自带的一个 Scala 交互 Shell ,可以以脚本方式进行交互式执行,类似直接用 Python 及其他脚本语言的 Shell 。
进入Spark-Shell只需要执行spark-shell即可:
spark-shell
(前提是你配置好了 Spark 的环境变量)
进入到Spark-Shell后可以使用Ctrl D组合键退出 Shell。
在Spark-Shell中我们可以使用 Scala 的语法进行简单的测试,比如我们运行下面几个语句获得文件/etc/protocols的行数以及第一行的内容:
var f = sc.textFile("/etc/protocols") f.count() f.first()
上面的操作中创建了一个 RDD file,执行了两个简单的操作:
count()获取 RDD 的行数
first()获取第一行的内容
我们继续执行其他操作,比如查找有多少行含有tcp和udp字符串:
f.filter(line => line.contains("tcp")).count() f.filter(line => line.contains("udp")).count()
查看一共有多少个不同单词的方法,这里用到 Mapreduce 的思路:
var wordcount = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_) wordcount.count()
上面两步骤我们发现,/etc/protocols中各有一行含有tcp与udp字符串,并且一共有 442 个不同的单词。
上面每个语句的具体含义这里不展开,可以结合你阅读的文章进行理解,这里仅仅提供一个简单的例子让大家对 Spark 运算有基本认识。
操作完成后,Ctrl D组合键退出 Shell。
Pyspark
Pyspark 类似 Spark-Shell ,是一个 Python 的交互 Shell 。
执行pyspark启动进入 Pyspark:
pyspark
退出方法仍然是Ctrl D组合键。
在 Pyspark 中,我们可以用 Python 语法执行 Spark-Shell 中的操作,比如下面的语句获得文件/etc/protocols 的行数以及第一行的内容:
file = sc.textFile("/etc/protocols") file.count() file.first()
操作完成后,Ctrl D组合键退出 Shell。
对于 Pyspark 大家可以自行学习拓展,可以参考官方文档Spark Python API
提交应用程序
在Spark bin目录下的spark-submit可以用来在集群上启动应用程序。它可以通过统一的接口使用Spark支持的所有集群管理器 ,所有你不必为每一个管理器做相应的配置。
用spark-submit启动应用程序
bin/spark-submit脚本负责建立包含Spark以及其依赖的类路径(classpath),它支持不同的集群管理器以及Spark支持的加载模式。
./bin/spark-submit \ --class <main-class> --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]
一些常用的选项是:
- --class:你的应用程序的入口点(如org.apache.spark.examples.SparkPi)
- --master:集群的master URL(如spark://23.195.26.187:7077)
- --deploy-mode:在worker节点部署你的driver(cluster)或者本地作为外部客户端(client)。默认是client。
- --conf:任意的Spark配置属性,格式是key=value。
application-jar:包含应用程序以及其依赖的jar包的路径。这个URL必须在集群中全局可见,例如,存在于所有节点的hdfs://路径或file://路径
application-arguments:传递给主类的主方法的参数
一个通用的部署策略是从网关集群提交你的应用程序,这个网关机器和你的worker集群物理上协作。在这种设置下,client模式是适合的。在client模式下,driver直接在spark-submit进程 中启动,而这个进程直接作为集群的客户端。应用程序的输入和输出都和控制台相连接。因此,这种模式特别适合涉及REPL的应用程序。
另一种选择,如果你的应用程序从一个和worker机器相距很远的机器上提交,通常情况下用cluster模式减少drivers和executors的网络迟延。注意,cluster模式目前不支持独立集群、 mesos集群以及python应用程序。
有几个我们使用的集群管理器特有的可用选项。例如,在Spark独立集群的cluster模式下,你也可以指定--supervise用来确保driver自动重启(如果它因为非零退出码失败)。 为了列举spark-submit所有的可用选项,用--help运行它。
# Run application locally on 8 cores ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local[8] \ /path/to/examples.jar \ 100 # Run on a Spark Standalone cluster in client deploy mode ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://master:7077 \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # Run on a Spark Standalone cluster in cluster deploy mode with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://master:7077 \ --deploy-mode cluster --supervise --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # Run on a YARN cluster export HADOOP_CONF_DIR=XXX ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ # can also be `yarn-client` for client mode --executor-memory 20G \ --num-executors 50 \ /path/to/examples.jar \ 1000 # Run a Python application on a Spark Standalone cluster ./bin/spark-submit \ --master spark://master:7077 \ examples/src/main/python/pi.py \ 1000
Master URLs
传递给Spark的url可以用下面的模式