Flink问题之嵌套 json 中string 数组的解析异常如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:嵌套 json 中string 数组的解析异常


我使用 flink 1.9 处理嵌套 json, 它嵌套了一个string数组,构造出的 table schema结构为:

Row(parsedResponse: BasicArrayTypeInfo , timestamp: Long)

执行作业后会发生报错如下,出现 object 类型和string 类型的转换错误

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast

to [Ljava.lang.String;

at

org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)

at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)

at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

大佬们知道该怎么修改么?

我的json 的结构如下:

{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}

P.S:

如果把 string 数组改为 long 数组或者 double 数组执行对应的操作可以正确运行,目前来看只有 string 数组出现问题。


参考回答:

看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的 type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372075


问题二:【Flink的shuffle mode】


现在就两种:pipeline和batch

batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。

理论上可以per transformation的来设置,see PartitionTransformation.


参考回答:

那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372073


问题三:Flink 1.11 SQL作业中调用UDTF 出现“No match found for func


本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:

@FunctionHint( input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")}, output = @DataTypeHint("STRING") ) public class Split extends TableFunction { public Split(){} public void eval(String str, String ch) { if (str == null || str.isEmpty()) { return; } else { String[] ss = str.split(ch); for (String s : ss) { collect(s); } } } }

在flink sql中通过 create function splitByChar as '..Split' 来创建这个function,在tableEnv 中调用executeSql(....) 来完成对这个 function的注册,在sql 后面的计算逻辑中 通过以下方式来调用这个UDTF create view view_source_1 as select dateTime,itime`, lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as T(s) on true;

结果一直出现以下错误信息: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 25 to line 3, column 47: No match found for function signature splitByChar( , ) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629) .................... Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 25 to line 3, column 47: r( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 8 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature splitByChar( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)

之前在flink 1.10 里面使用是正常的, 问下各位大佬有没有在flink 1.11 遇到过这个错误, 麻烦提供一下帮助。


参考回答:

我感觉这应该是新版本的udf的bug,我在本地也可以复现。 已经建了一个issue[1] 来跟进。

[1] https://issues.apache.org/jira/browse/FLINK-18520


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372070


问题四:flink sql 读写写kafka表的时候可以指定消息的key吗


 flink sql 写kafka表的时候可以指定消息的key吗?

看官网的kafka connector没有找到消息key相关的说明

如果可以的话,如何指定?


参考回答:

目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell. https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372063


问题五:DataStream的state问题


想问下,在给state设置ttl的时候,如下面的代码:                    StateTtlConfig ttlConfig = StateTtlConfig

                           .newBuilder(Time.days(1))

                           .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

                           .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

                           .build();

设置了1天时间之后失效,例如2020-07-07 08:30:00点开始的job,那失效时间是这个时间段2020-07-07 00:00:00~2020-07-07 23:59:59,还是job上线之后,2020-07-07 08:30:00~2020-07-08 08:30:00这个时间段?


参考回答:

是最后一次 access 的时间到当前的时间超过了你设置的 ttl 间隔,比如你配置的是 OnCreateAndWrite

那么就是创建和写操作之后的 1 天,这个 state 会变成 expired,具体的可以参考文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372062

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
479 3
|
JavaScript
js 解析 byte数组 成字符串
js 解析 byte数组 成字符串
380 5
|
9月前
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
586 4
JSON数据解析实战:从嵌套结构到结构化表格
|
9月前
|
存储 监控 算法
关于员工上网监控系统中 PHP 关联数组算法的学术解析
在当代企业管理中,员工上网监控系统是维护信息安全和提升工作效率的关键工具。PHP 中的关联数组凭借其灵活的键值对存储方式,在记录员工网络活动、管理访问规则及分析上网行为等方面发挥重要作用。通过关联数组,系统能高效记录每位员工的上网历史,设定网站访问权限,并统计不同类型的网站访问频率,帮助企业洞察员工上网模式,发现潜在问题并采取相应管理措施,从而保障信息安全和提高工作效率。
176 7
|
12月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
471 16
|
人工智能 前端开发 JavaScript
拿下奇怪的前端报错(一):报错信息是一个看不懂的数字数组Buffer(475) [Uint8Array],让AI大模型帮忙解析
本文介绍了前端开发中遇到的奇怪报错问题,特别是当错误信息不明确时的处理方法。作者分享了自己通过还原代码、试错等方式解决问题的经验,并以一个Vue3+TypeScript项目的构建失败为例,详细解析了如何从错误信息中定位问题,最终通过解读错误信息中的ASCII码找到了具体的错误文件。文章强调了基础知识的重要性,并鼓励读者遇到类似问题时不要慌张,耐心分析。
387 5
|
网络安全 Docker 容器
【Bug修复】秒杀服务器异常,轻松恢复网站访问--从防火墙到Docker服务的全面解析
【Bug修复】秒杀服务器异常,轻松恢复网站访问--从防火墙到Docker服务的全面解析
531 0
|
JSON Java 数据格式
Java系列之:如何取出嵌套JSON中的数据值
这篇文章介绍了如何在Java中取出嵌套JSON数据值的方法,通过使用`JSONObject`类及其`getJSONObject`和`get`方法来逐步解析和提取所需的数据。
Java系列之:如何取出嵌套JSON中的数据值
|
网络协议 Java 数据库连接
13 Java异常(异常过程解析、throw、throws、try-catch关键字)
13 Java异常(异常过程解析、throw、throws、try-catch关键字)
319 2
|
存储 JavaScript 前端开发
一文带你深度解析:JavaScript中对象与数组的威力究竟有多大?
一文带你深度解析:JavaScript中对象与数组的威力究竟有多大?

热门文章

最新文章

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
  • DNS