Hi 社区。
Flink 1.12.1
现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。
insert into table_a select id,udf(a),b,c from table_b;
发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])
Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH
Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD
Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```*来自志愿者整理的flink邮件归档
对于 upsert-kafka 会默认加上 ChangelogNormalize
ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
forward。[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。