Background
The previous posts all assumed 'hoodie.datasource.write.operation' = 'bulk_insert'. In that mode no JSON files are produced, and the table ends up with files like the following:
```
/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet
```
Because a "bulk insert" operation has no need for deduplication, the write goes through Spark's native path directly.
Below we discuss the non-Spark-native path.
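For reference, a write along the following lines ends up on the code path analyzed below. This is only a minimal sketch: `df`, the table name, the target path and the field names (`uuid`, `dt`, `ts`) are illustrative placeholders, not taken from the original code.

```scala
import org.apache.spark.sql.SaveMode

// Any non-bulk_insert operation (here: upsert, which is also the default)
// goes through the RDD[HoodieRecord]-based path discussed in this post.
df.write.format("hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.table.name", "test_tbl")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/test_tbl")
```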
Casual Discussion
The remaining code:
```scala
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
  ...
  case _ => { // any other operation
    // register classes & schemas
    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
    sparkContext.getConf.registerKryoClasses(
      Array(classOf[org.apache.avro.generic.GenericData],
        classOf[org.apache.avro.Schema]))

    // TODO(HUDI-4472) revisit and simplify schema handling
    val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
    val latestTableSchema = getLatestTableSchema(sqlContext.sparkSession, tableMetaClient).getOrElse(sourceSchema)

    val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
    var internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)

    val writerSchema: Schema =
      if (reconcileSchema) {
        // In case we need to reconcile the schema and schema evolution is enabled,
        // we will force-apply schema evolution to the writer's schema
        if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
          internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
        }
        if (internalSchemaOpt.isDefined) {
          ...

    // Convert to RDD[HoodieRecord]
    val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
      org.apache.hudi.common.util.Option.of(writerSchema))

    val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
      operation.equals(WriteOperationType.UPSERT) ||
      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
        HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean

    val hoodieAllIncomingRecords = genericRecords.map(gr => {
      val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
      val hoodieRecord = if (shouldCombine) {
        val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false,
          parameters.getOrElse(
            DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
            DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
          .asInstanceOf[Comparable[_]]
        DataSourceUtils.createHoodieRecord(processedRecord,
          orderingVal,
          keyGenerator.getKey(gr),
          hoodieConfig.getString(PAYLOAD_CLASS_NAME))
      } else {
        DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr),
          hoodieConfig.getString(PAYLOAD_CLASS_NAME))
      }
      hoodieRecord
    }).toJavaRDD()

    val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema

    // Create a HoodieWriteClient & issue the write.
    val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
      tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
    )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

    if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
      asyncCompactionTriggerFn.get.apply(client)
    }

    if (isAsyncClusteringEnabled(client, parameters)) {
      asyncClusteringTriggerFn.get.apply(client)
    }

    val hoodieRecords =
      if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
        DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
      } else {
        hoodieAllIncomingRecords
      }

    client.startCommitWithTime(instantTime, commitActionType)
    val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
    (writeResult, client)
  }
```
If schema evolution is enabled, i.e. hoodie.datasource.write.reconcile.schema is true (default false), the schemas are reconciled:
convertStructTypeToAvroSchema converts the DataFrame's schema into an Avro schema,
and internalSchemaOpt is read from *.hoodie/20230530073115535.deltacommit*; the reconciliation merges the schema about to be written with internalSchemaOpt.
The result is assigned to writerSchema; hoodie.schema.on.read.enable (default false) may also be required.
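To see what sourceSchema looks like, the conversion above can be reproduced in a spark-shell. This is only a sketch: df stands for the DataFrame being written and "test_tbl" for the table name; both are assumptions here.

```scala
import org.apache.hudi.AvroConversionUtils

// Derive the Avro record name/namespace from the table name, then convert the
// DataFrame schema to an Avro schema, as the writer code above does.
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace("test_tbl")
val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
println(sourceSchema.toString(true)) // pretty-printed Avro schema
```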
- HoodieSparkUtils.createRdd creates the RDD
It converts df into an RDD[GenericRecord] and assigns it to genericRecords.
- val hoodieAllIncomingRecords = genericRecords.map(gr => {...})
First, if hoodie.datasource.write.drop.partition.columns is true (default false), the fields named in hoodie.datasource.write.partitionpath.field are dropped from the schema.
If hoodie.datasource.write.insert.drop.duplicates is true (default false), or hoodie.datasource.write.operation is upsert (which is the default), or hoodie.combine.before.insert is true (default false),
then the ordering value is read from the precombine field and a HoodieAvroRecord<>(hKey, payload) is created, where the HoodieKey consists of the record key and the partition path, and the payload is an OverwriteWithLatestAvroPayload instance;
hoodieAllIncomingRecords thus becomes an RDD[HoodieAvroRecord] (see the standalone sketch below).
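The following standalone sketch shows what one such record looks like when built by hand. The Avro schema, the field names (uuid, dt, ts) and the values are made up for illustration; in the real path DataSourceUtils.createHoodieRecord and the configured key generator do this work.

```scala
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.hudi.common.model.{HoodieAvroRecord, HoodieKey, OverwriteWithLatestAvroPayload}

object HoodieRecordSketch extends App {
  // A tiny Avro schema standing in for writerSchema (illustrative only).
  val schema = SchemaBuilder.record("demo").fields()
    .requiredString("uuid")
    .requiredString("dt")
    .requiredLong("ts")
    .endRecord()

  val gr = new GenericData.Record(schema)
  gr.put("uuid", "id-001")
  gr.put("dt", "1")
  gr.put("ts", 1685288016713L)

  // HoodieKey = record key + partition path (here derived from "uuid" and "dt").
  val hKey = new HoodieKey(gr.get("uuid").toString, s"dt=${gr.get("dt")}")

  // shouldCombine == true branch: the precombine field ("ts") becomes the ordering value.
  val orderingVal = gr.get("ts").asInstanceOf[Comparable[_]]
  val payload = new OverwriteWithLatestAvroPayload(gr, orderingVal)

  // This is the shape of the elements inside hoodieAllIncomingRecords.
  val record = new HoodieAvroRecord(hKey, payload)
  println(record.getKey)
}
```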
- writerDataSchema, client: this is where the SparkRDDWriteClient is created.
- isAsyncCompactionEnabled
By default asyncCompactionTriggerFnDefined is not set, so asynchronous compaction is not enabled; the same applies to isAsyncClusteringEnabled.
- val hoodieRecords =
If hoodie.datasource.write.insert.drop.duplicates is true (default false), the incoming records are deduplicated by calling DataSourceUtils.dropDuplicates:
```java
SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
return client.tagLocation(incomingHoodieRecords)
    .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
```
When the SparkRDDReadClient is constructed, the index is created: this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
If hoodie.index.class is set, that class is instantiated; otherwise the index is chosen based on hoodie.index.type (the default is HoodieSimpleIndex, which is mainly suited to testing).
client.tagLocation(incomingHoodieRecords)…
From the records to be inserted, it keeps only those that do not yet exist in the index; ultimately this calls index.tagLocation.
If hoodie.datasource.write.insert.drop.duplicates is false, all records are kept (the sketch below shows the options involved).
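The knobs driving this dedup step can be set directly on the datasource write. Again a sketch only: df, the path and the field names are placeholders, and BLOOM is just one of the possible hoodie.index.type values.

```scala
import org.apache.spark.sql.SaveMode

df.write.format("hudi")
  .option("hoodie.table.name", "test_tbl")
  .option("hoodie.datasource.write.operation", "insert")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "ts")
  // default false; when true, records already present in the table are dropped via dropDuplicates
  .option("hoodie.datasource.write.insert.drop.duplicates", "true")
  // used by SparkHoodieIndexFactory.createIndex when hoodie.index.class is not set
  .option("hoodie.index.type", "BLOOM")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/test_tbl")
```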
client.startCommitWithTime starts the write; this also involves rollback handling:
- It first finds the failed writes that need to be rolled back; if hoodie.cleaner.policy.failed.writes is EAGER (the default), those failed writes are rolled back within this commit.
- It then creates a file with the suffix deltacommit.requested; at this point no data has actually been written.
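If that policy ever needs to be set explicitly, it is just one more write option. A minimal sketch, assuming it is added to one of the df.write examples above; LAZY and NEVER are the other accepted values.

```scala
// One extra option on top of any of the write sketches above.
// EAGER (the default) rolls failed writes back eagerly at the start of the next commit.
val extraOpts = Map(
  "hoodie.cleaner.policy.failed.writes" -> "EAGER"
)
// e.g. df.write.format("hudi").options(extraOpts)... with the rest of the options as in the earlier sketches
```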
- val writeResult = DataSourceUtils.doWriteOperation
This is where the actual write happens.