Apache Hudi初探(与spark的结合)

简介: Apache Hudi初探(与spark的结合)

背景


本文基于hudi 0.12.2


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


我们先从hudi的写数据说起(毕竟没有写哪来的读),对应的流程:

createRelation
     ||
     \/
HoodieSparkSqlWriter.write

###具体的代码


首先是一系列table配置的前置校验:

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)
    val sparkContext = sqlContext.sparkContext
    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

assert判断spark中是否传入“path”参数


tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) 判断是否是第一次写入,如果存在.hoodie目录,则说明不是第一次写入


getHoodieTableConfig是从当前表中获取配置,也就是从.hoodile/hoodie.properties中读取配置,其中配置文件的内容见附录


validateTableConfig就是做一系列的校验

其中判断的参数为spark配置的参数和已有参数进行比对,进行如下参数一一比对


“hoodie.datasource.write.recordkey.field”和“hoodie.table.recordkey.fields”


“hoodie.datasource.write.precombine.field”和“hoodie.table.precombine.field”


“hoodie.datasource.write.keygenerator.class”和“hoodie.table.keygenerator.class”


再次是keygen的校验

    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
      originKeyGeneratorClassName, parameters)
    //validate datasource and tableconfig keygen are the same
    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);

mergeParamsAndGetHoodieConfig

 translateSqlOptions
      ||
      \/
 HoodieWriterUtils.parametersWithWriteDefaults
      ||
      \/
 HoodieWriterUtils.convertMapToHoodieConfig

translateSqlOptions


这里传入spark的参数转换为huid的参数:


如果spark配置中有“__partition_columns”参数,则会获取


获取“hoodie.datasource.write.keygenerator.class”的值,并对应用到“__partition_columns” 的值上,并以逗号分隔


最终写入到"hoodie.datasource.write.partitionpath.field"配置中


HoodieWriterUtils.parametersWithWriteDefaults


首先会从classpath下查找hudi-defaults.conf,如果找到则加载,


再次从环境变量HUDI_CONF_DIR查找hudi-defaults.conf文件


保持"hoodie.payload.ordering.field"和"hoodie.datasource.write.precombine.field"一致


HoodieWriterUtils.convertMapToHoodieConfig


把map对象转换为HoodieConfig对象


HoodieWriterUtils.getOriginKeyGenerator


extractConfigsRelatedToTimestampBasedKeyGenerator


获取timestampKeyGeneratorConfigs


validateKeyGeneratorConfig


对spark中配置的keygen和table中配置的进行校验


“hoodie.datasource.write.keygenerator.class”/"hoodie.sql.origin.keygen.class"和“hoodie.table.keygenerator.class”进行比对


其他校验及操作


spark中的参数”hoodie.table.name“必须存在


"spark.serializer"必须是“KryoSerializer”


假如配置了"hoodie.datasource.write.insert.drop.duplicates"为true 且 “hoodie.datasource.write.operation”为“upsert”时,

改“hoodie.datasource.write.operation”为“insert”



附录


  • .hoodile/hoodie.properties 文件内容
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.precombine.field=dt
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.checksum=493353519
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=test_hudi_mor
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id
hoodie.table.partition.fields=dt```
相关文章
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
89 1
|
5月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
98 0
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
251 0
|
6月前
|
分布式计算 Apache Spark
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
55 1
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
344 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
948 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
144 3
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
336 2

推荐镜像

更多