stypora-copy-images-to: img
typora-root-url: ./
Spark Day02:Spark 基础环境(二)
Hadoop3.0-HDFS https://www.bilibili.com/video/BV1yX4y1K7Lq Hadoop3.0-MapReduce https://www.bilibili.com/video/BV1Tf4y167U8 Hadoop3.0-yarn https://www.bilibili.com/video/BV1wh411S76Z
01-[了解]-上次课程内容回顾
主要讲解2个方面的内容:Spark 框架概述和Spark 快速入门。
1、Spark 框架概述 - Spark 框架诞生背景 加州大学、伯克利分校、APMLab实验室、2009年 - Spark 框架功能(官方定义),类似MapReduce框架,分析处理数据 Apache Spark™ is a unified analytics engine for large-scale data processing. 分析引擎、统一的(任意类型分析基本都可以完成)、大规模数据集(海量数据) - Spark 发展史 2009年、2010年发布论文(RDD)、2014年(1.0)、2016年(2.0)、2020年(3.0) - Spark 官方四个特性 快Speed,与MapReduce相比较,2个方面比较 统一 支持多语言,Scala、Java、Python、R、SQL - 框架模块 Core、SQL、Streaming(StructuredStreaming)、MLlib及GraphX、PySpark和SparkR等 - 运行方式 本地模型运行(1JVM进程,运行Task,线程方式)、集群模式运行和容器(云端):K8s 2、Spark 快速入门 - 环境准备 导入虚拟机、基本配置 Spark 框架基本配置(设置):解压、设置JAVA和Scala环境变量 - spark-shell 本地模式运行交互式命令行 $SPARK_HOME/bin/spark-shell --master local[2] - 经典案例:词频统计WordCount map\flatMap reduceByKey 数据结构:RDD,认为就是一个集合,比如列表List,存储很多数据,调用高价函数处理数据 - 圆周率PI 使用提交命令:spark-submit --class xxx --master yyyy xxx.jar parameter
02-[了解]-今日课程内容提纲
讲解2个方面的内容:Standalone集群模式和使用IDEA开发应用程序。
1、Standalone 集群 Spark框架自身提供类似Hadoop YARN分布式集群资源管理集群Standalone功能,管理集群资源和分配资源运行Spark应用程序。 集群架构组成,类似Hadoop YARN集群架构 配置、部署、启动和测试 Spark应用运行在集群上架构组成 Spark 应用运行WEB UI监控 2、IDEA应用开发,编写入门案例词频统计 创建Maven Project SparkContext实例创建 WordCount代码编写 使用spark-submit提交应用执行
03-[掌握]-Standalone集群【架构组成】
Spark Stanadlone集群类似Hadoop YARN集群功能,管理整个集群中资源(CUP Core核数、内存Memory、磁盘Disk、网络带宽等)
Standalone集群使用了分布式计算中的master-slave模型,master是集群中含有Master进程的节点,slave是集群中的Worker节点含有Executor进程。
- Standalone集群主从架构:Master-Slave
- 主节点:老大,管理者,
Master
- 从节点:小弟,干活的,
Workers
Spark Standalone集群,类似Hadoop YARN,管理集群资源和调度资源:
- Master,管理整个集群资源,接收提交应用,分配资源给每个应用,运行Task任务
- Worker,管理每个机器的资源,分配对应的资源来运行Task;每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
- HistoryServer,Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查
看应用运行相关信息。
04-[掌握]-Standalone 集群【配置和部署】
Standalone集群安装服务规划与资源配置:
需要将三台虚拟机,全部恢复到【04、分布式集群环境】快照。
按照讲义上步骤进行配置即可,具体步骤如下:
05-[掌握]-Standalone 集群【服务启动和运行应用】
在Master节点node1.itcast.cn上启动,进入$SPARK_HOME,必须配置主节点到所有从节点的
SSH无密钥
登录,集群各个机器时间同步
。
- 主节点Master启动命令
[root@node1 ~]# /export/server/spark/sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.itcast.cn.out [root@node1 ~]# [root@node1 ~]# jps 15076 DataNode 15497 Master 15545 Jps 14973 NameNode
WEB UI页面地址:http://node1.itcast.cn:8080
- 从节点Workers启动命令
/export/server/spark/sbin/start-slaves.sh
- 历史服务器HistoryServer
/export/server/spark/sbin/start-history-server.sh
WEB UI页面地址:http://node1.itcast.cn:18080
将上述运行在Local Mode的圆周率PI程序,运行在Standalone集群上,修改【
--master
】地址为Standalone集群地址:spark://node1.itcast.cn:7077
,具体命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077 \ --class org.apache.spark.examples.SparkPi \ ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \ 10
查看Master主节点WEB UI界面:
06-[掌握]-Spark 应用架构组成
登录到Spark HistoryServer历史服务器WEB UI界面,点击刚刚运行圆周率PI程序:
切换到【Executors】Tab页面:
从图中可以看到Spark Application运行到集群上时,由两部分组成:Driver Program和Executors
每个Executor相当于线程池,每个线程运行Task任务,需要1Core CPU。
- 第一、
Driver Program
- 相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
- 运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
- 一个SparkApplication仅有一个;
- 第二、
Executors
- 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,
一个Task运行需要1 Core CPU,所有可以认为Executor中线程数就等于CPU Core核数;- 一个Spark Application可以有多个,可以设置个数和资源信息;
07-[掌握]-Spark 应用WEB UI 监控
Spark 提供了多个监控界面,当运行Spark任务后可以直接在网页对各种信息进行监控查看。
/export/server/spark/bin/spark-shell --master spark://node1.itcast.cn:7077
在spark-shell中执行词频统计WordCount程序代码,运行如下:
val inputRDD = sc.textFile("/datas/wordcount.data") val wordcountsRDD = inputRDD.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey((tmp, item) => tmp +item) wordcountsRDD.take(5)
截图如下:
可以发现在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。
其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1Core CPU。
Spark Application程序运行时三个核心概念:
Job、Stage、Task
,说明如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rAyWpkFn-1627098684378)(/img/image-20210420160752870.png)]
Job和Stage及Task之间关系:
08-[理解]-Standalone 集群【Standalone HA】
Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着
Master单点故障(SPOF:single Point of Failover)
的问题。
ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。
基于Zookeeper实现HA:http://spark.apache.org/docs/2.4.5/spark-standalone.html#high-availability
09-[掌握]-IDEA 应用开发【构建Maven Project】
Spark课程代码,创建一个Maven Project工程,每天创建Maven Module模块,方便复习。
创建Maven Project工程【bigdata-spark_2.11】,设置GAV三要素的值如下:
创建Maven Module模块【spark-chapter01_2.11】,对应的GAV三要素值如下:
至此,将Maven Module模块创建完成,可以开始编写第一个Spark程序。
10-[掌握]-IDEA 应用开发【应用入口SparkContext】
Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:
11-[掌握]-IDEA 应用开发【编程实现:WordCount】
从HDFS上读取数据,所以需要将HDFS Client配置文件放入到Maven Module资源目录下,同时设置应用运行时日志信息。
完整代码如下:
package cn.itcast.spark.start import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark实现词频统计WordCount程序 */ object SparkWordCount { def main(args: Array[String]): Unit = { // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息 val sc: SparkContext = { // 其一、构建SparkConf对象,设置应用名称和master val sparkConf: SparkConf = new SparkConf() .setAppName("SparkWordCount") .setMaster("local[2]") // 其二、创建SparkContext实例,传递sparkConf对象 new SparkContext(sparkConf) } // TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中 val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data") // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey /* mapreduce spark spark hive | flatMap() = map + flatten mapreduce spark spark hive |map mapreduce,1 spark,1 spark,1 hive,1 | reduceByKey spark, 2 mapreduce, 1 hive, 1 */ val resultRDD: RDD[(String, Int)] = inputRDD // 按照分隔符分割单词 .flatMap(line => line.split("\\s+")) // 转换单词为二元组,表示每个单词出现一次 .map(word => word -> 1) // 按照单词分组,对组内执进行聚合reduce操作,求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台 resultRDD.saveAsTextFile("/datas/spark-wordcount") resultRDD.foreach(tuple => println(tuple)) // 为了查看应用监控,可以让进程休眠 Thread.sleep(100000) // 应用结束,关闭资源 sc.stop() } }
12-[掌握]-IDEA 应用开发【编程实现:TopKey】
在上述词频统计WordCount代码基础上,
对统计出的每个单词的词频Count,按照降序排序,获取词频次数最多Top3单词
。
数据结构RDD中关于排序函数有如下三个:
- 1)、sortByKey:针对RDD中数据类型key/value对时,按照Key进行排序
- 2)、sortBy:针对RDD中数据指定排序规则
- 3)、top:按照RDD中数据采用降序方式排序,如果是Key/Value对,按照Key降序排序
具体演示代码如下,建议使用sortByKey
函数进行数据排序操作,慎用top函数。
package cn.itcast.spark.top import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark实现词频统计WordCount程序,按照词频降序排序 */ object SparkTopKey { def main(args: Array[String]): Unit = { // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息 val sc: SparkContext = { // 其一、构建SparkConf对象,设置应用名称和master val sparkConf: SparkConf = new SparkConf() .setAppName("SparkWordCount") .setMaster("local[2]") // 其二、创建SparkContext实例,传递sparkConf对象 new SparkContext(sparkConf) } // TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中 val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data") // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey /* mapreduce spark spark hive | flatMap() = map + flatten mapreduce spark spark hive |map mapreduce,1 spark,1 spark,1 hive,1 | reduceByKey spark, 2 mapreduce, 1 hive, 1 */ val resultRDD: RDD[(String, Int)] = inputRDD // 按照分隔符分割单词 .flatMap(line => line.split("\\s+")) // 转换单词为二元组,表示每个单词出现一次 .map(word => word -> 1) // 按照单词分组,对组内执进行聚合reduce操作,求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台 /* (spark,11) (hadoop,3) (hive,6) (hdfs,2) (mapreduce,4) (sql,2) */ resultRDD.foreach(tuple => println(tuple)) println("===========================") // =========================== sortByKey ========================= resultRDD // 将单词和词频互换 .map(tuple => tuple.swap) // (tuple => (tuple._2, tuple._1)) // 调用sortByKey安装,按照Key进行排序,设置降序排序 .sortByKey(ascending = false) // 打印结果 .take(3) .foreach(tuple => println(tuple)) println("===========================") // =========================== sortBy ========================= /* def sortBy[K]( f: (T) => K, // 指定排序规则 ascending: Boolean = true, numPartitions: Int = this.partitions.length ) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] */ resultRDD .sortBy(tuple => tuple._2, ascending = false) // 打印结果 .take(3) .foreach(tuple => println(tuple)) println("===========================") // =========================== top ========================= /* def top(num: Int)(implicit ord: Ordering[T]): Array[T] */ resultRDD .top(3)(Ordering.by(tuple => - tuple._2)) .foreach(tuple => println(tuple)) // 为了查看应用监控,可以让进程休眠 Thread.sleep(100000) // 应用结束,关闭资源 sc.stop() } }
13-[理解]-Spark 应用提交命令【spark-submit】
使用IDEA集成开发工具开发测试Spark Application程序以后,类似MapReduce程序一样,打成jar包,使用命令【
spark-submit
】提交应用的执行,提交命令帮助文档:
[root@node1 ~]# /export/server/spark/bin/spark-submit --help Usage: spark-submit [options] <app jar | python file | R file> [app arguments] Usage: spark-submit --kill [submission ID] --master [spark://...] Usage: spark-submit --status [submission ID] --master [spark://...] Usage: spark-submit run-example [options] example-class [example args] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, k8s://https://host:port, or local (Default: local[*]). --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of jars to include on the driver and executor classpaths. --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts. --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName). --conf PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --proxy-user NAME User to impersonate when submitting the application. This argument does not work with --principal / --keytab. --help, -h Show this help message and exit. --verbose, -v Print additional debug output. --version, Print the version of current Spark. Cluster deploy mode only: --driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1). Spark standalone or Mesos with cluster deploy mode only: --supervise If given, restarts the driver on failure. --kill SUBMISSION_ID If given, kills the driver specified. --status SUBMISSION_ID If given, requests the status of the driver specified. Spark standalone and Mesos only: --total-executor-cores NUM Total cores for all executors. Spark standalone and YARN only: --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) YARN-only: --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). --num-executors NUM Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. --archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor. --principal PRINCIPAL Principal to be used to login to KDC, while running on secure HDFS. --keytab KEYTAB The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.
提交一个应用命令:
- 第一种:基本参数配置
- 第二种:Driver Program 参数配置
- 第三种:Executor 参数配置
每个Spark Application运行时,需要启动Executor运行任务Task,需要指定Executor个数及每个Executor资源信息(内存Memory和CPU Core核数)。
官方案例,提交Spark应用运行设置
14-[掌握]-IDEA应用开发【应用打包运行】
将开发测试完成的WordCount程序打成jar保存,使用【spark-submit】分别提交运行在本地模式LocalMode和集群模式Standalone集群。
package cn.itcast.spark.submit import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark实现词频统计WordCount程序 */ object SparkSubmit { def main(args: Array[String]): Unit = { //判断是否传递2个参数,如果不是,直接抛出异常 if(args.length < 2){ println("Usage: SparkSubmit <input> <output> ...................") System.exit(-1) } // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息 val sc: SparkContext = { // 其一、构建SparkConf对象,设置应用名称和master val sparkConf: SparkConf = new SparkConf() .setAppName("SparkWordCount") //.setMaster("local[2]") // 其二、创建SparkContext实例,传递sparkConf对象 new SparkContext(sparkConf) } // TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中 val inputRDD: RDD[String] = sc.textFile(args(0)) // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey /* mapreduce spark spark hive | flatMap() = map + flatten mapreduce spark spark hive |map mapreduce,1 spark,1 spark,1 hive,1 | reduceByKey spark, 2 mapreduce, 1 hive, 1 */ val resultRDD: RDD[(String, Int)] = inputRDD // 按照分隔符分割单词 .flatMap(line => line.split("\\s+")) // 转换单词为二元组,表示每个单词出现一次 .map(word => word -> 1) // 按照单词分组,对组内执进行聚合reduce操作,求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台 resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}") // 应用结束,关闭资源 sc.stop() } }
打成jar包,上传至HDFS文件系统:
/spark/apps
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master local[2] \ --class cn.itcast.spark.submit.SparkSubmit \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swc-output SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --class cn.itcast.spark.submit.SparkSubmit \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --total-executor-cores 2 \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swc-output
附录一、创建Maven模块
1)、Maven 工程结构
MAVEN工程GAV三要素:
<parent> <artifactId>bigdata-spark_2.11</artifactId> <groupId>cn.itcast.spark</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>spark-chapter01_2.11</artifactId>
2)、POM 文件内容
Maven 工程POM文件中内容(依赖包):
<repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <properties> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <spark.version>2.4.5</spark.version> <hadoop.version>2.6.0-cdh5.16.2</hadoop.version> </properties> <dependencies> <!-- 依赖Scala语言 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- Spark Core 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop Client 依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <outputDirectory>target/classes</outputDirectory> <testOutputDirectory>target/test-classes</testOutputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/resources</directory> </resource> </resources> <!-- Maven 编译的插件 --> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
IDEA中配置远程连接服务器
.0
1.8
1.8
UTF-8
net.alchim31.maven
scala-maven-plugin
3.2.0
compile
testCompile
> IDEA中配置远程连接服务器 [外链图片转存中...(img-Isvrrx8P-1627098684386)]