Spark - 一文搞懂 parquet

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。......

一.引用

parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。

image.gif编辑

二.Parquet 加载方式

1.SparkSession.read.parquet

SparkSession 位于 org.apache.spark.sql.SparkSession 类下,除了支持读取 parquet 的列式文件外,SparkSession 也支持读取 ORC 列式存储文件,可以参考: Spark 读取 ORC FIle

val conf = new SparkConf()
      .setAppName("ParquetInfo")
      .setMaster("local")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    spark.read.parquet(path).foreach(row => {
      val head = row.getString(0)
      println(head)
    })

image.gif

读取后会获取一个 Sql.DataFrame,支持常见的 sql 语法操作,如果不想使用 sql 才做也可以通过 .rdd 的方法得到 RDD[Row],随后遍历每个 partition 下的 Iterator[Row] 即可。

Tips:

后续可以执行 sql 操作,当然也支持初始化 SqlContext 调用 sql 方法,不过用 SparkSession 也可以搞定。

val parquetFileDF = spark.read.parquet("path")
    parquetFileDF.createOrReplaceTempView("tableName")
    val resultDf = spark.sql("SELECT * FROM tableName")
    val sqlContext = new SQLContext(sc)
    sqkContext.sql("xxx")

image.gif

2.SparkContext.HadoopFile

使用 hadoopFile 读取时需要指定对应的 K-V 以及 InputFormat 的格式,Parquet  文件对应的 K-V 为 Void-ArrayWritable,其 InputFormat 为: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat,获取 ArrayWritable 后通过索引可以获得 Writable。

val sc = spark.sparkContext
    sc.setLogLevel("error")
    val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])
    parquetInfo.take(5).foreach(info => {
      val writable = info._2.get()
      val head = writable(0)
      println(writable.length + "\t" + head)
    })

image.gif

Tips:

需要在 SparkConf 中加入序列化的配置,否则 hadoopFile 方法会报错:

image.gif编辑

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

image.gif

writable 需要通过反序列化的方式才能再获取具体内容,所以这里推荐使用 SparkSession 的官方 api 读取,不过可以 RcFile SparkSession 暂不支持直接读取,所以可以用 sc.hadoopRdd 的方法读取同样列式存储的 RcFile 格式文件,可以参考: Spark 读取 RcFile

三.Parquet 存储方式

1.静态转换

Parquet -> Parquet,读取 parquet 生成 Sql.DataFrame 再转存,类似 RDD 的 transform:

spark.read.parquet(path)
      .write.mode(SaveMode.Overwrite)
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .format("parquet")
      .save("/split")

image.gif

2.RDD[T*] 转换

常规数据 RDD 可以通过加入 import sqlContext.implicits._ 隐式转换的方式由 RDD 转换为 sql.Dataframe,随后完成 parquet 的存储,下面掩饰一个 PairRDD 转换为 df 并存储的方法:

import sqlContext.implicits._
    val commonStringRdd = sc.emptyRDD[(String, String)].toDF()
    commonStringRdd.write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .save("")

image.gif

Tips:

SaveModel 分为 Append 追加、Overwrite 覆盖、ErrorIfExists 报错、Ignore 忽略四种模式,前两个比较好理解,后面两个前者代表如果地址已存在则报错,后者如果地址已存在则忽略且不影响原始数据。SaveModel 通过枚举 Enum 的方式实现:

image.gif编辑

详细的 RDD 转换 Sql.DataFrame 可以参考:Spark - RDD / ROW / sql.DataFrame 互转

3.RDD[Row] 转换

如果有生成的 RDD[Row] 就可以直接调用 sqlContext 将该 RDD 转换为 DataFrame。这里 TABLE_SCHEMA 可以看作是每一列数据的描述,类似 Hive 的 column 的信息,主要是字段名和类型,也可以添加额外的信息,sqlContext 将对应的列属性与 Row 一一匹配,如果 Schema 长度没有达到 Row 的总列数,则后续字段都只能读为 Null。

val sqlContext = new SQLContext(sc)
    final val TABLE_SCHEME = StructType(Array(
      StructField("A", StringType),
      StructField("B", StringType),
      StructField("C", StringType),
      StructField("D", StringType),
      StructField("E", StringType),
      StructField("F", StringType),
      StructField("G", StringType),
      StructField("H", StringType)
    ))
    val commonRowRdd = sc.emptyRDD[Row]
    sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME)
      .write.mode(SaveMode.Overwrite)
      .format("parquet")
      .save("/split")

image.gif

Tips:

image.gif编辑

使用上述语法读取时可能会报错: Illegal pattern component: XXX ,这是因为内部 DataFormat 解析的问题,在代码中加入 .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") 即可。

spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)

image.gif

四.Parquet 浅析

Parquet 由于其开源,支持多平台多系统以及高效的存储和编码方案,使得其非常适合大数据场景下的任务开发,下面简单看下他的两个特性,列式存储和元数据存储:

1.列式存储 - 更小的 IO

CSV 是最常见的行式存储,对于一些需要单独特征或列的场景,如果是 CSV 文件需要遍历整行并分割,最终获取目标元素,而 Parquet 方式通过列式存储,对于单独的特征可以直接访问,从而提高了执行的效率,减少了数据 IO。

CSV: A,B,C,D,E -> Split(",")[col]
Parquet: A B C D E -> getString(col)

image.gif

2.元数据存储 - 更高的压缩比

Parquet 采用多种编码 encoding 方式,保证数据的高效存储和低空间

A.Run Length encoding

游程编码,当一行的多列数据有很多重复数据时,可以通过 "X重复了N次" 的记录方法,缩小记录的成本,虽然 N 可能很大,但存储成本很小:

[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1

image.gif

B.Dictionary encoding

字典编码,顾名思义就是通过映射,保存重复过多的数据,例如 "0" -> "LongString":

[LongString, LongString, LongString] -> [0, 0, 0]

image.gif

C.Delta encoding

增量编码,适用于 unix 时间戳,时间戳记录为 1970年1月1日以来的秒数,存储时间戳时可以直接减去初始时间戳,减少存储量,比如 1577808000 作为基准,则可以减少很多存储空间:

[1577808000, 1577808004, 1577808008] -> [0, 4, 8]

image.gif

3.存储-压缩对比

val st = System.currentTimeMillis()
    val pairInfo = (0 to 1000000).zipWithIndex.toArray
    val format = "csv" // csv、json、parquet
    sc.parallelize(pairInfo).toDF("A", "B")
      .write
      .mode(SaveMode.Overwrite)
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .save(s"./output/$format")
    val saveType = "gzip" // text、gzip
    sc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])
    val cost = System.currentTimeMillis() - st
    println(s"耗时: $cost")

image.gif

使用上述两种方法分别将 0 到 1000000 的数组存到对应文件,看一下存储的大小:

image.gif编辑

类型 Text Gzip Parquet CSV JSON
大小(MB) 15.8  4.6 8 13.8 23.8

相比于表格数据 CSV 和 JSON 存储,parquet 提供了更高的压缩比,Amazon S3 集群曾经对比过 CSV 与 parquet 的效率对比,使用 Parquet 可以缩减 87% 的大小,查询的速度快 34 倍 同时可以节省 99.7 的成本,所以在大数据量加经常需要个别列操作的场景下,Parquet 非常适合。

4.读取-效率对比

再分别读取上述文件:

val csv = spark.read.csv(path + "/output/csv").rdd.count()
    val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()
    val json = spark.read.json(path + "/output/json").count()
    val common = sc.textFile(path + "/output/common").count()
    val gz = sc.textFile(path + "/output/gzip").count()

image.gif

类型 Text Gzip Parquet CSV JSON
耗时(ms) 1417 1448 4952 6870 6766

相比 CSV,JSON 是有优势的,但是相对于行数存储的 Text 和 Gzip,执行 count 类的行统计操作显然不是列式存储文件的强项,所以相差很多,如果是大数据下针对某个或几个字段统计,Parquet 会提供相比于行式存储文件更高的性能。

5.selectExpr

image.gif编辑

读取 Parquet 文件除了获取原始的字段内容外,也可以通过 selectExpr 操作获取更多额外的信息,方法位于 org.apache.spark.sql.functions 中,内部包含 collect_list 类似的聚合操作,也包含 count 类似的统计操作,还有 max、min、isnull 等等。

spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => {
        println(row.getLong(0))
      })

image.gif

上述操作通过 selectExpr 获取了 count(_c1) 特征的数量,count Result:5383。

其中 _c1 为 Parquet 获取的 sql.DataFrame 的默认 schema,可以通过下述方法获取默认的 schema 信息:

val schema = spark.read.parquet(path).schema
      println(schema)

image.gif

image.gif编辑

这里截取了一部分,特征名从 _c0 开始依次累加,默认为 _c0,_c1 ,如果自己定义了 schema 的 StructField ,使用 spark.read.schema().parqeut() 读出来的 sql.Dataframe 的 selectExpr 函数内操作使用的列名就要换成自己定义的名称,例如 _c1 我定义为 age,则上述写法要改为 count('age'),再使用 _c1 会报错。更详细的 schema 操作可以参考:Parquet 指定 schema

五.总结

Spark - Parquet 大致常用的内容就这些,SparkSession 集成了读取 parquet、orc 的 API 非常的便捷,有需要建议直接通过 API 读取而不是 HadoopRdd / HadoopFile 。最后想说 parquet 的命名确实很好玩,parquet 翻译为地板,而不定长的列名存储,如果通过平面展示也颇有地板的感觉。

image.gif编辑

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
SQL 分布式计算 HIVE
spark sql编程之实现合并Parquet格式的DataFrame的schema
spark sql编程之实现合并Parquet格式的DataFrame的schema
350 0
spark sql编程之实现合并Parquet格式的DataFrame的schema
|
存储 SQL 分布式计算
【Parquet】Spark读取Parquet问题详解……
【Parquet】Spark读取Parquet问题详解……
873 0
【Parquet】Spark读取Parquet问题详解……
|
存储 分布式计算 大数据
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
550 0
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
|
存储 SQL 分布式计算
数据湖实操讲解【 JindoTable 计算加速】第十九讲:Spark 对 OSS 上的 Parquet 数据进行查询加速
数据湖 JindoFS+OSS 实操干货 36讲 每周二16点准时直播! 扫文章底部二维码入钉群,线上准时观看~ Github链接: https://github.com/aliyun/alibabacloud-jindofs
数据湖实操讲解【 JindoTable 计算加速】第十九讲:Spark 对 OSS 上的 Parquet 数据进行查询加速
|
SQL 存储 JSON
Apache Spark,Parquet和麻烦的Null
  关于类型安全性的经验教训,并承担过多   介绍   在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。   from pyspark.sql import types   schema=types.StructType([
883 0
|
SQL 存储 缓存
Spark SQL的Parquet那些事儿
Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。 1读写Parquet文件 // Encoders for most common types are automatically provided by importing spark.implicits._ import spark.implicits._ val peop
836 0
|
存储 分布式计算 大数据
# Apache Spark系列技术直播# 第七讲 【 大数据列式存储之 Parquet/ORC 】
主讲人:诚历(孙大鹏)阿里巴巴计算平台事业部EMR技术专家 简介:Parquet 和 ORC 是大数据生态里最常用到的两个列式存储引擎,这两者在实现上有什异同,哪个效率更好,哪个性能更优,本次分享将和您一起探索两大列式存储。
1639 0
|
分布式计算 Spark Go