开发者社区> 问答> 正文

flink写mysql问题

各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢

2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-02 17:49:01 667 0
1 条回答
写回答
取消 提交回答
  • 在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢

    JdbcDynamicTableSink 不包含具体 sink function

    的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的

    jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。

    写 mysql 的 qps 只能到几百,反压严重

    jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval

    就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。

    算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成

    checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint

    也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。

    [1]

    https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:59:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像