Flink报错问题之cdc代码报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11.2 cep rocksdb 性能调优

我这边用flink1.11.2 cep做一些模式匹配,发现一旦开启rocksdb做为状态后端,就会出现反压。cpu使用率是之前的10倍。

private void bufferEvent(IN event, long currentTime) throws Exception {

long currentTs = System.currentTimeMillis();

List elementsForTimestamp = elementQueueState.get(currentTime);

if (elementsForTimestamp == null) {

this.bufferEventGetNullhistogram.update(System.currentTimeMillis()

  • currentTs);

elementsForTimestamp = new ArrayList<>();

}else {

this.bufferEventGethistogram.update(System.currentTimeMillis()-currentTs);

}

elementsForTimestamp.add(event);

long secondCurrentTs = System.currentTimeMillis();

elementQueueState.put(currentTime, elementsForTimestamp);

this.bufferEventPuthistogram.update(System.currentTimeMillis() -

secondCurrentTs);

this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);

}

通过复写CepOperator,加入了一些metics发现

this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new

DescriptiveStatisticsHistogram(1000));

this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new

DescriptiveStatisticsHistogram(1000));

this.bufferEventGetNullhistogram =

metrics.histogram("buffer_event_get_null_delay", new

DescriptiveStatisticsHistogram(1000));

this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new

DescriptiveStatisticsHistogram(1000));

在get和put比较耗时,整个bufferEvent 能达到200ms

从rocksdb的metric来看没有进行太多flush和compaction。

[image: image.png]

[image: image.png]

也参考了https://www.jianshu.com/p/2e61c2c83c57这篇文章调优过,发现效果也不是很好,一样反压。

也看过类似的问题http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html

,但是我这sst文件很小。

请教大家,为啥get和put这么耗时呢?有什么好的优化方案不?谢谢。*来自志愿者整理的flink邮件归档



参考答案:

‘@Override

public UV get(UK userKey) throws IOException, RocksDBException {

byte[] rawKeyBytes =

serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey,

userKeySerializer);

byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

return (rawValueBytes == null ? null :

deserializeUserValue(dataInputView, rawValueBytes,

userValueSerializer));

}

@Override

public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

byte[] rawKeyBytes =

serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey,

userKeySerializer);

byte[] rawValueBytes = serializeValueNullSensitive(userValue,

userValueSerializer);

backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);

}

通过源码跟踪发现,RocksDBMapState每次get和put都需要序列化和反序列化。。。应该是这个原因导致比较耗时。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371646?spm=a2c6h.13066369.question.84.6ad263826CoWoh



问题二:cdc代码报错

我运行ververica/flink-cdc-connectors git上的demo代码,报错:

2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.

java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V

at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)

at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)

at io.debezium.embedded.EmbeddedEngine. (EmbeddedEngine.java:583)

at io.debezium.embedded.EmbeddedEngine. (EmbeddedEngine.java:80)

at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)

at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)

at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)

at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

源码:

public class CdcTest {

public static void main(String[] args) throws Exception {

SourceFunction sourceFunction = MySQLSource. builder()

.hostname("localhost")

.port(3306)

.databaseList("sohay") // monitor all tables under inventory database

.username("root")

.password("123456")

.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String

.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute();

}

}*来自志愿者整理的flink邮件归档



参考答案:

环境中估计有另一个版本的 kafka-connect jar 包,导致依赖冲突了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371645?spm=a2c6h.13066369.question.83.6ad26382ziEiEA



问题三:使用flink-CDC checkpoint超时问题

我刚调研flink sql cdc功能,我有一个这样的需求,就是三张CDC的表做left join关联,由于三张表都会变化,所以使用cdc来做。

前两张表数据每张大概20万条数据,最后一张表只有几十条数据,我是讲三张表关联之后做成宽表写入的mysql中。

每次数据写到10万条左右任务失败,查看了一下日志,是checkpoint超时造成的。状态后端用的rocksDB。我想问

数据量也不是很大,为什么checkpoint会超时呢,也就是10分钟没有完成checkpoint?*来自志愿者整理的flink邮件归档



参考答案:

我建议可以从两方面排查一下:

1、检查 checkpoint 的大小,是不是很大?

2、检查作业是否反压?反压的情况下 checkpoint 一般很难成功,这种情况可以先解决反压的问题。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371644?spm=a2c6h.13066369.question.86.6ad263822mbZYT



问题四:flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

本人使用flink版本为1.11.0,自定义udaf如下:

public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

@Override

@DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")

public Row getValue(ContinuousListenDuration acc) {

return Row.of(acc.getStartTime(), acc.getDuration());

}

@Override

public ContinuousListenDuration createAccumulator() {

return new ContinuousListenDuration();

}

public void accumulate(ContinuousListenDuration acc, @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {

// 此处省略逻辑

}

}

聚合时以Timestamp(3)、Boolean作为参数,getValue返回类型是ROW<startTime TIMESTAMP(3), duration BIGINT>,函数名定义为get_continuous_listen_duration,调用该函数的sql如下:

insert into

report.result

select

id,

city_code,

get_continuous_listen_duration(

dt,

(order_no is null)

or (trim(order_no) = '')

).startTime as start_time,

get_continuous_listen_duration(

dt,

(order_no is null)

or (trim(order_no) = '')

).duration as duration

from

(

select

o.id,

o.dt,

o.order_no,

r.city_code

from

(

select

req [1] as id,

dt,

proctime,

req [2] as order_no

from

tmp_v

where

extra [1] is null

or extra [1] <> 'false'

) o

JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id

) a

group by

id,

city_code

having

get_continuous_listen_duration(

dt,

(order_no is null)

or (trim(order_no) = '')

).duration >= 2

运行时发生如下异常:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible types

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_171]

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_171]

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_171]

at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_171]

at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

at com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

我的问题是这样定义udf有什么问题吗?*来自志愿者整理的flink邮件归档



参考答案:

从这行报错堆栈来看:at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) , 应该是在对 row.startTime 或者 row. duration validate 阶段,推断类型时识别出不兼容类型,可以检测下用法有没有错误。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371643?spm=a2c6h.13066369.question.85.6ad26382Qobcby



问题五:flink 1.11.1 web ui 页面查看source算子的detail数据,recoreds

    我的flink程序正常执行,但是我在web ui监控页面查看source算子的detail信息,里面的Records Sent等度量信息,永远为0。请问是什么问题?*来自志愿者整理的flink邮件归档



参考答案:

你任务的DAG是什么样子的呢,可能的原因:

1.source本来就没有收到数据,或者没有发送到下游

2.source和下游算子chain在一起看不出来*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371642?spm=a2c6h.13066369.question.88.6ad26382p86CnM

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
117 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
29天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
93 16
|
2月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
166 9
|
4月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
761 2
Flink CDC:新一代实时数据集成框架
|
4月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
617 14
Flink CDC 在货拉拉的落地与实践
|
5月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
486 13
Flink CDC 在新能源制造业的实践
|
5月前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
175 1
|
8月前
|
Oracle 关系型数据库 MySQL
flink cdc 插件问题之报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
8月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
127 2
|
8月前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版