问题一:flink 1.11 es未定义pk的sink问题
根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义: 不确定是我配置使用的方式不对,还是确实存在bug。。
CREATE TABLE ES6_SENSORDATA_OUTPUT ( event varchar, user_id varchar, distinct_id varchar, _date varchar, _event_time varchar, recv_time varchar, _browser_version varchar, path_name varchar, _search varchar, event_type varchar, _current_project varchar, message varchar, stack varchar, component_stack varchar, _screen_width varchar, _screen_height varchar ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<ES_YUNTU.SERVERS>', 'index' = 'flink_sensordata_target_event', 'document-type' = 'default', 'document-id.key-delimiter' = '$', 'sink.bulk-flush.interval' = '1000', 'failure-handler' = 'fail', 'format' = 'json' )
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
参考回答:
如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].
所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372256
问题二:单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及s
请问下,有没有大佬做过类似的事情?
另外,flink side out功能,可以将单流分成多流,但是不是分成多流后,但两条流sink的时候,是不是没法保证sink时候的时序?
参考回答:
你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。
datastream.map(new MySQLSinkMapFunction()).addSink(new
FlinkKafkaProducer()).
也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。
另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372259
问题三:flink使用debezium-json format报错
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)
参考回答:
这是 changgelog 里的一个bug[1], 在1.11.1和master上已经修复,1.11.1社区已经在准备中了。 [1] https://issues.apache.org/jira/browse/FLINK-18461 https://issues.apache.org/jira/browse/FLINK-18461
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372264
问题四:Flink es7 connector认证问题
我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 里就报错了 org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], status line [HTTP/1.1 401 Unauthorized] ParameterTool pt = ParameterTool.fromArgs(args); String confFile = pt.get("confFile"); pt = ParameterTool.fromPropertiesFile(new File(confFile)); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(pt.get("es.user.name"), pt.get("es.user.password")));
esSinkBuilder.setRestClientFactory( (RestClientBuilder restClientBuilder) -> restClientBuilder .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(180000) .setConnectionRequestTimeout(10000) ) .setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证 return httpClientBuilder.setDefaultCredentialsProvider(provider); } ) );
参考回答:
请问您有检查过pt.get("es.user.name"),
pt.get("es.user.password")这两个参数读出来是否都是正确的
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372267
问题五:flink sql 侧输出
大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql
api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?
参考回答:
Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,
你可以给你的作业配上:
table.exec.emit.late-fire.enabled = true
table.exec.emit.late-fire.delay = 1min
同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window allowLateness 。
具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372272