Spark机器学习管道 - Pipeline

简介: Spark机器学习管道 - Pipeline

一、实验目的

掌握使用Spark机器学习管道创建小型机器学习工作流

二、实验内容

1、构建一个机器学习管道,应用LogisticRegression算法,预测一行文本中是否出现了”spark”这个单词。

三、实验原理

Spark ML有一个名为Pipeline的类,它被设计用来管理一系列的阶段,每一个阶段都由PipelineStage来表示。一个PipelineStage既可以是transformer,也可以是estimator。抽象Pipeline是一种estimator。管道以指定的顺序连接多个transformers和estimators,形成机器学习工作流。从概念上讲,它将机器学习工作流中的数据预处理、特征提取和模型训练步骤链接在一起。

 管道由一系列阶段组成,每个阶段都是一个Transformer或一个Estimator。它按照指定的顺序运行这些阶段。

 下图描述了一个使用管道创建一个小型工作流。

37d39efdc55146a79dd8728bb8dc604e.png


四、实验环境

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


五、实验步骤

5.1 启动Spark集群和Zeppelin服务器。

在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh
3.  $ zeppelin-daemon.sh start

然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。

 2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:

231d3a59844346efa050975b7b5e6061.png


5.2 使用管道创建一个小型工作流。

这个示例中,管道由两个transformers和一个estimator组成。当调用Pipeline.fit()函数时,包含原始文本的输入DataFrame将被传递给Tokenizer transformer,其输出将被传递到HashingTF transformer,它将单词转换为特征。该Pipeline认识到LogisticRegression是一个estimator,因此它将调用fit函数和计算特征来产生一个LogisticRegressionModel。

 1、导入所需的包。在zeppelin中输入以下代码:

1.  import org.apache.spark.ml.{Pipeline, PipelineModel}
2.  import org.apache.spark.ml.classification.LogisticRegression
3.  import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

同时按下”【Shift + Enter】”键,执行以上代码。

 2、构造一个DataFrame。在zeppelin中输入以下代码:

1.  val text_data = spark.createDataFrame(Seq(
2.                          (1, "Spark is a unified data analytics engine", 0.0),
3.                          (2, "Spark is cool and it is fun to work with Spark", 0.0),
4.                          (3, "There is a lot of exciting sessions at upcoming Spark summit", 0.0),
5.                          (4, "signup to win a million dollars", 0.0) ) 
6.                      ).toDF("id", "line", "label")

同时按下”【Shift + Enter】”键,执行以上代码。

 3、构造第一个阶段transformer。在zeppelin中输入以下代码:

1.  val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words")

同时按下”【Shift + Enter】”键,执行以上代码。

 4、构造第二个阶段transformer(第一个阶段的输出作为第二个阶段的输入)。在zeppelin中输入以下代码:

1.  val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol)
2.                                 .setOutputCol("features")
3.                                 .setNumFeatures(4096)

同时按下”【Shift + Enter】”键,执行以上代码。

 5、构造第三个阶段estimator,代表逻辑回归算法实现。在zeppelin中输入以下代码:

1.  val logisticReg = new LogisticRegression().setMaxIter(5).setRegParam(0.01)

同时按下”【Shift + Enter】”键,执行以上代码。

 6、构造一个管道,由以上三个阶段组成。在zeppelin中输入以下代码:

1.  val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, logisticReg))

同时按下”【Shift + Enter】”键,执行以上代码。

 7、触发各阶段的顺序执行。在zeppelin中输入以下代码:

1.  val logisticRegModel = pipeline.fit(text_data)

同时按下”【Shift + Enter】”键,执行以上代码。

 8、使用学习到的模型对数据进行转换。在zeppelin中输入以下代码:

1.  logisticRegModel.transform(text_data).show

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

52bdbbf8e4884debb5d90147d30b1062.png


结束语

Pipeline的fit方法调用每个Transformer的transform方法和每个Estimator的fit方法,与创建管道时指定的顺序相同。每个Transformer接受一个DataFrame作为输入,并返回一个新的DataFrame,它将成为管道中下一阶段的输入。如果一个阶段是一个Estimator,则调用它的fit方法来训练一个模型。返回的模型是一个Transformer,用于将前一阶段的输出转换为下一阶段的输入。管道本身也是一个Estimator,其fit方法返回一个PipelineModel,它是一个Transformer。


相关文章
|
6月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
104 6
|
6月前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
75 6
|
5月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
39 0
|
5月前
|
机器学习/深度学习 数据处理 计算机视觉
机器学习- Sklearn (交叉验证和Pipeline)
机器学习- Sklearn (交叉验证和Pipeline)
|
6月前
|
机器学习/深度学习 分布式计算 算法
使用Spark进行机器学习
【5月更文挑战第2天】使用Spark进行机器学习
67 2
|
6月前
|
机器学习/深度学习 Java 开发工具
机器学习PAI常见问题之export DEBUG=ON 后编译不过如何解决
PAI(平台为智能,Platform for Artificial Intelligence)是阿里云提供的一个全面的人工智能开发平台,旨在为开发者提供机器学习、深度学习等人工智能技术的模型训练、优化和部署服务。以下是PAI平台使用中的一些常见问题及其答案汇总,帮助用户解决在使用过程中遇到的问题。
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark MLlib简介与机器学习流程
Spark MLlib简介与机器学习流程
|
6月前
|
机器学习/深度学习 存储 搜索推荐
利用机器学习算法改善电商推荐系统的效率
电商行业日益竞争激烈,提升用户体验成为关键。本文将探讨如何利用机器学习算法优化电商推荐系统,通过分析用户行为数据和商品信息,实现个性化推荐,从而提高推荐效率和准确性。
233 14
|
6月前
|
机器学习/深度学习 算法 数据可视化
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
113 1
|
6月前
|
机器学习/深度学习 算法 搜索推荐
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)