Apache Hudi初探(五)(与spark的结合)

简介: Apache Hudi初探(五)(与spark的结合)

背景


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


继续上次的Apache Hudi初探(四)涉及的代码:

 // HoodieDataSourceInternalBatchWrite 类中的方法:其所涉及的的方法调用链如下:
 createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
     ||
     \/
 onDataWriterCommit
     ||
     \/
 commit/abort
  • 在解释commit做的事情之前,DataSourceInternalWriterHelper在构建器阶段还有做了一件事,那就是writeClient.preWrite
    this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
    // writeClient是 SparkRDDWriteClient 实例
    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);

metaClient构建一个HoodieTableMetaClient类型的 hoodie 元数据客户端


如果hoodie.metastore.enable开启(默认是不开启),则新建HoodieTableMetastoreClient类型的实例,否则新建HoodieTableMetastoreClient实例


writeClient.preWrite 这是在写入数据前做的准备工作


  • 根据hoodie.write.concurrency.mode设置的模式来判断(默认是single_writer,还有个选项是optimistic_concurrency_control),如果是OCC则会获取上一次成功的事务,否则为空

  • 根据hoodie.write.concurrency.mode设置的模式来判断(默认是single_writer,还有个选项是optimistic_concurrency_control),如果是OCC则会获取上一次成功的事务,否则为空

  • 是否开启archive归档服务,会根据hoodie.archive.automatic(默认是true)或者hoodie.archive.async(默认是false)和hoodie.table.

  • services.enabled(默认是true) 来启动服务 AsyncCleanerService.startAsyncArchiveIfEnabled

  • 所以默认情况clean和Archive服务都不是异步后台服务


  • 来看commit所做的事情,它最终会调用到dataSourceInternalWriterHelper.commit方法:
public void commit(List<HoodieWriteStat> writeStatList) {
    try {
      writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
          CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
    } catch (Exception ioe) {
      throw new HoodieException(ioe.getMessage(), ioe);
    } finally {
      writeClient.close();
    }
  }

这里的writeClientSparkRDDWriteClient的实例,该实例的对一个的commit方法的如下:

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
    // Skip the empty commit if not allowed
    if (!config.allowEmptyCommit() && stats.isEmpty()) {
      return true;
    }
    LOG.info("Committing " + instantTime + " action " + commitActionType);
    // Create a Hoodie table which encapsulated the commits and files visible
    HoodieTable table = createTable(config, hadoopConf);
    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
        extraMetadata, operationType, config.getWriteSchema(), commitActionType);
    HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
    HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
    this.txnManager.beginTransaction(Option.of(inflightInstant),
        lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
    try {
      preCommit(inflightInstant, metadata);
      commit(table, commitActionType, instantTime, metadata, stats);
      // already within lock, and so no lock requried for archival
      postCommit(table, metadata, instantTime, extraMetadata, false);
      LOG.info("Committed " + instantTime);
      releaseResources();
    } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
    } finally {
      this.txnManager.endTransaction(Option.of(inflightInstant));
    }
    // We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if false
    try {
      // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
      runTableServicesInline(table, metadata, extraMetadata);
    } catch (Exception e) {
      if (config.isFailOnInlineTableServiceExceptionEnabled()) {
        throw e;
      }
      LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
          + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
    }
    emitCommitMetrics(instantTime, metadata, commitActionType);
    // callback if needed.
    if (config.writeCommitCallbackOn()) {
      if (null == commitCallback) {
        commitCallback = HoodieCommitCallbackFactory.create(config);
      }
      commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
    }
    return true;
  }
  • 如果是不允许空提交(hoodie.allow.empty.commit默认是true,也就是允许空提交),也就是没有任何数据插入的情况下,就直接返回这对于比如offset的元数据也是需要记录下来

  • createTable 新建一个HoodieTable,这里我们加入建立了HoodieSparkMergeOnReadTable类型的表

  • CommitUtils.buildMetadata 构造元信息,

  • 其中传入的参数operationTypebulk_insertschemaToStoreInCommit是avro schema(之前有设置),commitActionTypedeltacommit,partitionToReplaceFileIdsMap.empty,这里只是构建了HoodieCommitMetadata对象,把对应的元数据的信息记录了下来

  • HoodieInstant 新建了一个HoodieInstant类型的实例,这里是表明是inflight阶段

  • 判断heartbeat是否超时,如果是hoodie.cleaner.policy.failed.writesLAZY,且超时,则报异常

  • txnManager.beginTransaction 开启事务,主要是获取锁

  • 如果是hoodie.write.concurrency.modeoptimistic_concurrency_control,则会开启事务,因为这种情况下会存在冲突的可能性

  • lockManager.lock()hoodie.write.lock.provider配置中获取锁,默认是ZookeeperBasedLockProvider 实现是基于InterProcessMutex

  • 会基于hoodie.metrics.lock.enable的配置是否开启lock时期的metrics

  • reset(currentTxnOwnerInstant 把这次的TxnOwnerInstant设置为currentTxnOwnerInstant
相关文章
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
89 1
|
6月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
180 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
5月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
98 0
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
251 0
|
6月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
7月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
205 6
|
6月前
|
分布式计算 Apache Spark
|
7月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
144 0
|
7月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
113 0
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
344 33
The Past, Present and Future of Apache Flink

热门文章

最新文章

推荐镜像

更多