## Background
This article is based on Hudi 0.12.2.
At present, Hudi's integration with Spark is still built on the Spark DataSource V1 API, which is evident from Hudi's DefaultSource implementation:
```scala
class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {
```
## Discussion
Let's start from Hudi's write path (after all, without writes there is nothing to read). The corresponding call flow is:
```
createRelation
    ||
    \/
HoodieSparkSqlWriter.write
```
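To make the entry point concrete, here is a minimal write example (the path, table name and columns are made up for illustration, and the hudi-spark bundle is assumed to be on the classpath). `format("hudi")` resolves DefaultSource, and `save()` drives createRelation into HoodieSparkSqlWriter.write:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object HudiWriteDemo extends App {
  val spark = SparkSession.builder()
    .appName("hudi-write-demo")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  import spark.implicits._

  val df = Seq((1, "a", "2023-01-01"), (2, "b", "2023-01-01")).toDF("id", "name", "dt")

  // format("hudi") resolves DefaultSource; save() triggers createRelation,
  // which delegates to HoodieSparkSqlWriter.write.
  df.write.format("hudi")
    .option("hoodie.table.name", "test_hudi_mor")
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.precombine.field", "dt")
    .option("hoodie.datasource.write.partitionpath.field", "dt")
    .mode(SaveMode.Append)
    .save("/tmp/test_hudi_mor")
}
```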
### The code in detail
First comes a series of up-front validations of the table configuration:
```scala
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
```
The assert checks that the "path" option has been passed in on the Spark side.
`tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))` determines whether this is the first write to the table: if the .hoodie directory already exists, it is not the first write.
getHoodieTableConfig reads the configuration of the existing table, i.e. from .hoodie/hoodie.properties (see the appendix for the contents of this file).
validateTableConfig performs a series of consistency checks, comparing the options configured on the Spark side against the existing table configuration, pair by pair (a minimal sketch of this comparison follows the list below):
- "hoodie.datasource.write.recordkey.field" vs. "hoodie.table.recordkey.fields"
- "hoodie.datasource.write.precombine.field" vs. "hoodie.table.precombine.field"
- "hoodie.datasource.write.keygenerator.class" vs. "hoodie.table.keygenerator.class"
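As mentioned above, here is a minimal sketch of that pair-by-pair comparison; it uses a generic helper rather than the actual Hudi source, and the key names in the usage comment are examples taken from the list above:

```scala
// Simplified sketch, not the actual Hudi code: an incoming write option may only
// differ from the value persisted in hoodie.properties if the latter is absent.
def checkConflict(incoming: Option[String], persisted: Option[String], key: String): Unit = {
  (incoming, persisted) match {
    case (Some(i), Some(p)) if i != p =>
      throw new IllegalArgumentException(
        s"Config conflict for $key: write option '$i' does not match table config '$p'")
    case _ => // no conflict: table is new, option not set, or values agree
  }
}

// e.g. checkConflict(optParams.get("hoodie.datasource.write.recordkey.field"),
//                    Option(recordKeyFieldsFromTableConfig), // value read from hoodie.properties
//                    "hoodie.datasource.write.recordkey.field")
```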
Next comes the validation of the key generator:
```scala
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
  originKeyGeneratorClassName, parameters)
// validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
```
mergeParamsAndGetHoodieConfig
```
translateSqlOptions
    ||
    \/
HoodieWriterUtils.parametersWithWriteDefaults
    ||
    \/
HoodieWriterUtils.convertMapToHoodieConfig
```
translateSqlOptions
This translates the options passed in by Spark into Hudi options (a rough sketch follows these steps):
- If the Spark options contain the "__partition_columns" parameter, its value is retrieved;
- the value of "hoodie.datasource.write.keygenerator.class" is fetched and applied to each of the "__partition_columns" values, which are then joined with commas;
- the result is finally written into the "hoodie.datasource.write.partitionpath.field" option.
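A rough sketch of this translation, with simplified decoding (the real code also takes the key generator class into account when formatting each column):

```scala
// Simplified sketch of translateSqlOptions: map Spark SQL's "__partition_columns"
// onto Hudi's "hoodie.datasource.write.partitionpath.field".
def translateSqlOptionsSketch(optParams: Map[String, String]): Map[String, String] = {
  optParams.get("__partition_columns") match {
    case Some(encoded) =>
      // In the real implementation the value is decoded and each column may be
      // rewritten according to hoodie.datasource.write.keygenerator.class
      // (e.g. for timestamp-based key generators) before being joined with commas.
      val partitionColumns = encoded.split(",").map(_.trim).filter(_.nonEmpty)
      optParams + ("hoodie.datasource.write.partitionpath.field" -> partitionColumns.mkString(","))
    case None => optParams
  }
}
```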
HoodieWriterUtils.parametersWithWriteDefaults
It first looks for hudi-defaults.conf on the classpath and loads it if found; it then looks for the hudi-defaults.conf file under the directory given by the HUDI_CONF_DIR environment variable.
It also keeps "hoodie.payload.ordering.field" consistent with "hoodie.datasource.write.precombine.field" (a sketch of the merge order follows).
HoodieWriterUtils.convertMapToHoodieConfig
Converts the options Map into a HoodieConfig object.
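Roughly what this conversion amounts to (assuming HoodieConfig's Properties-based constructor):

```scala
import java.util.Properties
import org.apache.hudi.common.config.HoodieConfig

// Copy the merged options into a Properties object and wrap it in a HoodieConfig.
def convertMapToHoodieConfigSketch(params: Map[String, String]): HoodieConfig = {
  val props = new Properties()
  params.foreach { case (k, v) => props.setProperty(k, v) }
  new HoodieConfig(props)
}
```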
HoodieWriterUtils.getOriginKeyGenerator
Resolves the key generator class that was originally configured, taking "hoodie.sql.origin.keygen.class" into account (see the comparison under validateKeyGeneratorConfig below).
extractConfigsRelatedToTimestampBasedKeyGenerator
Extracts the timestampKeyGeneratorConfigs, i.e. the configs specific to a timestamp-based key generator.
validateKeyGeneratorConfig
Validates that the key generator configured on the Spark side matches the one recorded in the table config (sketched below):
- "hoodie.datasource.write.keygenerator.class" / "hoodie.sql.origin.keygen.class" is compared against "hoodie.table.keygenerator.class".
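A simplified sketch of that comparison, not the actual Hudi source:

```scala
// The key generator resolved from the write options (with "hoodie.sql.origin.keygen.class"
// taken into account) must match "hoodie.table.keygenerator.class" from hoodie.properties.
def validateKeyGeneratorSketch(datasourceKeyGen: Option[String], tableKeyGen: Option[String]): Unit = {
  (datasourceKeyGen, tableKeyGen) match {
    case (Some(ds), Some(tbl)) if ds != tbl =>
      throw new IllegalArgumentException(
        s"Key generator mismatch: write config uses '$ds' but the table was created with '$tbl'")
    case _ => // table is new, nothing configured, or both sides agree
  }
}
```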
Other validations and operations
The Spark option "hoodie.table.name" must be present.
"spark.serializer"必须是“KryoSerializer”
If "hoodie.datasource.write.insert.drop.duplicates" is set to true and "hoodie.datasource.write.operation" is "upsert", the operation is changed to "insert" (see the sketch below).
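A sketch of that operation adjustment under the stated conditions:

```scala
// Simplified sketch: with insert-drop-duplicates enabled, an "upsert" is rewritten to "insert".
def resolveOperation(params: Map[String, String]): String = {
  val operation      = params.getOrElse("hoodie.datasource.write.operation", "upsert")
  val dropDuplicates = params.getOrElse("hoodie.datasource.write.insert.drop.duplicates", "false").toBoolean
  if (dropDuplicates && operation.equalsIgnoreCase("upsert")) "insert" else operation
}
```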
## Appendix
- Contents of the .hoodie/hoodie.properties file:
```properties
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.precombine.field=dt
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.checksum=493353519
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=test_hudi_mor
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id
hoodie.table.partition.fields=dt
```