基于 Apache Hudi + dbt 构建开放的Lakehouse

简介: 基于 Apache Hudi + dbt 构建开放的Lakehouse

本博客的重点展示如何利用增量数据处理和执行字段级更新来构建一个开放式 Lakehouse。我们很高兴地宣布,用户现在可以使用 Apache Hudi + dbt 来构建开放Lakehouse。

在深入了解细节之前,让我们先澄清一下本博客中使用的一些术语。

什么是 Apache Hudi?

Apache Hudi 为Lakehouse带来了 ACID 事务、记录级更新/删除和变更流。Apache Hudi 是一个开源数据管理框架,用于简化增量数据处理和数据管道开发。该框架更有效地管理数据生命周期等业务需求并提高数据质量。

什么是dbt?

dbt(数据构建工具)是一种数据转换工具,使数据分析师和工程师能够在云数据仓库中转换、测试和记录数据。dbt 使分析工程师能够通过简单地编写select语句来转换其仓库中的数据。dbt 处理将这些select语句转换为表和视图。dbt 在 ELT(提取、加载、转换)过程中执行 T——它不提取或加载数据,但它非常擅长转换已经加载到仓库中的数据。

什么是Lakehouse?

Lakehouse 是一种新的开放式架构,它结合了数据湖和数据仓库的最佳元素。Lakehouses 是通过一种新的系统设计实现的:在开放格式的低成本云存储之上直接实施类似于数据仓库中的事务管理和数据管理功能。如果必须在现代世界中重新设计数据仓库,Lakehouse便是首选,因为现在可以使用廉价且高度可靠的存储(以对象存储的形式)。换句话说,虽然数据湖历来被视为添加到云存储文件夹中的一堆文件,但 Lakehouse 表支持事务、更新、删除,在 Apache Hudi 的情况下,甚至支持索引或更改捕获等类似数据库的功能。

如何建造一个开放的Lakehouse?

现在我们知道什么是Lakehouse了,所以让我们建造一个开放的Lakehouse,你需要几个组件:

• 支持 ACID 事务的开放表格式

• Apache Hudi(与 dbt 集成)

• Delta Lake(锁定到 Databricks 运行时的专有功能)

• Apache Iceberg(目前未与 dbt 集成)

• 数据转换工具

• 开源 dbt 是转换层事实上的流行选择

• 分布式数据处理引擎

• Apache Spark 是计算引擎事实上的流行选择

• 云储存

• 可以选择任何具有成本效益的云存储或 HDFS

• 选择最心仪的查询引擎

构建 Lakehouse需要一种方法来提取数据并将其加载为 Hudi 表格式,然后使用 dbt 就地转换。DBT 通过 dbt-spark 适配器[1]包支持开箱即用的 Hudi。使用 dbt 创建建模数据集时,您可以选择 Hudi 作为表的格式。可以按照此页面[2]上的说明学习如何安装和配置 dbt+hudi。

第 1 步:如何提取和加载原始数据集?

这是构建Lakehouse的第一步,这里有很多选择可以将数据加载到我们的开放Lakehouse中。可以使用 Hudi 的 Delta Streamer工具,因为所有摄取功能都是预先构建的,并在大规模生产中经过实战测试。Hudi 的 DeltaStreamer 在 ELT(提取、加载、转换)过程中执行 EL——它非常擅长提取、加载和可选地转换已经加载到 Lakehouse 中的数据。

第二步:如何用dbt项目配置Hudi?

要将 Hudi 与 dbt 项目一起使用,需要选择文件格式为 Hudi。文件格式配置可以在特定模型中指定,也可以为 dbt_project.yml 文件中的所有模型指定:

models:
   +file_format: hudi

或者

{{ config(
  materialized = 'incremental',
  incremental_strategy = 'merge',
  file_format = 'hudi',
  unique_key = 'id',
) }}

选择 Hudi 作为 file_format 后,可以使用 dbt 创建物化数据集,这提供了 Hudi 表格式独有的额外好处,例如字段级更新/删除。

第三步:如何增量读取原始数据?

在我们学习如何构建增量物化视图之前,让我们快速了解一下,什么是 dbt 中的物化?物化是在 Lakehouse 中持久化 dbt 模型的策略。dbt 中内置了四种类型的物化:

• table

• view

• incremental

• ephemeral

在所有物化类型中,只有增量模型允许 dbt 自上次运行 dbt 以来将记录插入或更新到表中,这释放了 Hudi 的能力,我们将深入了解细节。使用增量模型需要执行以下两个步骤:

• 告诉 dbt 如何过滤增量执行的行

• 定义模型的唯一性约束(使用>= Hudi 0.10.1版本时需要)

如何在增量运行中应用过滤器?

dbt 提供了一个宏 is_incremental(),它对于专门为增量实现定义过滤器非常有用。通常需要过滤“新”行,例如自上次 dbt 运行此模型以来已创建的行。查找此模型最近运行的时间戳的最佳方法是检查目标表中的最新时间戳。dbt 通过使用“{{ this }}”变量可以轻松查询目标表。

{{
   config(
       materialized='incremental',
       file_format='hudi',
   )
}}
select
   *
from raw_app_data.events
{% if is_incremental() %}
   -- this filter will only be applied on an incremental run
   where event_time > (select max(event_time) from {{ this }})
{% endif %}

如何定义唯一性约束?

unique_key 是数据集的主键,它确定记录是否具有新值,是否应该更新/删除或插入。可以在模型顶部的配置块中定义 unique_key。这个 unique_key 将作为 Hudi 表上的主键(hoodie.datasource.write.recordkey.field)。

第 4 步:如何在编写数据集时使用 upsert 功能?

dbt 在加载转换后的数据集时提供了多种加载策略,例如:

• append(默认)

• insert_overwrite(可选)

• merge(可选,仅适用于 Hudi 和 Delta 格式)

默认情况下dbt 使用 append 策略,当在同一有效负载上多次执行 dbt run 命令时,可能会导致重复行。当你选择insert_overwrite策略时,dbt每次运行dbt都会覆盖整个分区或者全表加载,这样会造成不必要的开销,而且非常昂贵。除了所有现有的加载数据的策略外,使用增量物化时还可以使用Hudi独占合并策略。使用合并策略可以对Lakehouse执行字段级更新/删除,这既高效又经济,因此可以获得更新鲜的数据和更快的洞察力。

如何执行字段级更新?

如果使用合并策略并指定了 unique_key,默认情况下dbt 将使用新值完全覆盖匹配的行。由于 Apache Spark 适配器支持合并策略,因此可以选择将列名列表传递给 merge_update_columns 配置。在这种情况下dbt 将仅更新配置指定的列,并保留其他列的先前值。

{{ config(
   materialized = 'incremental',
   incremental_strategy = 'merge',
   file_format = 'hudi',
   unique_key = 'id',
   merge_update_columns = ['msg', 'updated_ts'],
) }}

如何配置额外的Hudi自定义配置?

如果想指定额外的 Hudi 配置时,可以使用选项配置来做到这一点:

{{ config(
   materialized='incremental',
   file_format='hudi',
   incremental_strategy='merge',
   options={
       'type': 'mor',
       'primaryKey': 'id',
       'precombineKey': 'ts',
   },
   unique_key='id',
   partition_by='datestr',
   pre_hook=["set spark.sql.datetime.java8API.enabled=false;"],
  )
}}

总结

希望本篇博文可以助力基于Apache Hudi 与 dbt构建开放的 Lakehouse

目录
相关文章
|
3月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
115 5
|
3月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
133 4
|
4月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
112 5
|
3月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
143 61
|
3月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
76 1
|
4月前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
69 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
3月前
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
121 3
|
2月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
363 33
The Past, Present and Future of Apache Flink
|
4月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
982 13
Apache Flink 2.0-preview released
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
160 3

热门文章

最新文章

推荐镜像

更多