Apache Hudi初探(与spark的结合)

简介: Apache Hudi初探(与spark的结合)

背景


本文基于hudi 0.12.2


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


我们先从hudi的写数据说起(毕竟没有写哪来的读),对应的流程:

createRelation
     ||
     \/
HoodieSparkSqlWriter.write

###具体的代码


首先是一系列table配置的前置校验:

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)
    val sparkContext = sqlContext.sparkContext
    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

assert判断spark中是否传入“path”参数


tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) 判断是否是第一次写入,如果存在.hoodie目录,则说明不是第一次写入


getHoodieTableConfig是从当前表中获取配置,也就是从.hoodile/hoodie.properties中读取配置,其中配置文件的内容见附录


validateTableConfig就是做一系列的校验

其中判断的参数为spark配置的参数和已有参数进行比对,进行如下参数一一比对


“hoodie.datasource.write.recordkey.field”和“hoodie.table.recordkey.fields”


“hoodie.datasource.write.precombine.field”和“hoodie.table.precombine.field”


“hoodie.datasource.write.keygenerator.class”和“hoodie.table.keygenerator.class”


再次是keygen的校验

    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
      originKeyGeneratorClassName, parameters)
    //validate datasource and tableconfig keygen are the same
    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);

mergeParamsAndGetHoodieConfig

 translateSqlOptions
      ||
      \/
 HoodieWriterUtils.parametersWithWriteDefaults
      ||
      \/
 HoodieWriterUtils.convertMapToHoodieConfig

translateSqlOptions


这里传入spark的参数转换为huid的参数:


如果spark配置中有“__partition_columns”参数,则会获取


获取“hoodie.datasource.write.keygenerator.class”的值,并对应用到“__partition_columns” 的值上,并以逗号分隔


最终写入到"hoodie.datasource.write.partitionpath.field"配置中


HoodieWriterUtils.parametersWithWriteDefaults


首先会从classpath下查找hudi-defaults.conf,如果找到则加载,


再次从环境变量HUDI_CONF_DIR查找hudi-defaults.conf文件


保持"hoodie.payload.ordering.field"和"hoodie.datasource.write.precombine.field"一致


HoodieWriterUtils.convertMapToHoodieConfig


把map对象转换为HoodieConfig对象


HoodieWriterUtils.getOriginKeyGenerator


extractConfigsRelatedToTimestampBasedKeyGenerator


获取timestampKeyGeneratorConfigs


validateKeyGeneratorConfig


对spark中配置的keygen和table中配置的进行校验


“hoodie.datasource.write.keygenerator.class”/"hoodie.sql.origin.keygen.class"和“hoodie.table.keygenerator.class”进行比对


其他校验及操作


spark中的参数”hoodie.table.name“必须存在


"spark.serializer"必须是“KryoSerializer”


假如配置了"hoodie.datasource.write.insert.drop.duplicates"为true 且 “hoodie.datasource.write.operation”为“upsert”时,

改“hoodie.datasource.write.operation”为“insert”



附录


  • .hoodile/hoodie.properties 文件内容
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.precombine.field=dt
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.checksum=493353519
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=test_hudi_mor
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id
hoodie.table.partition.fields=dt```
相关文章
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
41 1
|
4月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
66 0
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
217 0
|
5月前
|
分布式计算 Apache Spark
|
4月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
47 1
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
708 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
80 3
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
4月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
254 2
|
4月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
58 3

推荐镜像

更多