Flink SQL 结合 HiveCatalog 使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 支持 HiveCatalog 作为表元数据持久化的介质,在生产环境我们一般采用 HiveCatalog 来管理元数据, 这样的好处是不需要重复使用 DDL 创建表,只需要关心业务逻辑的 SQL,简化了开发的流程,可以节省很多时间,下面就来介绍一下怎么配置和使用 HiveCatalog.sql-client-defaults.yaml 配置

Flink 支持 HiveCatalog 作为表元数据持久化的介质,在生产环境我们一般采用 HiveCatalog 来管理元数据, 这样的好处是不需要重复使用 DDL 创建表,只需要关心业务逻辑的 SQL,简化了开发的流程,可以节省很多时间,下面就来介绍一下怎么配置和使用 HiveCatalog.


sql-client-defaults.yaml 配置


catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /home/jason/bigdata/hive/hive-2.3.4
     default-database: mydatabase


添加依赖


-rw-r--r--. 1 root root     42998 Jul 22  2020 flink-connector-filesystem_2.11-1.11.1.jar
-rw-r--r--. 1 root root    196416 Dec 11 17:51 flink-connector-jdbc_2.11-1.12.0.jar
-rw-r--r--. 1 root root     91553 Dec  2 17:46 flink-csv-1.12.0.jar
-rw-r--r--. 1 root root 114120165 Dec  2 17:50 flink-dist_2.11-1.12.0.jar
-rw-r--r--. 1 root root    136663 Dec  2 17:46 flink-json-1.12.0.jar
-rw-r--r--. 1 root root  43317025 Dec 11 12:44 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r--. 1 root root   7709741 Sep 30 01:49 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r--. 1 root root   3309441 Dec 12 15:35 flink-sql-avro-1.12.0.jar
-rw-r--r--. 1 root root  40650306 Dec 19 12:42 flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar
-rw-r--r--. 1 root root   3650212 Dec 11 14:44 flink-sql-connector-kafka_2.11-1.12.0.jar
-rw-r--r--. 1 root root   2124047 Dec 12 15:35 flink-sql-orc_2.11-1.12.0.jar
-rw-r--r--. 1 root root   5666201 Dec 12 15:35 flink-sql-parquet_2.11-1.12.0.jar
-rw-r--r--. 1 root root  36147819 Dec  2 17:49 flink-table_2.11-1.12.0.jar
-rw-r--r--. 1 root root  40286358 Dec  2 17:49 flink-table-blink_2.11-1.12.0.jar
-rw-r--r--. 1 root root  34214106 Dec 19 19:18 hive-exec-2.3.4.jar
-rw-r--r--. 1 root root     67114 Feb 22  2020 log4j-1.2-api-2.12.1.jar
-rw-r--r--. 1 root root    276771 Feb 22  2020 log4j-api-2.12.1.jar
-rw-r--r--. 1 root root   1674433 Feb 22  2020 log4j-core-2.12.1.jar
-rw-r--r--. 1 root root     23518 Feb 22  2020 log4j-slf4j-impl-2.12.1.jar
-rw-r--r--. 1 root root   1007502 Dec 11 17:50 mysql-connector-java-5.1.47.jar


代码里面使用 HiveCatalog


package flink.ddl
import java.time.ZoneOffset._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings.newInstance
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.module.hive.HiveModule
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text
/**
 * Flink SQL 使用 hive catalog
 */
object FlinkDDLHiveCatalog {
    private val catalog_name = "myhive"
    private val hive_version = "2.3.4"
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val settings = newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = create(env, settings)
        tEnv.getConfig.setLocalTimeZone(ofHours(8))
        // 设置 early fired
        tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true)
        tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "5000 ms")
        // 设置 job name
        tEnv.getConfig.getConfiguration.setString("pipeline.name",this.getClass.getSimpleName.replace("$",""))
        val catalog = new HiveCatalog(
            catalog_name,                   // catalog name
            "mydatabase",                // default database
            "/home/jason/bigdata/hive/hive-2.3.4",  // Hive config (hive-site.xml) directory
            hive_version                   // Hive version
        )
        // 注册 catalog
        tEnv.registerCatalog("myhive", catalog)
        // 选择一个 catalog
        tEnv.useCatalog("myhive")
        // 选择 database
        tEnv.useDatabase("mydatabase")
        // 加载 hive 的内置函数
        tEnv.loadModule(catalog_name,new HiveModule(hive_version))
        // kafka_source_jason 和 print_table 提前已经创建好可以直接使用
        tEnv.executeSql(
            """
              |insert into print_table
              |select
              |lower(funcName),
              |MIN(`timestamp`) as min_timestamp,
              |FIRST_VALUE(`timestamp`) as first_timestamp,
              |MAX(`timestamp`) as max_timestamp
              |from kafka_source_jason
              |group by funcName
              |""".stripMargin)
    }
}


因为 kafka_source_jason 和 print_table 这两张表提前已经创建过了,已经保存在 HiveCatalog 里面,所以代码里面可以直接使用不用再次创建.


提交任务


在启动任务之前,需要先把 Hiv e的 metastore 起起来,因为 HiveCatalog 会和 metastore 连接这样才能访问元数据信息.


hive --service metastore &
flink run -d -m yarn-cluster \
-c flink.ddl.FlinkDDLHiveCatalog \
-yqu flink \
-nm FlinkDDLHiveCatalog \
-p 6 \
/home/jason/bigdata/jar/flink-1.11.1-1.0-SNAPSHOT.jar


打印结果



上面的代码还加载了 hive 的内置函数, Flink SQL 里面可以直接使用 hive 的内置函数, SQL 中的 lower 就是 hive 的函数可以直接拿来使用,这样就会非常的方便.


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
12天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
49 15
|
14天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
38 2
|
14天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
29 1
|
30天前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
38 1
|
26天前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
30天前
|
SQL 设计模式 数据处理
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
14 0
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行