Background
At present, Hudi's integration with Spark is still built on Spark DataSource V1. Hudi's source implementation shows this:
class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {
Miscellaneous Notes
This post picks up the code from the previous one, A First Look at Apache Hudi (Part 4):
// Methods of HoodieDataSourceInternalBatchWrite; the call chain is:
createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
                                                                                  ||
                                                                                  \/
                                                                          onDataWriterCommit
                                                                                  ||
                                                                                  \/
                                                                             commit/abort
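The simulation below runs in a single process and shows the order of these calls. BatchWriteLifecycle and ToyDataWriter are hypothetical simplifications. They reuse the method names from the chain above, but they are not Spark's DataWriter or BatchWrite interfaces. In a real job, tasks run in parallel on executors and the onDataWriterCommit and commit/abort callbacks run on the driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-process simulation of the DataSource V2 batch-write call order.
// Only the method names mirror the chain above; these are not Spark's interfaces.
class BatchWriteLifecycle {

    // Per-task writer: counts rows, then commits or aborts, and is always closed.
    static class ToyDataWriter {
        private final int partitionId;
        private final List<String> events;
        private int rows = 0;

        ToyDataWriter(int partitionId, List<String> events) {
            this.partitionId = partitionId;
            this.events = events;
        }

        void write(String row) {
            rows++;
        }

        // Returns a "commit message" summarizing what this task wrote.
        String commit() {
            events.add("dataWriter.commit p" + partitionId);
            return "p" + partitionId + ":" + rows;
        }

        void abort() {
            events.add("dataWriter.abort p" + partitionId);
        }

        void close() {
            events.add("dataWriter.close p" + partitionId);
        }
    }

    // One writer per partition; the last task can be forced to fail.
    static List<String> run(List<List<String>> partitions, boolean failLastTask) {
        List<String> events = new ArrayList<>();
        List<String> messages = new ArrayList<>();
        boolean anyTaskFailed = false;
        events.add("createBatchWriterFactory");
        for (int p = 0; p < partitions.size(); p++) {
            ToyDataWriter writer = new ToyDataWriter(p, events);
            String message = null;
            try {
                for (String row : partitions.get(p)) {
                    writer.write(row);
                }
                if (failLastTask && p == partitions.size() - 1) {
                    throw new IllegalStateException("simulated task failure");
                }
                message = writer.commit();
            } catch (IllegalStateException e) {
                writer.abort();
                anyTaskFailed = true;
            } finally {
                writer.close();
            }
            if (message != null) {
                // Driver side: notified once the task has committed and closed its writer.
                events.add("onDataWriterCommit " + message);
                messages.add(message);
            }
        }
        // Job level: commit all task messages, or abort if any task failed.
        events.add((anyTaskFailed ? "abort " : "commit ") + messages);
        return events;
    }

    public static void main(String[] args) {
        run(List.of(List.of("a", "b"), List.of("c")), false).forEach(System.out::println);
    }
}
```

Running it with failLastTask set to true shows the abort path: the failing task aborts and closes its writer, and the job ends with abort instead of commit.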
- Before we look at what commit does, note that DataSourceInternalWriterHelper does one more thing in its constructor: it calls writeClient.preWrite.
this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
// writeClient is a SparkRDDWriteClient instance
writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
metaClient: builds a Hoodie metadata client of type HoodieTableMetaClient.
If hoodie.metastore.enable is turned on (it is off by default), a HoodieTableMetastoreClient instance is created. Otherwise, a plain HoodieTableMetaClient instance is created.
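The client choice comes down to a switch on a single flag. In the sketch below, MetaClientSelection, FileSystemMetaClient, MetastoreMetaClient and build are hypothetical names chosen for illustration. Hudi's real builder accepts many more options. The only part taken from the text is the flag and its default.

```java
import java.util.Map;

// Hypothetical sketch of choosing a meta client implementation from configuration.
// The class names are illustrative stand-ins, not Hudi's classes.
class MetaClientSelection {

    interface TableMetaClient {
        String describe();
    }

    // Stand-in for the default client that reads table metadata from the base path.
    static class FileSystemMetaClient implements TableMetaClient {
        private final String basePath;

        FileSystemMetaClient(String basePath) {
            this.basePath = basePath;
        }

        public String describe() {
            return "fs:" + basePath;
        }
    }

    // Stand-in for the metastore-backed client used when the flag is on.
    static class MetastoreMetaClient implements TableMetaClient {
        private final String basePath;

        MetastoreMetaClient(String basePath) {
            this.basePath = basePath;
        }

        public String describe() {
            return "metastore:" + basePath;
        }
    }

    static TableMetaClient build(Map<String, String> conf, String basePath) {
        // hoodie.metastore.enable is treated as false unless explicitly set to "true".
        boolean metastoreEnabled = Boolean.parseBoolean(conf.getOrDefault("hoodie.metastore.enable", "false"));
        return metastoreEnabled ? new MetastoreMetaClient(basePath) : new FileSystemMetaClient(basePath);
    }

    public static void main(String[] args) {
        System.out.println(build(Map.of(), "/tmp/hudi_tbl").describe());
        System.out.println(build(Map.of("hoodie.metastore.enable", "true"), "/tmp/hudi_tbl").describe());
    }
}
```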
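writeClient.preWrite: the preparation done before any data is written.
- It checks the mode set by hoodie.write.concurrency.mode (default single_writer, the other option being optimistic_concurrency_control). Under OCC it fetches the last successfully completed transaction. Otherwise that value is empty.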
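- Whether the archive service starts asynchronously depends on hoodie.archive.automatic (default true), together with hoodie.archive.async (default false) and hoodie.table.services.enabled (default true). The service is started through AsyncArchiveService.startAsyncArchiveIfEnabled.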
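- Consequently, with the default settings neither the clean service nor the archive service runs as an asynchronous background service.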
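The preWrite decisions above reduce to a few configuration checks. This sketch assumes the async archive service needs automatic archiving, async archiving and table services all enabled. That assumption matches the conclusion that archiving is not asynchronous by default. PreWriteSketch and its methods are hypothetical. Only the config keys and their defaults come from the text.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the preWrite configuration checks; only the config keys
// and their defaults come from the text, the class and methods are illustrative.
class PreWriteSketch {

    static final Map<String, String> DEFAULTS = Map.of(
        "hoodie.write.concurrency.mode", "single_writer",
        "hoodie.archive.automatic", "true",
        "hoodie.archive.async", "false",
        "hoodie.table.services.enabled", "true");

    static String get(Map<String, String> conf, String key) {
        return conf.getOrDefault(key, DEFAULTS.get(key));
    }

    static boolean isOn(Map<String, String> conf, String key) {
        return Boolean.parseBoolean(get(conf, key));
    }

    // Under OCC the last completed transaction is remembered; otherwise it is empty.
    static Optional<String> lastCompletedTxn(Map<String, String> conf, String lastCompletedInstant) {
        boolean occ = "optimistic_concurrency_control".equals(get(conf, "hoodie.write.concurrency.mode"));
        return occ ? Optional.ofNullable(lastCompletedInstant) : Optional.empty();
    }

    // Assumption: the async archive service starts only if all three switches are on.
    static boolean startsAsyncArchive(Map<String, String> conf) {
        return isOn(conf, "hoodie.table.services.enabled")
            && isOn(conf, "hoodie.archive.automatic")
            && isOn(conf, "hoodie.archive.async");
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of();
        System.out.println("last completed txn: " + lastCompletedTxn(defaults, "20230101000000"));
        System.out.println("async archive: " + startsAsyncArchive(defaults));
    }
}
```

With the defaults, both checks come out "off". Setting hoodie.archive.async to true is enough to switch the async archive service on in this sketch.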
- Now for what commit does. It eventually calls dataSourceInternalWriterHelper.commit:
public void commit(List<HoodieWriteStat> writeStatList) {
  try {
    writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
        CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
  } catch (Exception ioe) {
    throw new HoodieException(ioe.getMessage(), ioe);
  } finally {
    writeClient.close();
  }
}
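The helper's commit follows a common pattern: wrap the exception and always close the client. In this sketch, ToyWriteClient and WrappedCommitException are hypothetical stand-ins for SparkRDDWriteClient and HoodieException. The sketch shows only two things. First, the client is closed whether the commit succeeds or fails. Second, the original exception is kept as the cause.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the wrap-and-always-close pattern used by commit() above.
// ToyWriteClient and WrappedCommitException stand in for SparkRDDWriteClient and HoodieException.
class CommitHelperSketch {

    static class WrappedCommitException extends RuntimeException {
        WrappedCommitException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class ToyWriteClient {
        final List<String> events = new ArrayList<>();
        private final boolean failOnCommit;

        ToyWriteClient(boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        void commitStats(String instantTime, List<String> stats) throws Exception {
            if (failOnCommit) {
                throw new Exception("commit of " + instantTime + " failed");
            }
            events.add("committed " + instantTime + " with " + stats.size() + " stats");
        }

        void close() {
            events.add("closed");
        }
    }

    static void commit(ToyWriteClient client, String instantTime, List<String> stats) {
        try {
            client.commitStats(instantTime, stats);
        } catch (Exception e) {
            // Re-throw unchecked, keeping the original exception as the cause.
            throw new WrappedCommitException(e.getMessage(), e);
        } finally {
            // Runs on both success and failure, so the client is always closed.
            client.close();
        }
    }

    public static void main(String[] args) {
        ToyWriteClient client = new ToyWriteClient(false);
        commit(client, "001", List.of("stat-1", "stat-2"));
        System.out.println(client.events);
    }
}
```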
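Here writeClient is a SparkRDDWriteClient instance. The four-argument commitStats call above delegates to the overload below, passing an empty partitionToReplaceFileIds map: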
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                           String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
  // Skip the empty commit if not allowed
  if (!config.allowEmptyCommit() && stats.isEmpty()) {
    return true;
  }
  LOG.info("Committing " + instantTime + " action " + commitActionType);
  // Create a Hoodie table which encapsulated the commits and files visible
  HoodieTable table = createTable(config, hadoopConf);
  HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
      extraMetadata, operationType, config.getWriteSchema(), commitActionType);
  HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
  HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
  this.txnManager.beginTransaction(Option.of(inflightInstant),
      lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
  try {
    preCommit(inflightInstant, metadata);
    commit(table, commitActionType, instantTime, metadata, stats);
    // already within lock, and so no lock required for archival
    postCommit(table, metadata, instantTime, extraMetadata, false);
    LOG.info("Committed " + instantTime);
    releaseResources();
  } catch (IOException e) {
    throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
  } finally {
    this.txnManager.endTransaction(Option.of(inflightInstant));
  }

  // We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if false
  try {
    // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
    runTableServicesInline(table, metadata, extraMetadata);
  } catch (Exception e) {
    if (config.isFailOnInlineTableServiceExceptionEnabled()) {
      throw e;
    }
    LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
        + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
  }

  emitCommitMetrics(instantTime, metadata, commitActionType);

  // callback if needed.
  if (config.writeCommitCallbackOn()) {
    if (null == commitCallback) {
      commitCallback = HoodieCommitCallbackFactory.create(config);
    }
    commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
  }
  return true;
}
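Reduced to the order of its steps, the control flow of commitStats looks roughly like the sketch below. CommitStatsFlowSketch is hypothetical, and each step only records its own name. Two of the boolean switches stand in for hoodie.allow.empty.commit and hoodie.fail.writes.on.inline.table.service.exception. The third simulates a failing inline table service. The sketch shows that the transaction ends in a finally block and that the inline table services then run outside the lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ordering inside commitStats; each step only records its name.
class CommitStatsFlowSketch {

    final List<String> steps = new ArrayList<>();
    private final boolean allowEmptyCommit;                  // stands in for hoodie.allow.empty.commit
    private final boolean failOnInlineTableServiceException; // stands in for hoodie.fail.writes.on.inline.table.service.exception
    private final boolean tableServiceFails;                 // simulates a failing inline compaction/clustering

    CommitStatsFlowSketch(boolean allowEmptyCommit, boolean failOnInlineTableServiceException, boolean tableServiceFails) {
        this.allowEmptyCommit = allowEmptyCommit;
        this.failOnInlineTableServiceException = failOnInlineTableServiceException;
        this.tableServiceFails = tableServiceFails;
    }

    boolean commitStats(String instantTime, List<String> stats) {
        // Skip the empty commit if not allowed.
        if (!allowEmptyCommit && stats.isEmpty()) {
            return true;
        }
        steps.add("buildMetadata " + instantTime);
        steps.add("abortIfHeartbeatExpired");
        steps.add("beginTransaction");
        try {
            steps.add("preCommit");
            steps.add("commit");
            steps.add("postCommit");
        } finally {
            // The transaction (and its lock) is always ended.
            steps.add("endTransaction");
        }
        // Table services run after the transaction, i.e. outside the lock.
        try {
            runTableServicesInline();
        } catch (IllegalStateException e) {
            if (failOnInlineTableServiceException) {
                throw e;
            }
            steps.add("warn: " + e.getMessage());
        }
        steps.add("emitCommitMetrics");
        return true;
    }

    private void runTableServicesInline() {
        steps.add("runTableServicesInline");
        if (tableServiceFails) {
            throw new IllegalStateException("inline compaction failed");
        }
    }

    public static void main(String[] args) {
        CommitStatsFlowSketch flow = new CommitStatsFlowSketch(true, false, true);
        flow.commitStats("001", List.of("stat-1"));
        flow.steps.forEach(System.out::println);
    }
}
```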
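- If empty commits are not allowed and no data was written, the method returns immediately. By default hoodie.allow.empty.commit is true, so empty commits are allowed. This matters because metadata such as offsets must still be recorded even when no data is written.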
- createTable creates a new HoodieTable. Here we assume the table is a HoodieSparkMergeOnReadTable.
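- CommitUtils.buildMetadata builds the commit metadata.
- The arguments passed in are as follows. operationType is bulk_insert. schemaToStoreInCommit is the Avro schema set earlier. commitActionType is deltacommit. partitionToReplaceFileIds is an empty map. This step only constructs a HoodieCommitMetadata object that records the corresponding metadata.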
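- new HoodieInstant(...) creates a HoodieInstant instance that marks the commit as being in the INFLIGHT state.
- The heartbeat is checked for expiry. If hoodie.cleaner.policy.failed.writes is LAZY and the heartbeat has expired, an exception is thrown.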
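- txnManager.beginTransaction starts the transaction, which mainly means acquiring a lock.
- The transaction is only actually started when hoodie.write.concurrency.mode is optimistic_concurrency_control, because only in that mode can concurrent writes conflict.
- lockManager.lock() acquires the lock from the provider configured by hoodie.write.lock.provider. The default is ZookeeperBasedLockProvider, which is implemented on top of Apache Curator's InterProcessMutex.
- hoodie.metrics.lock.enable determines whether metrics are collected while the lock is held.
- reset(currentTxnOwnerInstant, ...) records this transaction's TxnOwnerInstant as currentTxnOwnerInstant.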
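The transaction handling described above can be sketched as a lock that is taken only under OCC. This is a hypothetical simplification, and TxnManagerSketch is not Hudi's TransactionManager. A JVM-local ReentrantLock stands in for the configured lock provider, so unlike a ZooKeeper-based lock it does not coordinate across processes. Lock metrics are left out.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of OCC-only transactions; a JVM-local ReentrantLock stands in
// for the configured lock provider, which in Hudi coordinates across processes.
class TxnManagerSketch {

    private final boolean occEnabled;
    private final ReentrantLock lock = new ReentrantLock();
    private Optional<String> currentTxnOwnerInstant = Optional.empty();
    private Optional<String> lastCompletedTxnOwnerInstant = Optional.empty();

    TxnManagerSketch(String concurrencyMode) {
        this.occEnabled = "optimistic_concurrency_control".equals(concurrencyMode);
    }

    void beginTransaction(Optional<String> newTxnOwnerInstant, Optional<String> lastCompletedInstant) {
        if (!occEnabled) {
            // single_writer: no concurrent writers expected, so no lock is taken.
            return;
        }
        lock.lock();
        // Record which instant now owns the transaction.
        currentTxnOwnerInstant = newTxnOwnerInstant;
        lastCompletedTxnOwnerInstant = lastCompletedInstant;
    }

    void endTransaction() {
        if (!occEnabled) {
            return;
        }
        // Clear the owner before releasing the lock.
        currentTxnOwnerInstant = Optional.empty();
        lastCompletedTxnOwnerInstant = Optional.empty();
        lock.unlock();
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    Optional<String> currentOwner() {
        return currentTxnOwnerInstant;
    }

    Optional<String> lastCompletedOwner() {
        return lastCompletedTxnOwnerInstant;
    }

    public static void main(String[] args) {
        TxnManagerSketch txn = new TxnManagerSketch("optimistic_concurrency_control");
        txn.beginTransaction(Optional.of("20230102000000"), Optional.of("20230101000000"));
        System.out.println("locked=" + txn.isLocked() + ", owner=" + txn.currentOwner());
        txn.endTransaction();
        System.out.println("locked=" + txn.isLocked());
    }
}
```

Under single_writer, beginTransaction and endTransaction are no-ops in this sketch. This mirrors the point above that the lock is only needed when writers can conflict.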