flink报错踩坑:org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 当想使用本地开发环境运行flink读写线上hive数据来运行时报错。我使用maven管理的开发环境依赖。由于代码发布到测试环境集群上跑时并没有报错,而测试环境对应的依赖都是使用放在上面的依赖jar的,并不使用本地maven管理的依赖(也就是没有打入项目jar)。所以我猜测是本地运行环境依赖有问题,也就是项目中maven的pom文件的依赖有问题。
出错场景

当想使用本地开发环境运行flink读写线上hive数据来运行时报错。我使用maven管理的开发环境依赖。由于代码发布到测试环境集群上跑时并没有报错,而测试环境对应的依赖都是使用放在上面的依赖jar的,并不使用本地maven管理的依赖(也就是没有打入项目jar)。所以我猜测是本地运行环境依赖有问题,也就是项目中maven的pom文件的依赖有问题。

在多次检查该项目中maven的pom文件导入的依赖和反复查看flink Table API Connector针对hive的官方文档后终于解决了问题。

下面是我处理问题的一些环境版本:

  • flink 1.13.6
  • hive 1.1.0-cdh5.15.1
  • hadoop 2.6.0-cdh5.15.1

这两个问题主要原因是使用maven管理本地环境依赖时依赖没有使用正确。虽说原因很简单,但实际在我们使用排查的时候往往要花费大量的时间解决,而且官方文档的示例主要还是依据主流版本组件给的,所以直接寻找很难找到解决的办法。

错误一:
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.JavaUtils.closeClassLoadersTo(Ljava/lang/ClassLoader;Ljava/lang/ClassLoader;)Z
at org.apache.flink.table.planner.delegation.hive.HiveParser$HiveParserSessionState.close(HiveParser.java:397)
at org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:342)
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:210)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at com.medbook.foreign.ella.transform.ResOperationLog.transRes(ResOperationLog.java:39)
at com.medbook.foreign.ella.transform.ResOperationLog.main(ResOperationLog.java:45)

这个是在我maven的pom中使用如下依赖时产生的,只列出了重点的依赖,在最下面我会给出完整的flink连接hive读写的依赖

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hive.version>1.1.0</hive.version>
    <hadoop.version>2.6.0</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <!--hive-->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>


    <!--hadoop-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
 
</dependencies>

我们可以看到hive、hadoop的版本时使用原生的版本,而我的集群环境的hive、hadoop版本都使用的时cdh版本所以报错了此种找不到某个方法的错误,这是由于兼容性问题。

    <hive.version>1.1.0</hive.version>
    <hadoop.version>2.6.0</hadoop.version>

随后我把它换成了如下的版本:

<hive.version>1.1.0-cdh5.15.1</hive.version><hadoop.version>2.6.0-cdh5.15.1</hadoop.version>
错误二:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction(HiveShimV100.java:422)
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:207)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at com.medbook.foreign.ella.transform.ResOperationLog.transRes(ResOperationLog.java:37)
at com.medbook.foreign.ella.transform.ResOperationLog.main(ResOperationLog.java:43)

在我解决完错误一以后又爆出了错误二,看到错误我了解到可能是flink连接hive依赖的版本有问题。如下这个依赖:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>

我查阅了官方给的示例maven,如下图。心里想着没错啊,百思不得其解。
在这里插入图片描述

最后突然想到集群环境服务器上我是下载了flink指定hive版本的连接jar包,该不会是maven导入依赖下也需要指定把,于是我再次翻看了文章对应的内容。
在这里插入图片描述

然后我将依赖改为了下面的内容:

<!-- Flink-hive -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-1.2.2_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>

依赖竟然导入成功了,我想这次稳了,果然稳了,数据跑出来了。可以看出我把

flink-connector-hive_${scala.binary.version}

改成了

flink-sql-connector-hive-1.2.2_${scala.binary.version}

就是上面指定的hive版本的连接器jar,所以你们其他版本导入连接依赖时一定也要按照指定版本的来。

下面我的环境下连接hive的最简依赖pom,其他环境也类似,重点是官方示例没有给出的hadoop-client的环境依赖,为啥呢,是因为hive最后也还是要连接hadoop的。相当于我们本地环境需要连接集群的hadoop,所以需要有一个hadoop-client依赖,同时你的本地也配置了连接集群hadoop的hadoop包以及环境变量。而我们官网给出的示例是针对在线上集群环境下的依赖,本身就包含了flink-shaded-hadoop-2-uber-2.7.5-10.0.jar这种连接hadoop的依赖jar,所以就不需要给出啦!但我们本地环境当然需要我们自己配置了。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>book-data-warehouse</artifactId>
        <groupId>com.medbook.warehouse</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hive.version>1.1.0-cdh5.15.1</hive.version>
        <hadoop.version>2.6.0-cdh5.15.1</hadoop.version>
    </properties>

    <dependencies>
    
        <!-- flink-client-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-1.2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

    </dependencies>

    <build>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


        </plugins>
    </build>


</project>

最后的提醒:由于大数据各组件纷繁,又存在各种版本以及操作依赖的兼容性,所以看官方给出的文档示例一定要举一反三,找到自己版本对应的依赖包、以及api,才能正确书写、并运行代码。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
17天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
465 13
Apache Flink 2.0-preview released
|
22天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
53 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
2月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
369 31
Apache Flink 流批融合技术介绍
|
26天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
51 1
|
25天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
30天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
66 0
|
2月前
|
SQL Java 关系型数据库
Hive常见的报错信息
文章列举了Hive常见的几种报错信息,并提供了错误复现、原因分析以及相应的解决方案。
89 1
|
3月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
73 0

热门文章

最新文章

推荐镜像

更多