Flink报错问题之flink 1.11 sql作业提交JM报错如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.10.1 flink sql消费kafka当parallelism大于1时不产生wat

本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:

insert into

x.report.bi_report_fence_common_indicators

select

fence_id,

'finishedOrderCnt' as indicator_name,

TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,

count(1) as indicator_val

from

(

select

dt,

fence_id,

fence_coordinates_array,

c.driver_location

from

(

select

*

from

(

select

dt,

driver_location,

r1.f1.fence_info as fence_info

from

(

select

o.dt,

o.driver_location,

MD5(r.city_code) as k,

PROCTIME() as proctime

from

(

select

order_no,

dt,

driver_location,

PROCTIME() as proctime

from

x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner

where

_type = 'insert'

and event_code = 'arriveAndSettlement'

) o

LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no

) o1

LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k

) a

where

fence_info is not null

) c

LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE

) as b

where

in_fence(fence_coordinates_array, driver_location)

group by

TUMBLE(dt, INTERVAL '5' MINUTE),

fence_id;

其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:

CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(

_type STRING,

_old_id BIGINT,

id BIGINT,

_old_order_no STRING,

order_no STRING,

_old_event_code STRING,

event_code STRING,

_old_from_state TINYINT,

from_state TINYINT,

_old_to_state TINYINT,

to_state TINYINT,

_old_operator_type TINYINT,

operator_type TINYINT,

_old_passenger_location STRING,

passenger_location STRING,

_old_driver_location STRING,

driver_location STRING,

_old_trans_time STRING,

trans_time STRING,

_old_create_time STRING,

create_time STRING,

_old_update_time STRING,

update_time STRING,

_old_passenger_poi_address STRING,

passenger_poi_address STRING,

_old_passenger_detail_address STRING,

passenger_detail_address STRING,

_old_driver_poi_address STRING,

driver_poi_address STRING,

_old_driver_detail_address STRING,

driver_detail_address STRING,

_old_operator STRING,

operator STRING,

_old_partition_index TINYINT,

partition_index TINYINT,

dt as TO_TIMESTAMP(trans_time),

WATERMARK FOR dt AS dt - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.properties.bootstrap.servers' = '*',

'connector.properties.zookeeper.connect' = '*',

'connector.version' = 'universal',

'format.type' = 'json',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'group-offsets',

'connector.topic' = 'xxxxx'

)

*来自志愿者整理的flink邮件归档



参考答案:

可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370157?spm=a2c6h.12873639.article-detail.103.6f9243783Lv0fl



问题二:flink sql 侧输出

大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql

api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?

*来自志愿者整理的flink邮件归档



参考答案:

Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,

你可以给你的作业配上:

table.exec.emit.late-fire.enabled = true

table.exec.emit.late-fire.delay = 1min

同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window

allowLateness 。

具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370156?spm=a2c6h.12873639.article-detail.104.6f9243783Lv0fl



问题三:flink 1.11 sql作业提交JM报错

我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more

*来自志愿者整理的flink邮件归档



参考答案:

简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。 具体使用根据你的需要来使用。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370155?spm=a2c6h.12873639.article-detail.105.6f9243783Lv0fl



问题四:flink 1.11 local execution oom问题

我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何? 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。

Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

*来自志愿者整理的flink邮件归档



参考答案:

这个问题可以看下是否和 releasenote[1] 中 memory configuration

相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看

[1]

https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370153?spm=a2c6h.12873639.article-detail.106.6f9243783Lv0fl



问题五:flink1.10.1在yarn上无法写入kafka的问题

请教各位:

flink任务在本机写入测试环境kafka集群没问题,

但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka

异常信息如下:

2020-07-09 19:17:33,126 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:33,164 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:39,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - async wait operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched from RUNNING to FAILE

D.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka:

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)

at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)

at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)

at org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)

*来自志愿者整理的flink邮件归档



参考答案:

应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。

这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370151?spm=a2c6h.12873639.article-detail.107.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1067 43
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
483 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
9月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1175 1
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2311 27
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
723 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
518 9
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
338 6
|
SQL Oracle 关系型数据库
MySQL、SQL Server和Oracle数据库安装部署教程
数据库的安装部署教程因不同的数据库管理系统(DBMS)而异,以下将以MySQL、SQL Server和Oracle为例,分别概述其安装部署的基本步骤。请注意,由于软件版本和操作系统的不同,具体步骤可能会有所变化。
1364 3
|
存储 SQL C++
对比 SQL Server中的VARCHAR(max) 与VARCHAR(n) 数据类型
【7月更文挑战7天】SQL Server 中的 VARCHAR(max) vs VARCHAR(n): - VARCHAR(n) 存储最多 n 个字符(1-8000),适合短文本。 - VARCHAR(max) 可存储约 21 亿个字符,适合大量文本。 - VARCHAR(n) 在处理小数据时性能更好,空间固定。 - VARCHAR(max) 对于大文本更合适,但可能影响性能。 - 选择取决于数据长度预期和业务需求。
1358 1

相关产品

  • 实时计算 Flink版