Flink SQL问题之复杂JSON解析如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:ddl es 报错


源码如下:

CREATE TABLE buy_cnt_per_hour (

 hour_of_day BIGINT,

 buy_cnt BIGINT

) WITH (

 'connector.type' = 'elasticsearch',

 'connector.version' = '6',

 'connector.hosts' = 'http://localhost:9200',

 'connector.index' = 'buy_cnt_per_hour',

 'connector.document-type' = 'user_behavior',

 'connector.bulk-flush.max-actions' = '1',

 'format.type' = 'json',

 'update-mode' = 'append'

)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2、设置运行环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

streamEnv.setParallelism(1);

String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "

  • ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
  • "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
  • "'connector.document-type' = 'user_behavior',"
  • "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
  • "'update-mode' = 'append' )";

tableEnv.sqlUpdate(sinkDDL);

Table table = tableEnv.sqlQuery("select * from test_es ");

tableEnv.toRetractStream(table, Row.class).print();

streamEnv.execute("");

}

}

具体error

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append


参考回答:

这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作

真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。

而tableEnv.toRetractStream(table, Row.class).print();

这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372141


问题二:DataStream统计uv问题


大家好!

     想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:

     1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,

          这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。

     2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。

DataStream<UvPer10Min&gt; uvPer10MinDataStream = userBehaviorSource

.windowAll(TumblingProcessingTimeWindows.of(Time.days(1L)))

.trigger(CountTrigger.of(1L))

.evictor(CountEvictor.of(0L, true))

.process(new ProcessAllWindowFunction<UserBehavior, UvPer10Min, TimeWindow&gt;() {

private transient MapState<String, String&gt; userIdState;

private transient ValueState<Long&gt; uvCountState;

&nbsp; &nbsp;


参考回答:

我建议你用ContinuousEventTimeTrigger,可以在窗口范围内,连续触发。

你这个countTrigger,促发次数太多了,而且你后面是processWindowFunction,导致计算压力比较大。

建议你用aggregateWindowFuntion


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372140


问题三:Flink SQL如何将多个表的查询结果(列不同)聚合成一张表


select a.table_tmp1.r1 / a.table_tmp2.r2

这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。

zilong xiao <acidzz163@gmail.com> 于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段

> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~

> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from

> (

> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS

> table_tmp2,

> )as a


参考回答:

你得有个join条件连接两张表的


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372135


问题四:一个source多个sink的同步问题


是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。


参考回答:

窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372131


问题五:Flink SQL复杂JSON解析


像如下这种JSON输入,

{

"id": 1,

"many_names": [

{"name": "foo"},

{"name": "bar"}

]

}

输出表两行 id 1, name foo | id 1, name bar

最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?

还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。

来自 Outlookhttp://aka.ms/weboutlook


参考回答:

我理解最佳实践是第一种,先读出来array,再用table function展开成多行。 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372126

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
846 43
|
3月前
|
SQL 数据可视化 关系型数据库
MCP与PolarDB集成技术分析:降低SQL门槛与简化数据可视化流程的机制解析
阿里云PolarDB与MCP协议融合,打造“自然语言即分析”的新范式。通过云原生数据库与标准化AI接口协同,实现零代码、分钟级从数据到可视化洞察,打破技术壁垒,提升分析效率99%,推动企业数据能力普惠化。
317 3
|
3月前
|
JSON 缓存 自然语言处理
多语言实时数据微店商品详情API:技术实现与JSON数据解析指南
通过以上技术实现与解析指南,开发者可高效构建支持多语言的实时商品详情系统,满足全球化电商场景需求。
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
308 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
存储 JSON 关系型数据库
【干货满满】解密 API 数据解析:从 JSON 到数据库存储的完整流程
本文详解电商API开发中JSON数据解析与数据库存储的全流程,涵盖数据提取、清洗、转换及优化策略,结合Python实战代码与主流数据库方案,助开发者构建高效、可靠的数据处理管道。
|
4月前
|
JSON 算法 API
淘宝商品评论API接口核心解析,json数据返回
淘宝商品评论API是淘宝开放平台提供的数据服务接口,允许开发者通过编程方式获取指定商品的用户评价数据,包括文字、图片、视频评论及评分等。其核心价值在于:
|
2月前
|
JSON Java Go
【GoGin】(2)数据解析和绑定:结构体分析,包括JSON解析、form解析、URL解析,区分绑定的Bind方法
bind或bindXXX函数(后文中我们统一都叫bind函数)的作用就是将,以方便后续业务逻辑的处理。
284 3
|
2月前
|
XML JSON 数据处理
超越JSON:Python结构化数据处理模块全解析
本文深入解析Python中12个核心数据处理模块,涵盖csv、pandas、pickle、shelve、struct、configparser、xml、numpy、array、sqlite3和msgpack,覆盖表格处理、序列化、配置管理、科学计算等六大场景,结合真实案例与决策树,助你高效应对各类数据挑战。(238字)
213 0
|
6月前
|
JSON 定位技术 PHP
PHP技巧:解析JSON及提取数据
这就是在PHP世界里探索JSON数据的艺术。这场狩猎不仅仅是为了获得数据,而是一种透彻理解数据结构的行动,让数据在你的编码海洋中畅游。通过这次冒险,你已经掌握了打开数据宝箱的钥匙。紧握它,让你在编程世界中随心所欲地航行。
236 67
|
8月前
|
SQL 安全 关系型数据库
SQL注入之万能密码:原理、实践与防御全解析
本文深入解析了“万能密码”攻击的运行机制及其危险性,通过实例展示了SQL注入的基本原理与变种形式。文章还提供了企业级防御方案,包括参数化查询、输入验证、权限控制及WAF规则配置等深度防御策略。同时,探讨了二阶注入和布尔盲注等新型攻击方式,并给出开发者自查清单。最后强调安全防护需持续改进,无绝对安全,建议使用成熟ORM框架并定期审计。技术内容仅供学习参考,严禁非法用途。
1251 0

热门文章

最新文章

相关产品

  • 实时计算 Flink版