开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问下,Flink CDC中使用flink sql 的当时做同步没有问题,但是使用流api进行同?

请问下,Flink CDC中使用flink sql 的当时做同步没有问题,但是使用流api进行同步的时候出现 Caused by: com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool$PoolInitializationException: Failed to initialize pool: Access denied for user?这个异常怎么解决?public static void main(String[] args) throws Exception { // 解决mysql decimal 类型解析后为字符串的问题 需要结合pom debezium的版本 Properties debeziumProp = new Properties(); debeziumProp.put("decimal.handling.mode", "string"); MySqlSource mySqlSource = MySqlSource.builder() .hostname("192.168.2.253") .port(3306) // 从binlog最新开始同步 .startupOptions(StartupOptions.latest()) // 开启支持新增表 .scanNewlyAddedTableEnabled(true) // set captured database .databaseList("pig_contract") // set captured table,数据库名必须写 .tableList("pig_contract.product_view") .username("canal") .password("Canal_3gBa185KUSgSn8V") //.serverTimeZone("Asia/Shanghai") // 这是东八区 mysql注意一定要对应设置 set time_zone='+8:00'; .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .includeSchemaChanges(true) // 表变更后得到通知 .debeziumProperties(debeziumProp) .build();

    // 启动一个webUI
    Configuration configuration = new Configuration();
    configuration.setInteger(RestOptions.PORT, 28081);
    configuration.setString(RestOptions.ADDRESS, "192.168.2.252");
    // 第一次读取需要注释此行,后续增加表时,开启此行,
    // flink-ck后 ‘fd5f75bfb342fc101f9abd6e84482a92/chk-12404’换成存储路径下对应文件夹即可,实现旧表增量读取,新表全量读取

// configuration.setString("execution.savepoint.path", "file:///tmp/flink-ck/fd5f75bfb342fc101f9abd6e84482a92/chk-12404");

    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment(configuration);

    // enable checkpoint
    //env.enableCheckpointing(5000);
    // 设置本地同步保存位置  todo 暂时也没明白为什么 会存在F 盘对应的tmp目录下,而不是项目启动环境下的;
    DataStream<String> inputDataStream = env.fromSource(mySqlSource,
                    WatermarkStrategy.noWatermarks(), "MySQL Source")
            .setParallelism(1);
    inputDataStream.sinkTo(new Elasticsearch6SinkBuilder<String>()
                    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
                    .setHosts(new HttpHost("192.168.2.252", 9200, "http"))
                    .setConnectionUsername("esuser")
                    .setConnectionPassword("sycs@2020")
                    .setEmitter(
                            (element, context, indexer) ->
                                    indexer.add(createIndexRequest(element)))
                    .build())
            .name("MysqlDemoSink");
    env.execute("test-master");


}
private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);
    return Requests.indexRequest()
            .index("product_view_index_stream")
            .type("_doc")
            .id(element)
            .source(json);
} 没有问题

展开
收起
真的很搞笑 2023-06-18 15:33:32 106 0
1 条回答
写回答
取消 提交回答
  • 这不是权限验证失败吗?看看是不是用户名和密码是不是有空格,此回答整理自钉群“Flink CDC 社区”

    2023-06-18 15:43:12
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Spring Boot2.0实战Redis分布式缓存 立即下载
    CUDA MATH API 立即下载
    API PLAYBOOK 立即下载