Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)

简介:

本博文的主要内容是:

1、rdd基本操作实战

2、transformation和action流程图

3、典型的transformation和action

 

 

 

RDD有3种操作:

1、  Trandformation      对数据状态的转换,即所谓算子的转换

2、  Action    触发作业,即所谓得结果的

3、  Contoller  对性能、效率和容错方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

 

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

传入类型是T,返回类型是U。

 

 

 

元素之间,为什么reduce操作,要符合结合律和交换律?
答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。
在交换律基础上,想要reduce操作,必须要符合结合律。

/**

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

RDD.scala(源码)


这里,新建包com.zhouls.spark.cores

package com.zhouls.spark.cores

/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {

}


下面,开始编代码

本地模式

自动 ,会写好

源码来看,

所以, val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身

 

 

val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple


val textLines = lineCount.reduceByKey(_+_)


textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

 成功!



 现在,将此行代码,

     textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
 

总结:

本地模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。

 

 

 

集群模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行无法通过,因为结果是分布在各个节点上。
 
collect源码:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
Tuple是元组。通过concat合并!


foreach源码:

/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
  


 

rdd实战(rdd基本操作实战)至此!

 

 

 

 

 

 rdd实战(transformation流程图)

 拿wordcount为例!

 

启动hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

 

 

 启动spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

 

 

启动spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

 

 

scala> val partitionsReadmeRdd =  sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

 或者

 scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

 scala>  val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

 

注意,~目录,不是这里。

 

 

 

 为什么,我的,不是这样的显示呢?

 

 

 

RDD的transformation和action执行的流程图

 

 

典型的transformation和action


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5913334.html,如需转载请自行联系原作者

相关文章
|
1月前
|
程序员 API 开发者
实战阿里qwen2.5-coder 32B,如何配置Cline的Ollama API接口。
阿里Qwen2.5大模型开源免费,适合编程应用。在Ollama平台下载时,推荐选择带有“cline”字样的Qwen2.5-Coder版本,仅需额外下载适配文件,无需重复下载模型文件。Ollama环境永久免费,配置简单,效果出色,适合开发者使用。
2391 77
|
27天前
|
供应链 搜索推荐 API
深度解析1688 API对电商的影响与实战应用
在全球电子商务迅猛发展的背景下,1688作为知名的B2B电商平台,为中小企业提供商品批发、分销、供应链管理等一站式服务,并通过开放的API接口,为开发者和电商企业提供数据资源和功能支持。本文将深入解析1688 API的功能(如商品搜索、详情、订单管理等)、应用场景(如商品展示、搜索优化、交易管理和用户行为分析)、收益分析(如流量增长、销售提升、库存优化和成本降低)及实际案例,帮助电商从业者提升运营效率和商业收益。
144 20
|
22天前
|
XML API 开发者
探究获取亚马逊畅销榜API接口及实战应用
亚马逊MWS(商城网络服务)提供了一系列API接口,帮助开发者获取平台数据,其中畅销榜API尤为关键。通过注册开发者账号、创建应用并申请权限,可使用HTTP POST请求获取商品的销售排名、价格等信息。Python代码示例展示了如何构建和发送请求,并处理返回的XML或JSON数据。注意遵守亚马逊的频率限制、数据准确性和合规性要求,以确保安全合法地利用这些数据支持电商业务决策。
44 1
|
23天前
|
JSON 监控 API
获取1688商品SKU信息API接口及实战应用
在电商蓬勃发展的今天,数据成为宝贵的财富。1688作为国内知名批发采购平台,提供商品SKU信息API接口,可获取库存、价格、规格等关键数据,助力电商运营、市场分析和价格监控。本文介绍如何注册1688开放平台账号、创建应用并获取AppKey/AppSecret,申请API权限,使用Python实现接口调用,处理响应数据,并注意请求频率限制和错误处理。通过该接口,可为电商运营和数据分析提供有力支持。
54 2
|
24天前
|
安全 程序员 API
API对于程序员的多元用法:从基础到实战
API(应用程序编程接口)是现代软件开发中不可或缺的工具,充当不同系统间沟通的桥梁。通过API,程序员可以轻松获取外部数据、扩展应用功能、实现微服务间的通信等,极大提升开发效率和应用功能性。常见的API类型包括Web API、本地API和第三方API。使用API,开发者能快速集成复杂功能(如支付、物流跟踪),并确保数据安全与管理。掌握API的开发、维护及安全管理技巧,对构建高效、稳定的应用至关重要。随着数字化进程加速,API的重要性将不断提升。
35 1
|
2月前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
3月前
|
JSON API 开发者
微店(Weidian)商品详情API接口解析实战
微店(Weidian)是一个基于社交关系的电商平台,为商家提供了一整套的电商解决方案。微店API接口允许开发者通过编程方式访问和操作微店平台上的数据,从而可以创建自动化的工具、应用或集成服务。
|
3天前
|
JSON API 数据格式
微店商品列表接口(微店 API 系列)
微店商品列表接口是微店API系列的一部分,帮助开发者获取店铺中的商品信息。首先需注册微店开发者账号并完成实名认证,选择合适的开发工具如PyCharm或VS Code,并确保熟悉HTTP协议和JSON格式。该接口支持GET/POST请求,主要参数包括店铺ID、页码、每页数量和商品状态等。响应数据为JSON格式,包含商品详细信息及状态码。Python示例代码展示了如何调用此接口。应用场景包括商品管理系统集成、数据分析、多平台数据同步及商品展示推广。
|
1天前
|
JSON 监控 API
唯品会商品详情接口(唯品会 API 系列)
唯品会商品详情接口助力电商发展,提供商品名称、价格、规格等详细信息,支持HTTP GET/POST请求,响应为JSON格式。开发者可通过API Key和商品ID获取数据,应用于电商数据分析、竞品调研、应用开发及价格监控,提升业务效率与竞争力。示例代码展示Python调用方法,方便快捷。
|
27天前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。

热门文章

最新文章