最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了 debezium.snap.shot.locking.mode = none的参数。
但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是 Properties properties = new Properties(); properties.setProperty("debezium.snapshot.locking.mode", "none"); SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("localhost") .port(port) .databaseList("database") .tableList("database.test") .username(“user) .password("password") .debeziumProperties(properties) .deserializer(new StringDebeziumDeserializationSchema()) .build();
但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢?*来自志愿者整理的flink邮件归档
试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的 public static Properties debeziumProperties(){ Properties properties = new Properties(); properties.setProperty(“xxxx”,”xxxx"); return properties; }来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。