背景
目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:
class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider with DataSourceRegister with StreamSinkProvider with StreamSourceProvider with SparkAdapterSupport with Serializable {
后续在进一步的写入的时候也是基于DataSource V2的
闲说杂谈
继续上次的Apache Hudi初探(三)涉及的代码:
// HoodieDataSourceInternalBatchWrite 类中的方法:其所涉及的的方法调用链如下: createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close || \/ onDataWriterCommit || \/ commit/abort
HoodieDataSourceInternalBatchWrite 这些方法的调用最终都会委托到DataSourceInternalWriterHelper这个类来执行
- createBatchWriterFactory
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dataSourceInternalWriterHelper.createInflightCommit(); if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) { return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(), writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted); } else { throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported "); } }
主要是创建*.hoodie/20230422183520311.deltacommit.inflight这种类型的文件(对于MOR表) ,表明该操作正在进行
返回HoodieBulkInsertDataInternalWriterFactory*对象用于Task write
dataWriter.write这是真正的写数据的地方
hudi写数据的最终类是HoodieBulkInsertDataInternalWriter,它会委托BulkInsertDataInternalWriterHelper具体去执行
所以write最终会调用到BulkInsertDataInternalWriterHelper.write方法,该方法首先会从record中获取对应的分区信息
然后根据根据分区信息获取对应的writer,其中写的文件名称如下:
bcbfc2b3-6a8a-480a-b2de-ed7d0e736cde-0_1-788-738368_20230422183520311.parquet
其中: bcbfc2b3-6a8a-480a-b2de-ed7d0e736cde-0 是 fileId ,由uuid + 写文件的次数(从0开始)
1-788-738368是 writeToken ,由 taskPartitionId(spark)+ taskId(spark) + taskEpochId 组成
20230422183520311 是instantTime 格式为yyyyMMddHHmmssSSS
写成功以后会返回一个HoodieInternalWriteStatus数据结构,其中包含了分区路径,fileId,以及写入的统计信息如文件记录数等
dataWriter.commit
这个会调用writer的close方法获取到HoodieInternalWriteStatus数据结构,并返回一个HoodieWriterCommitMessage(List)数据结构,
最终组装成一个DataWritingSparkTaskResult的数据结构
dataWriter.close
清除writer便于GC,以及收集写入的文件信息
onDataWriterCommit
目前的实现是打印出log信息,Received commit of a data writer =,注意这是在driver执行的
commit/abort
commit最终调用的是DataSourceInternalWriterHelper.commit方法,
最终会调用SparkRDDWriteClient.commitStats方法做一些额外的信息提交
rollback最终也是调用的是SparkRDDWriteClient.rollback方法做一些提交
因为commit/abort涉及到的细节比较多,所以下一次详解(当前还涉及到SparkRDDWriteClient.preWrite)