Apache Hudi初探(四)(与spark的结合)

简介: Apache Hudi初探(四)(与spark的结合)

背景


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

后续在进一步的写入的时候也是基于DataSource V2的


闲说杂谈


继续上次的Apache Hudi初探(三)涉及的代码:

 // HoodieDataSourceInternalBatchWrite 类中的方法:其所涉及的的方法调用链如下:
 createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
     ||
     \/
 onDataWriterCommit
     ||
     \/
 commit/abort

HoodieDataSourceInternalBatchWrite 这些方法的调用最终都会委托到DataSourceInternalWriterHelper这个类来执行


  • createBatchWriterFactory
  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
    dataSourceInternalWriterHelper.createInflightCommit();
    if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
      return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
          writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
    } else {
      throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
    }
  }

主要是创建*.hoodie/20230422183520311.deltacommit.inflight这种类型的文件(对于MOR表) ,表明该操作正在进行


返回
HoodieBulkInsertDataInternalWriterFactory*对象用于Task write


dataWriter.write这是真正的写数据的地方

hudi写数据的最终类是HoodieBulkInsertDataInternalWriter,它会委托BulkInsertDataInternalWriterHelper具体去执行

所以write最终会调用到BulkInsertDataInternalWriterHelper.write方法,该方法首先会从record中获取对应的分区信息

然后根据根据分区信息获取对应的writer,其中写的文件名称如下:

bcbfc2b3-6a8a-480a-b2de-ed7d0e736cde-0_1-788-738368_20230422183520311.parquet

其中: bcbfc2b3-6a8a-480a-b2de-ed7d0e736cde-0 是 fileId ,由uuid + 写文件的次数(从0开始)

1-788-738368是 writeToken ,由 taskPartitionId(spark)+ taskId(spark) + taskEpochId 组成

20230422183520311 是instantTime 格式为yyyyMMddHHmmssSSS

写成功以后会返回一个HoodieInternalWriteStatus数据结构,其中包含了分区路径,fileId,以及写入的统计信息如文件记录数等


dataWriter.commit

这个会调用writer的close方法获取到HoodieInternalWriteStatus数据结构,并返回一个HoodieWriterCommitMessage(List)数据结构,

最终组装成一个DataWritingSparkTaskResult的数据结构


dataWriter.close
清除writer便于GC,以及收集写入的文件信息


onDataWriterCommit
目前的实现是打印出log信息,Received commit of a data writer =,注意这是在driver执行的


commit/abort

commit最终调用的是DataSourceInternalWriterHelper.commit方法,

最终会调用SparkRDDWriteClient.commitStats方法做一些额外的信息提交

rollback最终也是调用的是SparkRDDWriteClient.rollback方法做一些提交


因为commit/abort涉及到的细节比较多,所以下一次详解(当前还涉及到SparkRDDWriteClient.preWrite)

相关文章
|
Kubernetes Apache 对象存储
海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(中)
海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用
255 0
|
8月前
|
消息中间件 分布式计算 Apache
从 Apache Kudu 迁移到 Apache Hudi
从 Apache Kudu 迁移到 Apache Hudi
170 3
|
8月前
|
消息中间件 分布式计算 Kafka
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
56 0
|
8月前
|
分布式计算 监控 大数据
【Spark Streaming】Spark Day10:Spark Streaming 学习笔记
【Spark Streaming】Spark Day10:Spark Streaming 学习笔记
90 0
|
8月前
|
消息中间件 分布式计算 Kafka
Spark【Spark Streaming】
Spark【Spark Streaming】
|
关系型数据库 MySQL 大数据
海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(下)
海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用
267 0
|
存储 消息中间件 大数据
海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(上)
海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用
259 0
|
SQL 机器学习/深度学习 分布式计算
【大数据架构】Apache Flink和Apache Spark—比较指南
【大数据架构】Apache Flink和Apache Spark—比较指南
【大数据架构】Apache Flink和Apache Spark—比较指南
|
分布式计算 Apache Spark
Apache Hudi初探(与spark的结合)
Apache Hudi初探(与spark的结合)
168 0
|
分布式计算 Apache Spark
Apache Hudi初探(六)(与spark的结合)
Apache Hudi初探(六)(与spark的结合)
203 0

推荐镜像

更多