大数据Spark MLlib机器学习

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark MLlib机器学习

1 什么是Spark MLlib?

MLlib是Spark的机器学习(ML)库。旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。


MLlib目前分为两个代码包:


spark.mllib 包含基于RDD的原始算法API。

spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习管道。MLlib支持的算法库如下:

2 支持的数据类型

MLlib支持的数据类型是比较丰富的,从最基本的Spark数据集RDD到部署在集群中向量和矩阵,并且还支持部署在本地计算机中的本地化格式。

2.1 本地向量集

MLlib使用的本地化存储类型是向量,这里的向量主要由两类构成:稀疏型数据集(sparse)和密集型数据集(dense)。

导入MLlib的依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.12</artifactId>
    <version>2.4.3</version>
</dependency>

密集向量由double类型的数组支持,而稀疏向量则由两个平行数组支持。


example:


向量(5.2,0.0,5.5)


密集向量表示:[5.2,0.0,5.5]


稀疏向量表示:(3,[0,2],[5.2,5.5]) # 3是向量(5.2,0.0,5.5)的长度,除去0值外,其他两个值的索引和值分别构成了数组[0,2]和数组[5.2,5.5]。

2.1.1、密集型数据集

package cn.itcast.spark;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.junit.Test;
public class SparkMLlib {
    @Test
    public void testDense(){
        Vector vd = Vectors.dense(9, 5, 2, 7); //定义密集型向量
        double v = vd.apply(2); //获取下标为2的值
        System.out.println(v); //2
    }
}

2.1.2 稀疏型数据集

    @Test
    public void testSparse() {
        int[] indexes = new int[]{0, 1, 2, 5};
        double[] values = new double[]{9, 5, 2, 7};
        Vector vd = Vectors.sparse(6, indexes, values); //定义稀疏型向量
        double v = vd.apply(2); //获取下标为2的值
        double v2 = vd.apply(4); //获取下标为4的值
        double v3 = vd.apply(5); //获取下标为5的值
        System.out.println(v); //2
        System.out.println(v2); //0
        System.out.println(v3); //7
    }

2.2 向量标签

向量标签用于MLLIB中机器学习算法做标记。在分类问题中,可以将不同的数据集分成若干份,以整数型0、1、2进行标记。例如:垃圾邮件标记为1,非垃圾邮件标记为0。

    @Test
    public void testLabeledPoint(){
        Vector vd = Vectors.dense(9, 5, 2, 7); //定义密集型向量
        LabeledPoint lp = new LabeledPoint(1, vd); //定义标签向量
        System.out.println(lp.features());
        System.out.println(lp.label());
        System.out.println(lp);
        System.out.println("------------");
        int[] indexes = new int[]{0, 1, 2, 3};
        double[] values = new double[]{9, 5, 2, 7};
        Vector vd2 = Vectors.sparse(4, indexes, values); //定义稀疏型向量
        LabeledPoint lp2 = new LabeledPoint(2, vd2);
        System.out.println(lp2.features());
        System.out.println(lp2.label());
        System.out.println(lp2);
    }

2.3 本地矩阵

大数据运算中,为了更好地提升计算效率,可以更多地使用矩阵运算进行数据处理。部署在单机中的本地矩阵就是一个很好的存储方法。举一个简单的例子,例如一个数组Array(1,2,3,4,5,6),将其分为2行3列的:

    @Test
    public void testMatrix() {
        double[] values = new double[]{1, 2, 3, 4, 5, 6};
        Matrix mx = Matrices.dense(2, 3, values);
        System.out.println(mx);
    }

运行结果:

1.0  3.0  5.0  
2.0  4.0  6.0

2.4 分布式矩阵

分布式矩阵由长整型行列索引和双精度浮点型值数据组成,分布式存储在一个或多个RDD中,对于巨大的分布式矩阵来说,选择正确的存储格式非常重要,将一个分布式矩阵转化为另外一个不同格式需要混洗(shuffle),其代价很高。在MLlib实现了四类分布式矩阵存储格式,分别是:


行矩阵(RowMatrix)

行索引矩阵(IndexedRowMatrix)

坐标矩阵(CoordinateMatrix)

分块矩阵(BlockMatrix)

2.4.1 行矩阵

行矩阵是最基本的一种矩阵类型。行矩阵是以行作为基本方向的矩阵存储格式,列的作用相对较小。可以将其理解为行矩阵是一个巨大的特征向量的集合。每一行就是一个具有相同格式的向量数据,且每一行的向量内容都可以单独取出来进行操作。

a.txt:
1 2 3
4 5 6
@Test
    public void testRowMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<Vector> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            double[] ds = new double[ss.length];
            for (int i = 0; i < ss.length; i++) {
                ds[i] = Double.valueOf(ss[i]);
            }
            return Vectors.dense(ds);
        });
        RowMatrix rmx = new RowMatrix(rdd2.rdd());
        System.out.println(rmx.numRows()); //2
        System.out.println(rmx.numCols()); //3
    }

2.4.2 行索引矩阵

An IndexedRowMatrix类似于a RowMatrix但具有有意义的行索引。它由索引行的RDD支持,因此每行由其索引(long-typed)和本地向量表示。

@Test
    public void testIndexedRowMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<Vector> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            double[] ds = new double[ss.length];
            for (int i = 0; i < ss.length; i++) {
                ds[i] = Double.valueOf(ss[i]);
            }
            return Vectors.dense(ds);
        });
        JavaRDD<IndexedRow> rdd3 = rdd2.map(v1 -> new IndexedRow(v1.size(), v1)); //建立待索引的行
        IndexedRowMatrix irmx = new IndexedRowMatrix(rdd3.rdd()); //行索引
        JavaRDD<IndexedRow> rdd4 = irmx.rows().toJavaRDD();
        List<IndexedRow> collect = rdd4.collect();
        collect.forEach(indexedRow -> System.out.println(indexedRow)); //打印行内容
    }

2.4.3 坐标矩阵

CoordinateMatrix是由其条目的RDD支持的分布式矩阵。每个条目都是一个元组(i: Long, j: Long, value: Double),其中i是行索引,j是列索引, value是条目值。只有当矩阵的两个维度都很大并且矩阵非常稀疏时,才应该使用它。

@Test
    public void testCoordinateMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<MatrixEntry> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            return new MatrixEntry(Long.valueOf(ss[0]),Long.valueOf(ss[1]),Double.valueOf(ss[2]));
        });
        CoordinateMatrix matrix = new CoordinateMatrix(rdd2.rdd());
        List<MatrixEntry> collect = matrix.entries().toJavaRDD().collect();
        for (MatrixEntry matrixEntry : collect) {
            System.out.println(matrixEntry);
        }
    }

2.4.4 分块矩阵

BlockMatrix是支持矩阵分块RDD的分布式矩阵,其中矩阵分块由((int,int),matrix)元祖所构成(int,int)是该部分矩阵所处的矩阵的索引位置,Matrix表示该索引位置上的子矩阵

分块矩阵支持矩阵加法和乘法,并设有辅助函数验证用于检查矩阵是否设置正确。

@Test
    public void testCoordinateMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<MatrixEntry> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            return new MatrixEntry(Long.valueOf(ss[0]),Long.valueOf(ss[1]),Double.valueOf(ss[2]));
        });
        CoordinateMatrix matrix = new CoordinateMatrix(rdd2.rdd());
        //转化成块矩阵
        BlockMatrix blockMatrix = matrix.toBlockMatrix();
        // 对该分块矩阵进行检验,确认该分块是否正确,如果不正确则抛出异常
        blockMatrix.validate(); 
        BlockMatrix addBlockMatrix = blockMatrix.add(blockMatrix); // 相加
        List<Tuple2<Tuple2<Object, Object>, Matrix>> collect = addBlockMatrix.blocks().toJavaRDD().collect();
        for (Tuple2<Tuple2<Object, Object>, Matrix> tuple2MatrixTuple2 : collect) {
            System.out.println(tuple2MatrixTuple2);
        }
//        ((0,0),5 x 6 CSCMatrix
//                (1,2) 6.0
//        (4,5) 12.0)
    }

结果说明:

不论是索引值还是坐标值都是从0开始,所以行数是索引值加1

3 RDD、DataSet、Dataframe区别及转化

RDD(Spark1.0)—>DataFrame(Spark1.3)---->DataSet(Spark1.6)


SparkSql提供了Dataframe和DataSet的数据抽象,DataFrame就是RDD+Schema,可以认为是一张二维表格。它的劣势是在编译器不对表格中的字段进行类型检查。在运行期间检查。DataSet是Spark最新数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。


个人理解:

JavaRDD<String[]> map = source.map(line -> line.split(","));

List<String[]> collect = source.map(line -> line.split(",")).collect();

rdd是一种特殊的分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类Abstract,当collect转变为本质的存储类型.map进行以行为单位或者list进行遍历.每行的数据类型为String[].


示例:

package cn.itcast.spark;
import java.io.Serializable;
public class Student implements Serializable {
   private Long id;
   private String name;
   private Integer age;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
users.txt:
1,zhangsan,20
2,lisi,21
3,wangwu,19
4,zhaoliu,18
 @Test
    public void testDataSet() {
        SparkSession sparkSession = SparkSession.builder().appName("SparkMLlib").master("local[*]").getOrCreate();
        // 读取文件
        JavaRDD<String> source = sparkSession.read().textFile("F:\\code\\users.txt").toJavaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String parts[] = line.split(",");
            Student student = new Student();
            student.setId(Long.valueOf(parts[0]));
            student.setName(parts[1]);
            student.setAge(Integer.valueOf(parts[2]));
            return student;
        });
        Dataset<Row> df = sparkSession.createDataFrame(rowRDD, Student.class);
//        df.show();
        df.select("id", "name").orderBy(df.col("id").desc()).show();
    }

运行结果:

+---+--------+
| id|    name|
+---+--------+
|  4| zhaoliu|
|  3|  wangwu|
|  2|    lisi|
|  1|zhangsan|
+---+--------+


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
21天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
32 3
|
25天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
43 3
|
30天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
28天前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
21天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
39 0
|
1月前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
31 0
|
1月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
37 0
|
17天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
63 11
|
22天前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
54 1

热门文章

最新文章