
HBase sink error: UpsertStreamTableSink requires that Table has a full primary keys

When writing to an HBase sink with Flink SQL, the job fails with: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

I created five tables in total: one Kafka source table, three HBase dimension tables, and one HBase sink table. The result of left-joining the Kafka source table with the HBase dimension tables is inserted into the HBase sink table. The SQL is as follows:

create table user_click_source(
    id bigint,
    name varchar,
    kafka_partition int,
    event_time bigint,
    write_time bigint,
    snapshot_time bigint,
    max_snapshot_time bigint,
    catalog_id int,
    device_id int,
    user_id int,
    proc_time timestamp(3),
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector.type' = 'kafka',
    ……
);

create table dim_user(
    rowkey varchar,
    cf ROW<
        id int,
        name varchar,
        kafka_partition int,
        event_time bigint,
        write_time bigint,
        snapshot_time bigint,
        max_snapshot_time bigint
    >,
    ts bigint
) with (
    'connector.type' = 'hbase',
    ……
);

create table dim_device(
    rowkey varchar,
    cf ROW<
        id int,
        name varchar,
        kafka_partition int,
        event_time bigint,
        write_time bigint,
        snapshot_time bigint,
        max_snapshot_time bigint
    >
) with (
    'connector.type' = 'hbase',
    ……
);

create table dim_catalog(
    rowkey varchar,
    cf ROW<
        id int,
        name varchar,
        kafka_partition int,
        event_time bigint,
        write_time bigint,
        snapshot_time bigint,
        max_snapshot_time bigint
    >
) with (
    'connector.type' = 'hbase',
    ……
);

create table hbase_full_user_click_case1_sink(
    rowkey bigint,
    cf ROW<
        click_id bigint,
        click_name varchar,
        click_partition int,
        click_event_time bigint,
        click_write_time bigint,
        click_snapshot_time bigint,
        click_max_snapshot_time bigint,
        catalog_id int,
        catalog_name varchar,
        catalog_partition int,
        catalog_event_time bigint,
        catalog_write_time bigint,
        catalog_snapshot_time bigint,
        catalog_max_snapshot_time bigint,
        device_id int,
        device_name varchar,
        device_partition int,
        device_event_time bigint,
        device_write_time bigint,
        device_snapshot_time bigint,
        device_max_snapshot_time bigint,
        user_id int,
        user_name varchar,
        user_partition int,
        user_event_time bigint,
        user_write_time bigint,
        user_snapshot_time bigint,
        user_max_snapshot_time bigint
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) with (
    'connector.type' = 'hbase',
    ……
);

insert into hbase_full_user_click_case1_sink
select
    click_id,
    ROW(
        click_id, click_name, click_partition, click_event_time,
        click_write_time, click_snapshot_time, click_max_snapshot_time,
        catalog_id, catalog_name, catalog_partition, catalog_event_time,
        catalog_write_time, catalog_snapshot_time, catalog_max_snapshot_time,
        device_id, device_name, device_partition, device_event_time,
        device_write_time, device_snapshot_time, device_max_snapshot_time,
        user_id, user_name, user_partition, user_event_time,
        user_write_time, user_snapshot_time, user_max_snapshot_time
    )
from (
    select
        click.id as click_id,
        click.name as click_name,
        click.kafka_partition as click_partition,
        click.event_time as click_event_time,
        click.write_time as click_write_time,
        click.snapshot_time as click_snapshot_time,
        click.max_snapshot_time as click_max_snapshot_time,
        cat.cf.id as catalog_id,
        cat.cf.name as catalog_name,
        cat.cf.kafka_partition as catalog_partition,
        cat.cf.event_time as catalog_event_time,
        cat.cf.write_time as catalog_write_time,
        cat.cf.snapshot_time as catalog_snapshot_time,
        cat.cf.max_snapshot_time as catalog_max_snapshot_time,
        dev.cf.id as device_id,
        dev.cf.name as device_name,
        dev.cf.kafka_partition as device_partition,
        dev.cf.event_time as device_event_time,
        dev.cf.write_time as device_write_time,
        dev.cf.snapshot_time as device_snapshot_time,
        dev.cf.max_snapshot_time as device_max_snapshot_time,
        u.cf.id as user_id,
        u.cf.name as user_name,
        u.cf.kafka_partition as user_partition,
        u.cf.event_time as user_event_time,
        u.cf.write_time as user_write_time,
        u.cf.snapshot_time as user_snapshot_time,
        u.cf.max_snapshot_time as user_max_snapshot_time

    from (
        select
            id,
            name,
            kafka_partition,
            event_time,
            write_time,
            snapshot_time,
            max_snapshot_time,
            cast(catalog_id as varchar) as catalog_key,
            cast(device_id as varchar) as device_key,
            cast(user_id as varchar) as user_key,
            catalog_id,
            device_id,
            user_id,
            proc_time
        from user_click_source
        group by
            TUMBLE(event_time, INTERVAL '1' SECOND),
            id, name, kafka_partition, event_time, write_time,
            snapshot_time, max_snapshot_time,
            catalog_id, device_id, user_id, proc_time
    ) click

    left join dim_catalog cat on click.catalog_key = cat.rowkey
    left join dim_device dev on click.device_key = dev.rowkey
    left join dim_user u on click.user_key = u.rowkey
        and click.event_time = u.ts
) t

*From the volunteer-compiled Flink mailing-list archive

游客sadna6pkvqnz6 2021-12-07 17:37:07
1 answer
  • The PK problem was solved in 1.11. You can try the new HBase connector shipped with 1.11: it lets you declare the PK in the DDL, so the job no longer fails even when the query cannot infer a PK. See more: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html *From the volunteer-compiled Flink mailing-list archive

    2021-12-07 21:38:35
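
For reference, here is a minimal sketch of the 1.11-style DDL the answer points to, written against the new connector options described in the linked documentation; the table name and ZooKeeper quorum are placeholder values, not taken from the original post:

create table hbase_full_user_click_case1_sink(
    rowkey bigint,
    cf ROW<click_id bigint, click_name varchar>,
    -- with the new connector the primary key is declared directly in the DDL,
    -- so the planner no longer has to derive a unique key from the query
    PRIMARY KEY (rowkey) NOT ENFORCED
) with (
    'connector' = 'hbase-1.4',                 -- new-style option key, replaces 'connector.type'
    'table-name' = 'full_user_click_case1',    -- placeholder HBase table name
    'zookeeper.quorum' = 'localhost:2181'      -- placeholder ZooKeeper quorum
);

With the PK declared on the sink, the windowed left-join query in the question should no longer trigger the UpsertStreamTableSink error.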