开发者社区> 问答> 正文

Flink1.12.1 读取kafka的数据写入到clickhouse如何支持upsert操作呢

您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:

clickhouse建表语句如下:

CREATE TABLE test_local.tzling_tb3(

uuid String,

product String,

platform String,

batchId String,

id String,

account String,

customerId String,

reportName String,

dt String,

campaign String,

adGroup String,

generalField String,

currency String,

impressions String,

cost String,

clicks String,

conversions String,

createDateTime String,

createTime BIGINT,

key String,

pdate String

)engine = MergeTree PARTITION BY pdate order by createTime;

将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。

processData.addSink(new MSKUpsertClickHouseSink());

附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为: 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。

请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?

从 Windows 版邮件发送*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-02 17:37:45 1012 0
1 条回答
写回答
取消 提交回答
  • 你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData

    是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:47:30
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
阿里云 ClickHouse 企业版技术白皮书 立即下载
ClickHouse在手淘流量分析应用实践Jason Xu 立即下载
云数据库clickhouse最佳实践 立即下载