使用flink-sql写入hbase sink时报错: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
我共创建了5张表,1张kafka source表,3张hbase 维表,1张hbase sink表。kafka source表与hbase 维表left join后的结果insert到hbase sink表中, sql如下:

-- Kafka source table holding the raw click events.
-- NOTE(review): proc_time is declared as a plain TIMESTAMP(3) column, not as a
-- processing-time attribute (e.g. `proc_time AS PROCTIME()`); confirm which was intended.
CREATE TABLE user_click_source (
    id                BIGINT,
    name              VARCHAR,
    kafka_partition   INT,
    event_time        BIGINT,
    write_time        BIGINT,
    snapshot_time     BIGINT,
    max_snapshot_time BIGINT,
    catalog_id        INT,
    device_id         INT,
    user_id           INT,
    proc_time         TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector.type' = 'kafka',
    …… -- remaining connector options were elided in the original post
);

-- HBase dimension table for users: rowkey plus the column family `cf` declared as a ROW.
CREATE TABLE dim_user (
    rowkey VARCHAR,
    cf ROW<
        id                INT,
        name              VARCHAR,
        kafka_partition   INT,
        event_time        BIGINT,
        write_time        BIGINT,
        snapshot_time     BIGINT,
        max_snapshot_time BIGINT
    >,
    -- (图) An image from the original post was omitted at this point.
    -- NOTE(review): the HBase connector maps non-rowkey fields to column families (ROW types),
    -- so a top-level BIGINT column may be rejected; confirm how `ts` is stored.
    ts BIGINT
) WITH (
    'connector.type' = 'hbase',
    …… -- remaining connector options were elided in the original post
);
-- HBase dimension table for devices: rowkey plus the column family `cf` declared as a ROW.
-- max_snapshot_time had no type in the original post; BIGINT matches the other dim tables.
CREATE TABLE dim_device (
    rowkey VARCHAR,
    cf ROW<
        id                INT,
        name              VARCHAR,
        kafka_partition   INT,
        event_time        BIGINT,
        write_time        BIGINT,
        snapshot_time     BIGINT,
        max_snapshot_time BIGINT
    >
) WITH (
    'connector.type' = 'hbase',
    …… -- remaining connector options were elided in the original post
);
-- HBase dimension table for catalogs: rowkey plus the column family `cf` declared as a ROW.
CREATE TABLE dim_catalog (
    rowkey VARCHAR,
    cf ROW<
        id                INT,
        name              VARCHAR,
        kafka_partition   INT,
        event_time        BIGINT,
        write_time        BIGINT,
        snapshot_time     BIGINT,
        max_snapshot_time BIGINT
    >
) WITH (
    'connector.type' = 'hbase',
    …… -- remaining connector options were elided in the original post
);

-- HBase sink table: rowkey is the click id; column family `cf` holds the click fields
-- together with the fields looked up from the catalog, device and user dimension tables.
CREATE TABLE hbase_full_user_click_case1_sink (
    rowkey BIGINT,
    cf ROW<
        click_id                  BIGINT,
        click_name                VARCHAR,
        click_partition           INT,
        click_event_time          BIGINT,
        click_write_time          BIGINT,
        click_snapshot_time       BIGINT,
        click_max_snapshot_time   BIGINT,
        catalog_id                INT,
        catalog_name              VARCHAR,
        catalog_partition         INT,
        catalog_event_time        BIGINT,
        catalog_write_time        BIGINT,
        catalog_snapshot_time     BIGINT,
        catalog_max_snapshot_time BIGINT,
        device_id                 INT,
        device_name               VARCHAR,
        device_partition          INT,
        device_event_time         BIGINT,
        device_write_time         BIGINT,
        device_snapshot_time      BIGINT,
        device_max_snapshot_time  BIGINT,
        user_id                   INT,
        user_name                 VARCHAR,
        user_partition            INT,
        user_event_time           BIGINT,
        user_write_time           BIGINT,
        user_snapshot_time        BIGINT,
        user_max_snapshot_time    BIGINT
    >,
    -- NOTE(review): with the legacy 'connector.type' = 'hbase' sink this declared key is
    -- likely not used; the planner still has to derive a unique key from the query itself.
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector.type' = 'hbase',
    …… -- remaining connector options were elided in the original post
);

-- Enrich each click with its catalog, device and user dimension rows and write the result,
-- keyed by click id, into the HBase sink.
INSERT INTO hbase_full_user_click_case1_sink
SELECT
    click_id,
    ROW(
        click_id,
        click_name,
        click_partition,
        click_event_time,
        click_write_time,
        click_snapshot_time,
        click_max_snapshot_time,
        catalog_id,
        catalog_name,
        catalog_partition,
        catalog_event_time,
        catalog_write_time,
        catalog_snapshot_time,
        catalog_max_snapshot_time,
        device_id,
        device_name,
        device_partition,
        device_event_time,
        device_write_time,
        device_snapshot_time,
        device_max_snapshot_time,
        user_id,
        user_name,
        user_partition,
        user_event_time,
        user_write_time,
        user_snapshot_time,
        user_max_snapshot_time
    )
FROM (
    SELECT
        click.id                  AS click_id,
        click.name                AS click_name,
        click.kafka_partition     AS click_partition,
        click.event_time          AS click_event_time,
        click.write_time          AS click_write_time,
        click.snapshot_time       AS click_snapshot_time,
        click.max_snapshot_time   AS click_max_snapshot_time,
        cat.cf.id                 AS catalog_id,
        cat.cf.name               AS catalog_name,
        cat.cf.kafka_partition    AS catalog_partition,
        cat.cf.event_time         AS catalog_event_time,
        cat.cf.write_time         AS catalog_write_time,
        cat.cf.snapshot_time      AS catalog_snapshot_time,
        cat.cf.max_snapshot_time  AS catalog_max_snapshot_time,
        dev.cf.id                 AS device_id,
        dev.cf.name               AS device_name,
        dev.cf.kafka_partition    AS device_partition,
        dev.cf.event_time         AS device_event_time,
        dev.cf.write_time         AS device_write_time,
        dev.cf.snapshot_time      AS device_snapshot_time,
        dev.cf.max_snapshot_time  AS device_max_snapshot_time,
        u.cf.id                   AS user_id,
        u.cf.name                 AS user_name,
        u.cf.kafka_partition      AS user_partition,
        u.cf.event_time           AS user_event_time,
        u.cf.write_time           AS user_write_time,
        u.cf.snapshot_time        AS user_snapshot_time,
        u.cf.max_snapshot_time    AS user_max_snapshot_time
    FROM (
        -- Project the click columns and derive VARCHAR keys matching the dim tables' rowkey type.
        -- event_time is projected exactly once so that click.event_time is unambiguous below.
        -- NOTE(review): TUMBLE expects a time attribute (rowtime or proctime), but event_time is
        -- a plain BIGINT, and this GROUP BY has no aggregate functions; confirm intent.
        SELECT
            id,
            name,
            kafka_partition,
            event_time,
            write_time,
            snapshot_time,
            max_snapshot_time,
            CAST(catalog_id AS VARCHAR) AS catalog_key,
            CAST(device_id AS VARCHAR)  AS device_key,
            CAST(user_id AS VARCHAR)    AS user_key,
            catalog_id,
            device_id,
            user_id,
            proc_time
        FROM user_click_source
        GROUP BY
            TUMBLE(event_time, INTERVAL '1' SECOND),
            id,
            name,
            kafka_partition,
            event_time,
            write_time,
            snapshot_time,
            max_snapshot_time,
            catalog_id,
            device_id,
            user_id,
            proc_time
    ) AS click
    -- NOTE(review): these are regular LEFT JOINs, not lookup joins
    -- (`... FOR SYSTEM_TIME AS OF click.proc_time`). A regular outer join on a stream produces
    -- an updating result, which is likely what makes the upsert sink demand a derivable key.
    LEFT JOIN dim_catalog AS cat
        ON click.catalog_key = cat.rowkey
    LEFT JOIN dim_device AS dev
        ON click.device_key = dev.rowkey
    LEFT JOIN dim_user AS u
        ON click.user_key = u.rowkey
        AND click.event_time = u.ts
) AS t;
*来自志愿者整理的flink邮件归档
PK 的问题在 Flink 1.11 中已经解决了：你可以使用 1.11 提供的新版 HBase connector（DDL 中写 `'connector' = 'hbase-1.4'`，而不是旧的 `'connector.type' = 'hbase'`）。它支持在 DDL 上声明 PK，所以即使 query 推导不出 PK 也不会再报错了。详见: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html *来自志愿者整理的flink邮件归档