开发者社区> 问答> 正文

FlinkSQL cannot update pk column UID to expr

基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable 维表。 Flink 版本 1.12.2

场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出 场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key 场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED 则报错,主要报错信息: java.sql.BatchUpdateException: [30000, 2021060816420117201616500303151172567] cannot update pk column UID to expr

注:此处使用的MySQL 是阿里的 ADB,建表SQL如下 Create Table v2_dwd_root_game_uid_reg_log ( uid bigint NOT NULL DEFAULT '0' COMMENT '注册uid', user_name varchar NOT NULL DEFAULT '', // 此处省略其他字段 primary key (uid,platform,root_game_id) ) DISTRIBUTE BY HASH(uid) INDEX_ALL='Y' STORAGE_POLICY='HOT' COMMENT='按根游戏账号注册日志';

下面是场景3的SQL语句: // Kafka Source CREATE TABLE KafkaTable ( message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'xxxxxxxxxxx', 'properties.bootstrap.servers' = 'xxxxxxxxxxx', 'properties.group.id' = 'xxxxxxxxxxxxx', 'scan.startup.mode' = 'group-offsets', 'format' = 'json' );

// 维表 CREATE TABLE DimTable ( game_id BIGINT, root_game_id BIGINT, main_game_id BIGINT, platform VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'xxxxxxxxxxxx', 'table-name' = 'v2_dim_game_id', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxxxxxxxx', 'password' = 'xxxxxxxx', 'lookup.cache.max-rows'='5000', 'lookup.cache.ttl' = '60s', 'lookup.max-retries'='3' );

// MySQL输出 CREATE TABLE sinktable ( uid BIGINT, root_game_id BIGINT, game_id BIGINT, platform VARCHAR, //....省略其它字段 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'xxxxxxxxxxxxxx', 'table-name' = 'v2_dwd_root_game_uid_reg_log', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxxxxxxxxxx', 'password' = 'xxxxxxxxxxxx', 'sink.buffer-flush.interval'='5s', 'sink.buffer-flush.max-rows' = '10' );

// 插入(关联维表) INSERT INTO sinktable select IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) as uid, IF((k.game_id IS NULL), 0 , k.game_id) as game_id, d.platform as platform, d.root_game_id as root_game_id, // 省略其它字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) as k LEFT JOIN DimTable as d ON k.game_id = d.game_id and k.platform = d.platform;*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-02 14:48:39 1138 0
1 条回答
写回答
取消 提交回答
  • 我把表移动到了普通 MYSQL,可以正常运行。 经过排查应该是 ADB 建表 SQL 中的 DISTRIBUTE BY HASH(uid) 所导致,该语法用于处理数据倾斜问题。 看起来似乎是联表 join 的时候要求定义主键,但是定义主键后会转换为 upsert 流,而 ADB 中定义了 DISTRIBUTE BY 与 upsert 冲突了,不知道是否这么理解*来自志愿者整理的FLINK邮件归档

    2021-12-02 16:18:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
对 2000 多亿条数据做一次 group by 需要多久? 立即下载
对2000多亿条数据做一次Group By 需要多久 立即下载
Get rid of traditional ETL, Move to Spark! 立即下载