问题一:ddl es 报错
源码如下:
CREATE TABLE buy_cnt_per_hour (
hour_of_day BIGINT,
buy_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'buy_cnt_per_hour',
'connector.document-type' = 'user_behavior',
'connector.bulk-flush.max-actions' = '1',
'format.type' = 'json',
'update-mode' = 'append'
)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class ESTest {
public static void main(String[] args) throws Exception {
//2、设置运行环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "
- ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
- "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
- "'connector.document-type' = 'user_behavior',"
- "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
- "'update-mode' = 'append' )";
tableEnv.sqlUpdate(sinkDDL);
Table table = tableEnv.sqlQuery("select * from test_es ");
tableEnv.toRetractStream(table, Row.class).print();
streamEnv.execute("");
}
}
具体error
The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append
参考回答:
这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作
真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。
而tableEnv.toRetractStream(table, Row.class).print();
这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372141
问题二:DataStream统计uv问题
大家好!
想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。
DataStream<UvPer10Min> uvPer10MinDataStream = userBehaviorSource
.windowAll(TumblingProcessingTimeWindows.of(Time.days(1L)))
.trigger(CountTrigger.of(1L))
.evictor(CountEvictor.of(0L, true))
.process(new ProcessAllWindowFunction<UserBehavior, UvPer10Min, TimeWindow>() {
private transient MapState<String, String> userIdState;
private transient ValueState<Long> uvCountState;
参考回答:
我建议你用ContinuousEventTimeTrigger,可以在窗口范围内,连续触发。
你这个countTrigger,促发次数太多了,而且你后面是processWindowFunction,导致计算压力比较大。
建议你用aggregateWindowFuntion
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372140
问题三:Flink SQL如何将多个表的查询结果(列不同)聚合成一张表
select a.table_tmp1.r1 / a.table_tmp2.r2
这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。
zilong xiao <acidzz163@gmail.com> 于2020年7月8日周三 下午8:55写道:
> 列如下面这样,需要查询table1 & table2,分别查询不同的字段
> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
> (
> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS
> table_tmp2,
> )as a
参考回答:
你得有个join条件连接两张表的
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372135
问题四:一个source多个sink的同步问题
是1个小时才到来。10:00- 11:00的数据,11:01分到来。
但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。
参考回答:
窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372131
问题五:Flink SQL复杂JSON解析
像如下这种JSON输入,
{
"id": 1,
"many_names": [
{"name": "foo"},
{"name": "bar"}
]
}
输出表两行 id 1, name foo | id 1, name bar
最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。
来自 Outlookhttp://aka.ms/weboutlook
参考回答:
我理解最佳实践是第一种,先读出来array,再用table function展开成多行。 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372126