基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable 维表。 Flink 版本 1.12.2
场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出 场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key 场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED 则报错,主要报错信息: java.sql.BatchUpdateException: [30000, 2021060816420117201616500303151172567] cannot update pk column UID to expr
注:此处使用的MySQL 是阿里的 ADB,建表SQL如下 Create Table v2_dwd_root_game_uid_reg_log
( uid
bigint NOT NULL DEFAULT '0' COMMENT '注册uid', user_name
varchar NOT NULL DEFAULT '', // 此处省略其他字段 primary key (uid
,platform
,root_game_id
) ) DISTRIBUTE BY HASH(uid
) INDEX_ALL='Y' STORAGE_POLICY='HOT' COMMENT='按根游戏账号注册日志';
下面是场景3的SQL语句: // Kafka Source CREATE TABLE KafkaTable ( message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'xxxxxxxxxxx', 'properties.bootstrap.servers' = 'xxxxxxxxxxx', 'properties.group.id' = 'xxxxxxxxxxxxx', 'scan.startup.mode' = 'group-offsets', 'format' = 'json' );
// 维表 CREATE TABLE DimTable ( game_id BIGINT, root_game_id BIGINT, main_game_id BIGINT, platform VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'xxxxxxxxxxxx', 'table-name' = 'v2_dim_game_id', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxxxxxxxx', 'password' = 'xxxxxxxx', 'lookup.cache.max-rows'='5000', 'lookup.cache.ttl' = '60s', 'lookup.max-retries'='3' );
// MySQL输出 CREATE TABLE sinktable ( uid BIGINT, root_game_id BIGINT, game_id BIGINT, platform VARCHAR, //....省略其它字段 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'xxxxxxxxxxxxxx', 'table-name' = 'v2_dwd_root_game_uid_reg_log', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxxxxxxxxxx', 'password' = 'xxxxxxxxxxxx', 'sink.buffer-flush.interval'='5s', 'sink.buffer-flush.max-rows' = '10' );
// 插入(关联维表) INSERT INTO sinktable select IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) as uid, IF((k.game_id IS NULL), 0 , k.game_id) as game_id, d.platform as platform, d.root_game_id as root_game_id, // 省略其它字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) as k LEFT JOIN DimTable as d ON k.game_id = d.game_id and k.platform = d.platform;*来自志愿者整理的flink邮件归档
我把表移动到了普通 MYSQL,可以正常运行。 经过排查应该是 ADB 建表 SQL 中的 DISTRIBUTE BY HASH(uid
) 所导致,该语法用于处理数据倾斜问题。 看起来似乎是联表 join 的时候要求定义主键,但是定义主键后会转换为 upsert 流,而 ADB 中定义了 DISTRIBUTE BY 与 upsert 冲突了,不知道是否这么理解*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。