请问下,Flink CDC中使用flink sql 的当时做同步没有问题,但是使用流api进行同步的时候出现 Caused by: com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool$PoolInitializationException: Failed to initialize pool: Access denied for user?这个异常怎么解决?public static void main(String[] args) throws Exception { // 解决mysql decimal 类型解析后为字符串的问题 需要结合pom debezium的版本 Properties debeziumProp = new Properties(); debeziumProp.put("decimal.handling.mode", "string"); MySqlSource mySqlSource = MySqlSource.builder() .hostname("192.168.2.253") .port(3306) // 从binlog最新开始同步 .startupOptions(StartupOptions.latest()) // 开启支持新增表 .scanNewlyAddedTableEnabled(true) // set captured database .databaseList("pig_contract") // set captured table,数据库名必须写 .tableList("pig_contract.product_view") .username("canal") .password("Canal_3gBa185KUSgSn8V") //.serverTimeZone("Asia/Shanghai") // 这是东八区 mysql注意一定要对应设置 set time_zone='+8:00'; .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .includeSchemaChanges(true) // 表变更后得到通知 .debeziumProperties(debeziumProp) .build();
// 启动一个webUI
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 28081);
configuration.setString(RestOptions.ADDRESS, "192.168.2.252");
// 第一次读取需要注释此行,后续增加表时,开启此行,
// flink-ck后 ‘fd5f75bfb342fc101f9abd6e84482a92/chk-12404’换成存储路径下对应文件夹即可,实现旧表增量读取,新表全量读取
// configuration.setString("execution.savepoint.path", "file:///tmp/flink-ck/fd5f75bfb342fc101f9abd6e84482a92/chk-12404");
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment(configuration);
// enable checkpoint
//env.enableCheckpointing(5000);
// 设置本地同步保存位置 todo 暂时也没明白为什么 会存在F 盘对应的tmp目录下,而不是项目启动环境下的;
DataStream<String> inputDataStream = env.fromSource(mySqlSource,
WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(1);
inputDataStream.sinkTo(new Elasticsearch6SinkBuilder<String>()
.setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
.setHosts(new HttpHost("192.168.2.252", 9200, "http"))
.setConnectionUsername("esuser")
.setConnectionPassword("sycs@2020")
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)))
.build())
.name("MysqlDemoSink");
env.execute("test-master");
}
private static IndexRequest createIndexRequest(String element) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("product_view_index_stream")
.type("_doc")
.id(element)
.source(json);
} 没有问题
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。