大数据进阶之路——Spark SQL小结

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 文章目录手写 WordCountRDD、DAG、 Stage、 Task 、 JobSpark 作业提交流程Spark 的 Local 和 Standalone宽依赖、窄依赖Spark SQL比 Hive 快在哪打包的注意事项

手写 WordCount

使用flatMap、reduceByKey 来计算

//sc是SparkContext对象,该对象是提交spark程序的入口
sc.textFile("file:///home/hadoop/data/hello.txt") // 读取文件,
  .flatMap(line => line.split(" "))  // 将文件中的每一行单词按照分隔符(这里是空格)分隔
  .map(word => (word,1))  //给每个单词计数为1
  .reduceByKey((x,y) => (x+y))  // 统计相同单词的数量
  .collect

简写

sc.textFile("file:///home/hadoop/data/hello.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect

RDD、DAG、 Stage、 Task 、 Job

RDD(Resilient Distributed Datasets),弹性分布式数据集

DAG(Directed Acyclic Graph),有向无环图


RDD RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。


DAG Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。


Stage 在 DAG 中又进行 Stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 Stage 又可以划分成若干 Task。接下来的事情就是 Driver 发送 Task 到 Executor,Executor 线程池去执行这些 task,完成之后将结果返回给 Driver。


Job Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。


Task 一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。


Spark 作业提交流程

image.png

spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。

TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。

Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executor

Executor 启动后,会自己反向注册到 TaskScheduler 中。所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。

每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。

DAGScheduler 会将 Job 划分为多个 Stage,然后每个 Stage 创建一个 TaskSet。

TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。

Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

Spark 的 Local 和 Standalone

Spark一共有6种运行模式:Local,Standalone,Yarn-Cluster,Yarn-Client, Mesos, Kubernetes


Local: Local 模式即单机模式,如果在命令语句中不加任何配置,则默认是 Local 模式,在本地运行。这也是部署、设置最简单的一种模式,所有的 Spark 进程都运行在一台机器或一个虚拟机上面。

Standalone: Standalone 是 Spark 自身实现的资源调度框架。如果我们只使用 Spark 进行大数据计算,不使用其他的计算框架时,就采用 Standalone 模式就够了,尤其是单用户的情况下。Standalone 模式是 Spark 实现的资源调度框架,其主要的节点有 Client 节点、Master 节点和 Worker 节点。其中 Driver 既可以运行在 Master 节点上中,也可以运行在本地 Client 端。当用 spark-shell 交互式工具提交 Spark 的 Job 时,Driver 在 Master 节点上运行;当使用 spark-submit 工具提交 Job 或者在 Eclipse、IDEA 等开发平台上使用 new SparkConf.setManager(“spark://master:7077”) 方式运行 Spark 任务时,Driver 是运行在本地 Client 端上的。

Standalone 模式的部署比较繁琐,不过官方有提供部署脚本,需要把 Spark 的部署包安装到每一台节点机器上,并且部署的目录也必须相同,而且需要 Master 节点和其他节点实现 SSH 无密码登录。启动时,需要先启动 Spark 的 Master 和 Slave 节点。提交命令类似于:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://Oscar-2.local:7077 \
  /tmp/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \
  100

其中 master:7077是 Spark 的 Master 节点的主机名和端口号,当然集群是需要提前启动。


不管使用什么模式,Spark应用程序的代码是一模一样的,只需要在提交的时候通过–master参数来指定我们的运行模式即可


Client

Driver运行在Client端(提交Spark作业的机器)

Client会和请求到的Container进行通信来完成作业的调度和执行,Client是不能退出的

日志信息会在控制台输出:便于我们测试


Cluster

Driver运行在ApplicationMaster中

Client只要提交完作业之后就可以关掉,因为作业已经在YARN上运行了

日志是在终端看不到的,因为日志是在Driver上,只能通过yarn logs -applicationIdapplication_id

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \

此处的yarn就是我们的yarn client模式

如果是yarn cluster模式的话,yarn-cluster


Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.


如果想运行在YARN之上,那么就必须要设置HADOOP_CONF_DIR或者是YARN_CONF_DIR


1) export HADOOP_CONF_DIR=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

2) $SPARK_HOME/conf/spark-env.sh

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
4

yarn logs -applicationId application_1495632775836_0002

宽依赖、窄依赖

image.png

窄依赖指的是每一个 Parent RDD 的 Partition 最多被子 RDD 的一个 Partition 使用(一子一亲)


宽依赖指的是多个子 RDD 的 Partition 会依赖同一个 parent RDD的 partition(多子一亲)


RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个数据集片段。


首先,窄依赖可以支持在同一个节点上,以 pipeline 形式执行多条命令(也叫同一个 Stage 的操作),例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。


其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的 parent partition 即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会重新调度到多个节点进行)。


Spark SQL比 Hive 快在哪

当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。

由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。


Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。


消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 persist 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。


消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中 。


JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。


打包的注意事项

打包时要注意,pom.xml中需要添加如下plugin

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass></mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

注意:–files在spark中的使用


spark.read.format("parquet").load("/hiszm/clean/day=20170511/part-00000-71d465d1-7338-4016-8d1a-729504a9f95e.snappy.parquet").show(false)

mvn assembly:assembly

./bin/spark-submit \
--class com.hiszm.log.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /home/hadoop/lib/ipDatabase.csv,/home/hadoop/lib/ipRegion.xlsx \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/input/* hdfs://hadoop001:8020/hiszm/clean

存储格式的选择:http://www.infoq.com/cn/articles/bigdata-store-choose/

压缩格式的选择:https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-compression-analysis/


调整并行度

./bin/spark-submit \
--class com.hiszm.log.TopNStatJobYARN \
--name TopNStatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/clean 20170511 
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
20天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
31 3
|
24天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
42 3
|
29天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
13天前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
17 0
|
20天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
39 0
|
20天前
|
监控 Java 开发者
揭秘Struts 2性能监控:选对工具与方法,让你的应用跑得更快,赢在起跑线上!
【8月更文挑战第31天】在企业级应用开发中,性能监控对系统的稳定运行至关重要。针对流行的Java EE框架Struts 2,本文探讨了性能监控的工具与方法,包括商用的JProfiler、免费的VisualVM以及Struts 2自带的性能监控插件。通过示例代码展示了如何在实际项目中实施这些监控手段,帮助开发者发现和解决性能瓶颈,确保应用在高并发、高负载环境下稳定运行。选择合适的监控工具需综合考虑项目需求、成本、易用性和可扩展性等因素。
28 0
|
30天前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
31 0
|
1月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
37 0
|
SQL 分布式计算 Apache
Apache Spark 系列技术直播 - Spark SQL进阶与实战
Spark SQL进阶与实战 Spark相关组件介绍 Spark及其依赖组件 Hive Metastore介绍 Spark Thrift Server介绍 表与ETL Spark表基本概念 Spark建表最佳实践 Spark ETL最佳实践 动态分区表示例分析 Spark SQL查询最佳实践 Sp.
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
112 1
Spark快速大数据分析PDF下载读书分享推荐