如何在 Flink 1.9 中使用 Hive?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink 从 1.9.0 版本开始增加了与 Hive 集成的功能,用户可以通过 Flink 来访问 Hive 的元数据,以及读写 Hive 中的表。本文将主要从项目的设计架构、最新进展、使用说明等方面来介绍这一功能。

Apache Flink 从 1.9.0 版本开始增加了与 Hive 集成的功能,用户可以通过 Flink 来访问 Hive 的元数据,以及读写 Hive 中的表。本文将主要从项目的设计架构、最新进展、使用说明等方面来介绍这一功能。

Flink on Hive 介绍

SQL 是大数据领域中的重要应用场景,为了完善 Flink 的生态,发掘 Flink 在批处理方面的潜力,我们决定增强 FlinkSQL 的功能,从而让用户能够通过 Flink 完成更多的任务。

Hive 是大数据领域最早出现的 SQL 引擎,发展至今有着丰富的功能和广泛的用户基础。之后出现的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了与 Hive 集成的功能,从而方便用户使用现有的数据仓库、进行作业迁移等。因此我们认为提供与 Hive 交互的能力对于 FlinkSQL 也是非常重要的。

设计架构

与 Hive 集成主要包含了元数据和实际表数据的访问,因此我们会从这两方面介绍一下该项目的架构。

元数据

为了访问外部系统的元数据,Flink 提供了 ExternalCatalog 的概念。但是目前 ExternalCatalog 的定义非常不完整,基本处于不可用的状态。因此,我们提出了一套全新的 Catalog 接口来取代现有的 ExternalCatalog。新的 Catalog 能够支持数据库、表、分区等多种元数据对象;允许在一个用户 Session 中维护多个 Catalog 实例,从而同时访问多个外部系统;并且 Catalog 以可插拔的方式接入 Flink,允许用户提供自定义的实现。下图展示了新的 Catalog API 的总体架构。
image
创建 TableEnvironment 的时候会同时创建一个 CatalogManager,负责管理不同的 Catalog 实例。TableEnvironment 通过 Catalog 来为 Table API 和 SQL Client 用户提供元数据服务。

目前 Catalog 有两个实现,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元数据管理机制,将所有元数据保存在内存中。而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。另一方面,HiveCatalog 也可以用来处理 Flink 自身的元数据,在这种场景下,HiveCatalog 仅将 Hive Metastore 作为持久化存储使用,写入 Hive Metastore 中的元数据并不一定是 Hive 所支持的格式。一个 HiveCatalog 实例可以同时支持这两种模式,用户无需为管理 Hive 和 Flink 的元数据创建不同的实例。

另外,我们设计了 HiveShim 来支持不同版本的 Hive Metastore。目前支持的 Hive 版本包括 2.3.4 和 1.2.1。

表数据

我们提供了 Hive Data Connector 来读写 Hive 的表数据。Hive Data Connector 尽可能的复用了 Hive 本身的 Input/Output Format 和 SerDe 等类,这样做的好处一方面是减少了代码重复,更重要的是可以最大程度的保持与 Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。

与 HiveCatalog 类似的,Hive Data Connector 目前支持的 Hive 版本也是 2.3.4 和 1.2.1。

项目进展

Flink 与 Hive 集成的功能会在 1.9.0 版本中作为试用功能发布,用户可以通过 Table API 或者 SQL Client 的模式与 Hive 进行交互。下面列出的是在 1.9.0 中已经支持的功能:

  • 提供简单的 DDL 来读取 Hive 元数据,比如 show databases、show tables、describe table 等。
  • 可通过 Catalog API 来修改 Hive 元数据,如 create table、drop table 等。
  • 读取 Hive 数据,支持分区表和非分区表。
  • 写 Hive 数据,支持非分区表。
  • 支持 Text、ORC、Parquet、SequenceFile 等文件格式。
  • 支持调用用户在 Hive 中创建的 UDF。

由于是试用功能,因此还有一些方面不够完善,下面列出的是在 1.9.0 中缺失的功能:

  • 不支持INSERT OVERWRITE。
  • 不支持写分区表。
  • 不支持ACID表。
  • 不支持Bucket表。
  • 不支持View。
  • 部分数据类型不支持,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。

如何应用

添加依赖

使用 Flink 与 Hive 集成的功能,用户首先需要添加相应的依赖。如果是使用 SQL Client,则需要将依赖的 jar 添加到 Flink 的 lib 目录中;如果使用 Table API,则需要将相应的依赖添加到项目中(如pom.xml)。

如上文所述,目前支持的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是针对不同版本所需的依赖。

Hive版本 所需依赖
2.3.4 flink-connector-hive_2.11
flink-hadoop-compatibility
flink-shaded-hadoop-2-uber-2.7.5
hive-exec
1.2.1 flink-connector-hive_2.11
flink-hadoop-compatibility
flink-shaded-hadoop-2-uber-2.6.5
hive-metastore
hive-exec
libfb303-0.9.3

其中 flink-shaded-hadoop-2-uber 包含了 Hive 对于 Hadoop 的依赖。如果不用 Flink 提供的包,用户也可以将集群中使用的 Hadoop 包添加进来,不过需要保证添加的 Hadoop 版本与 Hive 所依赖的版本是兼容的(Hive 2.3.4 依赖的 Hadoop 版本是 2.7.2;Hive 1.2.1 依赖的 Hadoop 版本是 2.6.0)。

依赖的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用户集群中 Hive 所提供的 jar 包,详情请见支持不同的 Hive 版本。

配置 HiveCatalog

要与 Hive 交互,必须使用 HiveCatalog,下面介绍一下如何配置 HiveCatalog。

SQL Client

使用 SQL Client 时,用户需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的“catalogs”列表中可以指定一个或多个 Catalog 实例。以下的示例展示了如何指定一个 HiveCatalog:

catalogs:
# A typical catalog definition looks like:
  - name: myhive
    type: hive
hive-conf-dir: /path/to/hive_conf_dir
hive-version: 2.3.4

其中 name 是用户给每个 Catalog 实例指定的名字, Catalog 名字和 DB 名字构成了 FlinkSQL 中元数据的命名空间,因此需要保证每个 Catalog 的名字是唯一的。type 表示 Catalog 的类型,对于 HiveCatalog 而言,type 应该指定为 hive。hive-conf-dir 用于读取 Hive 的配置文件,用户可以将其设定为集群中 Hive 的配置文件目录。hive-version 用于指定所使用的 Hive 版本,可以设定为 2.3.4 或者 1.2.1。

指定了 HiveCatalog 以后,用户就可以启动 sql-client,并通过以下命令验证 HiveCatalog 已经正确加载。

Flink SQL> show catalogs;
default_catalog
myhive

Flink SQL> use catalog myhive;

其中 show catalogs 会列出加载的所有 Catalog 实例。需要注意的是,除了用户在sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 还会自动加载一个 GenericInMemoryCatalog 实例作为内置的 Catalog,该内置 Catalog 默认名字为 default_catalog。

使用 use catalog 可以设定用户 Session 当前的 Catalog。用户在 SQL 语句中访问元数据对象(如 DB、Table 等)时,如果不指定 Catalog 名字,则 FlinkSQL 会在当前 Catalog 中进行查找。

Table API

下面的代码展示了如何通过 TableAPI 来创建 HiveCatalog,并注册到 TableEnvironment。

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive_conf_dir";
String version = "2.3.4";

TableEnvironment tableEnv = …; // create TableEnvironment
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);

将 HiveCatalog 注册到 TableEnvironment 以后,就可以在通过 TableEnvironment 提交 SQL 的时候访问 HiveCatalog 中的元数据了。与 SQL Client 类似, TableEnvironment 也提供了 useCatalog 接口让用户设定当前 Catalog。

读写 Hive 表

设置好 HiveCatalog 以后就可以通过 SQL Client 或者 Table API 来读写 Hive 中的表了。

SQL Client

假设 Hive 中已经有一张名为 src 的表,我们可以用以下的 SQL 语句来读写这张表。

Flink SQL> describe src;
root
 |-- key: STRING
 |-- value: STRING


Flink SQL> select * from src;

                      key                     value
                       100                   val_100
                       298                   val_298
                         9                     val_9
                       341                   val_341
                       498                   val_498
                       146                   val_146
                       458                   val_458
                       362                   val_362
                       186                   val_186
                       ……                   ……

Flink SQL> insert into src values ('newKey','newVal');

Table API

类似的,也可以通过 Table API 来读写上面提到的这张表。下面的代码展示了如何实现这一操作。

TableEnvironment tableEnv = …; // create TableEnvironment
tableEnv.registerCatalog("myhive", hiveCatalog);
// set myhive as current catalog
tableEnv.useCatalog("myhive");

Table src = tableEnv.sqlQuery("select * from src");
// write src into a sink or do further analysis
……

tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");
tableEnv.execute("insert into src");

支持不同的 Hive 版本

Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前我们只针对这两个版本进行了测试。使用 SQL Client 时,如果用户没有在 sql-client-defaults.yaml 文件中指定 Hive 版本,我们会自动检测 classpath 中的 Hive 版本。如果检测到的 Hive 版本不是 2.3.4 或 1.2.1 就会报错。

借助 Hive 兼容性的保证,其它不同的小版本也比较可能是可以正常工作的。因此,如果用户使用的 Hive 小版本与我们所支持的不同,可以指定一个支持的版本来试用与 Hive 集成的功能。比如用户使用的 Hive 版本是 2.3.3,可以在 sql-client-defaults.yaml 文件或者代码中将 Hive 版本指定为 2.3.4。

执行模式与 Planner 的选择

Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工作,因此如果用户想要使用 Hive 的 TableSink,需要将执行模式设置为 batch。

Flink 1.9.0 增加了新的 blink planner,由于 blink planner 相比于原来的 planner 功能更加全面,因此我们建议在使用 FlinkSQL 与 Hive 集成时使用 blink planner。后续新的功能也可能会只支持 blink planner。

使用 SQL Client 时可以像这样在 sql-client-defaults.yaml 中指定执行模式和 planner:

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: batch

对应的 Table API 的写法如下:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

后期规划

我们会在 Flink 后续版本中进一步完善与 Hive 集成的功能,预计会在 1.10.0 版本中实现 Production-Ready。我们在后续版本中计划开展的工作包括:

  • 更完整的数据类型支持
  • 支持写分区表,包括静态和动态分区
  • 支持 INSERT OVERWRITE
  • 支持 View
  • 更完整的 DDL、DML 的支持
  • 支持 Hive 的 TableSink 在 streaming 模式下工作,以便用户将流式数据写入到 Hive 中
  • 测试并支持更多的 Hive 版本
  • 支持 Bucket 表
  • 性能测试与优化

欢迎大家试用 Flink 1.9 中的 Hive 功能,如果遇到任何问题也欢迎大家通过钉钉、邮件列表等方式与我们联系。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
59 6
|
3月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
185 1
|
6月前
|
SQL 存储 大数据
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
249 0
|
6月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
235 0
|
30天前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
30 0
|
4月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。