Hi all, flink-cdc-connectors integrates the Debezium engine, so it should not support the Canal format.
https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
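For context: Flink's canal-json sink only writes a minimal data/type envelope, which is exactly the reduced output shown in the quoted question below. A message produced by an actual Canal server carries a much richer envelope. A rough sketch for comparison (all field values here are illustrative placeholders, not taken from this thread):

{
  "data": [ { "id": "3", "name": "example" } ],
  "database": "example_db",
  "table": "example_table",
  "pkNames": [ "id" ],
  "isDdl": false,
  "type": "INSERT",
  "es": 1618912345000,
  "ts": 1618912345678,
  "id": 1,
  "sql": "",
  "sqlType": { "id": 4, "name": 12 },
  "mysqlType": { "id": "int(11)", "name": "varchar(255)" },
  "old": null
}

Note that in Canal's own output the values inside "data" are serialized as strings, and "es"/"ts" are the binlog event time and the delivery time in epoch milliseconds.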
On Tue, Apr 20, 2021 at 6:18 PM, casel.chen <casel_chan@126.com> wrote:
> The goal is to use a Flink job to implement functionality similar to a canal server.
>
> CREATE TABLE binlog_table (
>     id                INT,
>     name              STRING,
>     sys_id            STRING,
>     sequence          INT,
>     filter            STRING,
>     tag               STRING,
>     remark            STRING,
>     create_date       TIMESTAMP,
>     update_date       TIMESTAMP,
>     reserve           STRING,
>     sys_name          STRING,
>     metric_seq        INT,
>     advanced_function STRING,
>     value_type        STRING,
>     value_field       STRING,
>     status            INT,
>     syn_date          TIMESTAMP,
>     confirmer         STRING,
>     confirm_time      TIMESTAMP,
>     index_explain     STRING,
>     field_name        STRING,
>     tag_values        STRING,
>     PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '${mysql.hostname}',
>     'port' = '3306',
>     'username' = '${mysql.username}',
>     'password' = '${mysql.password}',
>     'database-name' = '${mysql.database}',
>     'table-name' = '${mysql.table}'
> );
>
> CREATE TABLE kafka_sink (
>     id                INT,
>     name              STRING,
>     sys_id            STRING,
>     sequence          INT,
>     filter            STRING,
>     tag               STRING,
>     remark            STRING,
>     create_date       TIMESTAMP,
>     update_date       TIMESTAMP,
>     reserve           STRING,
>     sys_name          STRING,
>     metric_seq        INT,
>     advanced_function STRING,
>     value_type        STRING,
>     value_field       STRING,
>     status            INT,
>     syn_date          TIMESTAMP,
>     confirmer         STRING,
>     confirm_time      TIMESTAMP,
>     index_explain     STRING,
>     field_name        STRING,
>     tag_values        STRING,
>     PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = '${topic}',
>     'properties.bootstrap.servers' = '${bootstrap.servers}',
>     'format' = 'canal-json'
> );
>
> INSERT INTO kafka_sink
> (SELECT * FROM binlog_table);
>
> The output looks like this:
>
> {
>     "data": [
>         {
>             "id": 3,
>             "name": "自动付款接口BuyETC金额",
>             "sys_id": "0184",
>             "sequence": 2,
>             "filter": "(a=1)",
>             "tag": "MerId(商户号)",
>             "remark": "O",
>             "create_date": "2020-11-02 15:01:31",
>             "update_date": "2021-04-07 09:23:59",
>             "reserve": "",
>             "sys_name": "NHL",
>             "metric_seq": 0,
>             "advanced_function": "",
>             "value_type": "sum",
>             "value_field": "value",
>             "status": 1,
>             "syn_date": "2021-01-28 19:31:36",
>             "confirmer": null,
>             "confirm_time": null,
>             "index_explain": "aa",
>             "field_name": null,
>             "tag_values": null
>         }
>     ],
>     "type": "INSERT"
> }
>
> This is not the standard Canal JSON format. I also tried switching to the upsert-kafka connector, but that did not work either:
>
> CREATE TABLE kafka_sink (
>     id                INT,
>     name              STRING,
>     sys_id            STRING,
>     sequence          INT,
>     filter            STRING,
>     tag               STRING,
>     remark            STRING,
>     create_date       TIMESTAMP,
>     update_date       TIMESTAMP,
>     reserve           STRING,
>     sys_name          STRING,
>     metric_seq        INT,
>     advanced_function STRING,
>     value_type        STRING,
>     value_field       STRING,
>     status            INT,
>     syn_date          TIMESTAMP,
>     confirmer         STRING,
>     confirm_time      TIMESTAMP,
>     index_explain     STRING,
>     field_name        STRING,
>     tag_values        STRING,
>     PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = '${topic}',
>     'properties.bootstrap.servers' = '${bootstrap.servers}',
>     'key.format' = 'json',
>     'value.format' = 'json'
> );
>
> The data that comes out looks like this:
>
> {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 12:27:30","update_date":"2021-04-06 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}
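A supplementary note on direction of use: Flink's canal-json format is mainly intended for consuming changelogs that a Canal server has already published to Kafka and interpreting them as a changelog stream; as far as the format options go, producing a full Canal envelope from Flink SQL is not covered. A minimal sketch of the supported consuming direction, with placeholder topic, address, and columns:

CREATE TABLE canal_source (
    id   INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'example_topic',                          -- placeholder
    'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'canal-json'
);

Flink then turns the INSERT/UPDATE/DELETE entries in the Canal messages into a changelog that downstream queries can consume.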