Flink报错问题之报类型转换错误如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到

INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)

from kafka_ods_artemis_out_order group by warehouse_id;

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: Table sink

'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming

update changes which is produced by node

GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS

EXPR$1])

在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。

我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让

GroupBy 的结果也发送到 Kafka 呢?

*来自志愿者整理的flink邮件归档



参考答案:

DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370230?spm=a2c6h.12873639.article-detail.63.6f9243783Lv0fl



问题二:FlinkSQL 任务提交后 任务名称问题

代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable")执行

任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table”

这样很不友好啊,能不能我自己指定任务名称呢?

*来自志愿者整理的flink邮件归档



参考答案:

在zeppelin中你可以指定insert 语句的job name,如下图,(对Zeppelin感兴趣的,可以加入钉钉群:32803524)

%flink.ssql(jobName="my job")

insert into sink_kafka select status, direction, cast(event_ts/1000000000

as timestamp(3)) from source_kafka where status <> 'foo'*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370228?spm=a2c6h.12873639.article-detail.64.6f9243783Lv0fl



问题三:state无法从checkpoint中恢复

state无法从checkpoint中恢复 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//作业失败后不重启

env.setRestartStrategy(RestartStrategies.noRestart());

env.getCheckpointConfig().setCheckpointTimeout(500);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); 使用状态的代码private transient ListState<String> counts;

@Override

public void open(Configuration parameters) throws Exception {

StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.minutes(30))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build();

ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);

lastUserLogin.enableTimeToLive(ttlConfig);

counts = getRuntimeContext().getListState(lastUserLogin);

}

我重启了task managers 后。发现 counts 里面的数据都丢失了

*来自志愿者整理的flink邮件归档



参考答案:

1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象

2 能否把你关于 counts 的其他代码也贴一下

  1. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
  2. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370227?spm=a2c6h.12873639.article-detail.65.6f9243783Lv0fl



问题四:sql 内嵌josn数组解析报 类型转换报错

hi all我这边有个嵌套的json数组,报类型转换错误(ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),这里报错),是不是不能这么写

create table hiido_push_sdk_mq (

datas   ARRAY<ROW<from string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>

) with (

'connector' = 'kafka',

'topic' = 'hiido_pushsdk_event',

'properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103',

'properties.group.id' = 'push_click_sql_version_consumer',

'scan.startup.mode' = 'latest-offset',

'format.type' = 'json');

错误如下:

[ERROR] 2020-07-17 20:17:50,640(562284338) --> [http-nio-8080-exec-10] com.yy.push.flink.sql.gateway.sql.parse.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:77): parseBySqlParser, parse: com.yy.push.flink.sql.gateway.context.JobContext$1@5d5f32d1, stmt: create table hiido_push_sdk_mq (    datas   ARRAY<ROW<from string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>) with ('connector' = 'kafka','topic' = 'hiido_pushsdk_event','properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103','properties.group.id' = 'push_click_sql_version_consumer','scan.startup.mode' = 'latest-offset','format.type' = 'json'), error info: SQL parse failed. Encountered "AS" at line 1, column 115.

Was expecting one of:

   "ROW" ...

   <BRACKET_QUOTED_IDENTIFIER> ...

   <QUOTED_IDENTIFIER> ...

   <BACK_QUOTED_IDENTIFIER> ...

   <IDENTIFIER> ...

   <UNICODE_QUOTED_IDENTIFIER> ...

   "STRING" ...

   "BYTES" ...

   "ARRAY" ...

   "MULTISET" ...

   "RAW" ...

   "BOOLEAN" ...

   "INTEGER" ...

   "INT" ...

   "TINYINT" ...

   "SMALLINT" ...

   "BIGINT" ...

   "REAL" ...

   "DOUBLE" ...

   "FLOAT" ...

   "BINARY" ...

   "VARBINARY" ...

   "DECIMAL" ...

   "DEC" ...

   "NUMERIC" ...

   "ANY" ...

   "CHARACTER" ...

   "CHAR" ...

   "VARCHAR" ...

   "DATE" ...

   "TIME" ...

   "TIMESTAMP" ...

*来自志愿者整理的flink邮件归档



参考答案:

计算列只能写在最外层,不能在嵌套类型里面有计算列。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370226?spm=a2c6h.12873639.article-detail.66.6f9243783Lv0fl



问题五:flink不带参数的udf始终返回第一次调用的结果

我有一个不带参数的udf,用于返回系统当前时间的字符串格式,但是调用时每次都返回这个udf第一次调用的结果,所以拿到的时间全部都是一样的

udf的实时如下:

public class GetTimeFunc extends ScalarFunction {

public String eval() {

return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

}

}

请问,针对这种没有入参的udf,flink内部是有做什么优化吗,导致每次调用返回的结果都一样?

*来自志愿者整理的flink邮件归档



参考答案:

是的,这种就被当做常量被优化掉了。

你可以覆盖一下ScalarFunction#isDeterministic方法,说明你这个函数时非确定性的,就不会被优化掉了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370224?spm=a2c6h.12873639.article-detail.67.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
8月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
722 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版