开发者社区> 问答> 正文

FlinkSql 插入hbase group by 多个字段的时候发现报错

报错如下: **Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated. ** 代码如下:

tableEnv.connect(new kafka()
. property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"30.23.13.231:9092")
      .property("zookeeper.connect","....:2181"))
      .withFormat(new Json())
      .withSchema(new Schema().field("componentType",Types.STRING)
                              .field("endDate",Types.STRING)
                              .field("envName",Types.STRING)
                              .field("resultId",Types.STRING)
                              .field("spendTime",Types.INT)
                              .field("returnDataNum",Types.INT)
                              .field("startDate",Types.STRING)
                              .field("tableName",Types.STRING)
                              .field("tenantName",Types.STRING))
      .inAppendMode()
      .createTemporaryTable("MyTable")

    val hbaseDDL :String =
      """
        |Create table flink_log1 (
        |rowkey string,
        |cf ROW(f1 BIGINT,f2 BIGINT,f3 INT,f4 INT,f5 BIGINT,tenantName VARCHAR)
        |) with(
        |   'connector.type' = 'hbase',
        |   'connector.version' = '1.4.3',
        |   'connector.table-name' = 'flink_log1',
        |   'connector.zookeeper.quorum' = '....:2181,....:2181',
        |   'connector.zookeeper.znode.parent' = '/hbase',
        |   'connector.write.buffer-flush.max-size' = '10mb',
        |   'connector.write.buffer-flush.max-rows' = '1000'
        |)
      """.stripMargin
    tableEnv.sqlUpdate(hbaseDDL)
    val sql =
        "select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
        "count(tenantName) as f1 ," +
        "count(case when resultId =  '2' then resultId else '' end) as f2 ,"+
        "avg(spendTime) as f3 ,"+
        "sum(returnDataNum) as f4 ,"+
        "count(case when resultId =  '1' then tenantName else '' end) as f5 ,"+
        "tenantName "+
        "from MyTable where substring(endDate,1,10)='2020-06-28' " +
        "group by CONCAT_WS('_',tenantName,tenantName),tenantName"
    val table: Table = tableEnv.sqlQuery(sql)
    tableEnv.createTemporaryView("tmp",table)
    tableEnv.sqlUpdate("insert into flink_log1 " +
      "select tm_tenantName,ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp ")
    streamEnv.execute("my insert hbase sql")

展开
收起
游客bl5sp3cdp4j7q 2020-06-29 14:51:17 1630 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
大数据时代的存储 ——HBase的实践与探索 立即下载
Hbase在滴滴出行的应用场景和最佳实践 立即下载
阿里云HBase主备双活 立即下载