Flink SQL问题之复杂JSON解析如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:ddl es 报错


源码如下:

CREATE TABLE buy_cnt_per_hour (

 hour_of_day BIGINT,

 buy_cnt BIGINT

) WITH (

 'connector.type' = 'elasticsearch',

 'connector.version' = '6',

 'connector.hosts' = 'http://localhost:9200',

 'connector.index' = 'buy_cnt_per_hour',

 'connector.document-type' = 'user_behavior',

 'connector.bulk-flush.max-actions' = '1',

 'format.type' = 'json',

 'update-mode' = 'append'

)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2、设置运行环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

streamEnv.setParallelism(1);

String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "

  • ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
  • "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
  • "'connector.document-type' = 'user_behavior',"
  • "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
  • "'update-mode' = 'append' )";

tableEnv.sqlUpdate(sinkDDL);

Table table = tableEnv.sqlQuery("select * from test_es ");

tableEnv.toRetractStream(table, Row.class).print();

streamEnv.execute("");

}

}

具体error

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append


参考回答:

这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作

真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。

而tableEnv.toRetractStream(table, Row.class).print();

这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372141


问题二:DataStream统计uv问题


大家好!

     想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:

     1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,

          这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。

     2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。

DataStream<UvPer10Min&gt; uvPer10MinDataStream = userBehaviorSource

.windowAll(TumblingProcessingTimeWindows.of(Time.days(1L)))

.trigger(CountTrigger.of(1L))

.evictor(CountEvictor.of(0L, true))

.process(new ProcessAllWindowFunction<UserBehavior, UvPer10Min, TimeWindow&gt;() {

private transient MapState<String, String&gt; userIdState;

private transient ValueState<Long&gt; uvCountState;

&nbsp; &nbsp;


参考回答:

我建议你用ContinuousEventTimeTrigger,可以在窗口范围内,连续触发。

你这个countTrigger,促发次数太多了,而且你后面是processWindowFunction,导致计算压力比较大。

建议你用aggregateWindowFuntion


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372140


问题三:Flink SQL如何将多个表的查询结果(列不同)聚合成一张表


select a.table_tmp1.r1 / a.table_tmp2.r2

这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。

zilong xiao <acidzz163@gmail.com> 于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段

> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~

> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from

> (

> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS

> table_tmp2,

> )as a


参考回答:

你得有个join条件连接两张表的


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372135


问题四:一个source多个sink的同步问题


是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。


参考回答:

窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372131


问题五:Flink SQL复杂JSON解析


像如下这种JSON输入,

{

"id": 1,

"many_names": [

{"name": "foo"},

{"name": "bar"}

]

}

输出表两行 id 1, name foo | id 1, name bar

最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?

还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。

来自 Outlookhttp://aka.ms/weboutlook


参考回答:

我理解最佳实践是第一种,先读出来array,再用table function展开成多行。 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372126

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 监控 数据库
SQL语句是否都需要解析及其相关技巧和方法
在数据库管理中,SQL(结构化查询语言)语句的使用无处不在,它们负责数据的查询、插入、更新和删除等操作
|
27天前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
28 0
|
2月前
|
SQL 存储 数据库
SQL语句是否都需要解析及其相关技巧与方法
在数据库管理系统中,SQL(Structured Query Language)语句作为与数据库交互的桥梁,其执行过程往往涉及到一个或多个解析阶段
|
2月前
|
SQL 数据可视化 BI
SQL语句及查询结果解析:技巧与方法
在数据库管理和数据分析中,SQL语句扮演着至关重要的角色
|
2月前
|
SQL 监控 关系型数据库
SQL错误代码1303解析与处理方法
在SQL编程和数据库管理中,遇到错误代码是常有的事,其中错误代码1303在不同数据库系统中可能代表不同的含义
|
2月前
|
SQL 存储 关系型数据库
SQL默认索引是什么:深入解析与技巧
在SQL数据库中,索引是一种用于提高查询性能的重要数据结构
|
2月前
|
SQL 开发框架 .NET
ASP.NET连接SQL数据库:实现过程与关键细节解析an3.021-6232.com
随着互联网技术的快速发展,ASP.NET作为一种广泛使用的服务器端开发技术,其与数据库的交互操作成为了应用开发中的重要环节。本文将详细介绍在ASP.NET中如何连接SQL数据库,包括连接的基本概念、实现步骤、关键代码示例以及常见问题的解决方案。由于篇幅限制,本文不能保证达到完整的2000字,但会确保
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
126 13
|
5月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。

相关产品

  • 实时计算 Flink版