问题一:请教大神们关于flink-sql中数据赋值问题
看了官网的示例,发现sql中传入的值都是固定的,我有一个场景是从kafka消息队列接收查询条件,然后通过flink-sql映射hbase表进行查询并写入结果表。我使用了将消息队列映射表再join数据表的方式,回想一下这种方式很不妥,有什么好的方法实现sql入参的动态查询呢?*来自志愿者整理的flink邮件归档
参考答案:
自定义 UDF 可以解决你的问题吗? 比如 接收 kakfa 的数据字段定义成 hbaseQuery,然后自定义 UDF 去根据 query 查询数据。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371605?spm=a2c6h.13066369.question.72.6ad26382lmSgJW
问题二:flink savepoint
本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
flink 版本1.10.1
执行 flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
出现错误信息
org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)*来自志愿者整理的flink邮件归档
参考答案:
这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时, 具体的原因需要看下 Jobmaster 的日志。 PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371606?spm=a2c6h.13066369.question.75.6ad26382XXH3Ol
问题三:关于cluster.evenly-spread-out-slots参数的底层原理
有没有人对cluster.evenly-spread-out-slots参数了解比较深入的给讲解下。
我主要想知道,设置这个参数为true之后。Flink是以一个什么样的规则去尽可能均衡分配的。
standalone集群模式下,每个机器性能相同,flink slot数量配置相同情况下。基于这种分配规则,有没有一种方法让Flink做到
完全均衡,而不是尽可能均衡?
此外,我说的“均衡”都特指算子级别的均衡。不要5机器一共5个slot,然后任务有5个算子,每个算子单并发并且通过不同的share
group各独占1个slot这种均衡。我指的是每个算子都均衡到机器(假设并发设置合理)。*来自志愿者整理的flink邮件归档
参考答案:
我说一下我看源码(1.11.2)之后的理解吧,不一定准确,仅供参考。
cluster.evenly-spread-out-slots 这个参数设置后会作用在两个地方:
- JobMaster 的 Scheduler 组件
- ResourceManager 的 SlotManager 组件
对于 JobMaster 中的 Scheduler,
它在给 execution vertex 分配 slot 是按拓扑排序的顺序依次进行的。
Scheduler 策略是会倾向于把 execution vertex 分配到它的上游节点所分配到的slot上,
因此在给某个具体 execution vertex 分配 slot 时都会计算出一个当前节点倾向于选择的TaskManager集合,
然后在可选的 slot 候选集中会根据三个维度来为某个slot打分,分别是:
- 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 ResourceID
是相同的(对于standalone模式可以不考虑该维度)
- 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 全限定域名 是相同的
- 候选slot所在的 TaskManager 目前的资源占用率
只有配置了 cluster.evenly-spread-out-slots 后,才会考虑第三个维度,否则仅会用前面两个维度进行打分。
打分之后会选择得分最高的 slot 分配给当前的 exection vertex。
需要注意的是这里的资源利用率只是根据某个 slot 所在的 TaskManager 中剩下多少个能够分配该 execution vertex 的
slot 计算出的,
(因为 Flink 要求同一 job vertex 的并行任务不能分配到同一 slot 中),能分配的越多,资源利用率越小,否则利用率越大。
而不是指实际的CPU内存等资源利用率。
对于 ResourceManager 中的 SlotManager 组件(这里说的都是 Standalone 模式下的
ResourceManager),
由于 JobMaster 的 slot 都是要向 resource manager 申请的。如果 JobMaster 需要新的 slot 了,会向
ResourceManager 的 SlotManager 组件申请。
如果没有配置 cluster.evenly-spread-out-slots 的话,SlotManager 从可用 slot 中随机返回一个。
如果配置了 cluster.evenly-spread-out-slots,SlotManager 会返回资源利用率最小的一个 slot。
这里的资源利用率计算方式是:看某个 slot 所在的 TaskManager 中有多少 slot 还没有被分配,空闲的越多,利用率越小,否则越大。
最后,你提问中说的均衡我没有太理解。某个算子的并发子任务是不会被分配到同一个slot中的,
但如果想把这些子任务均匀分配到不同机器上,这个当前的调度算法应该是无法保证的。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371608?spm=a2c6h.13066369.question.74.6ad26382bJV2or
问题四:flink 1.11 cdc: 如何将DataStream<RowData> 要如何转
目前有两个DataStream 的流,通过mapfunction, 转成DataStream 流,请问DataStream 怎么转成table,并使用flink sql进行操作。 (注:因为mapfunction对流做了些顺序的限制,目前无法无法直接用flink sql cdc直接定义表!!!)
目前我的做法会报错:
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
DataStreamSource json1 // canal json的格式 DataStreamSource json2 // canal json的格式 ConnectedStreams<String, String> connect= caliber_cdc_json.connect(caliber_snapshot_json); //connect DataStream snapshot_cdc_stream = connect.flatMap( new SnapshotCdcCoRichFlatMapFunction() ); //做连接
//3, 注册表,将表数据,直接输出 Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream); fsTableEnv.createTemporaryView("test", snapshot_cdc_table);
String output = "CREATE TABLE test_mirror (\n" + "id
INT,\n" + "name
VARCHAR(255),\n" + "time
TIMESTAMP(3),\n" + "PRIMARY KEY(id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'print'\n" + ")";
//4, app logic String op = "INSERT into test_mirror SELECT * from test"; fsTableEnv.executeSql(output); fsTableEnv.executeSql(op);
但提交任务失败,错误信息: serializationSchema:root |-- id: INT NOT NULL |-- name: VARCHAR(255) |-- time: TIMESTAMP(3) |-- status: INT |-- CONSTRAINT PK_3386 PRIMARY KEY (id)
snapshot_cdc_table:UnnamedTable$0 +----------------+ | table name | +----------------+ | UnnamedTable$0 | | test | | test_mirror | +----------------+ 3 rows in set
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown Source) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) at com.qqmusic.quku.demo_app.StreamTableSql.main(StreamTableSql.java:126) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 9 more
请问是啥原因?需要怎么做?*来自志愿者整理的flink邮件归档
参考答案:
- 目前不支持注册一个 RowData 类型的 DataStream,因为 RowData 会被识别成 非结构化类型。
- 目前不支持注册一个 cdc stream,也就是说 DataStream -> Table 只支持 insert-only stream,无法识别 cdc 流。这个功能规划在了1.13 版本中。
对于你的场景,有以下几种解决办法: 1. 如果你的流中只有插入和更新,没有删除。那么用 DataStream 先注册成一个 insert-only 的 Table,然后用 Flink SQL 的去重语法 [1] 保留 pk 下的最后一条记录即可。 2. 如果你的流中有删除,那么....你得自己开发一个 sql connector,把 cdc 抓取以及“mapfunction对流做了些顺序的限制”的逻辑实现在你的 source 中。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371610?spm=a2c6h.13066369.question.75.6ad26382iS6iUL
问题五:flink1.11的cdc功能对消息顺序性的处理
麻烦请教下各位大神,flink如何处理如下问题:
flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
假如
1.有源表和目标表:
create table test(
id int(10) primary key
)
2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
3.发往的topic下有三个partition:p0、p1、p2
4.源端和目标端都有一条记录id=1
此时对源端进行两次update:
update1:update test set id=2 where id=1;
update2: update test set id=3 wehre id=2;
假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。*来自志愿者整理的flink邮件归档
参考答案:
可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q&A 中描述了 "首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink 读取的时候才能保证顺序。"
个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?
[1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371611?spm=a2c6h.13066369.question.78.6ad26382nJnm13