flink订阅kafka消息,同时sink到hbase和hive中, 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
query: streamTableEnv.executeSql( """ | |CREATE TABLE hbase_table ( | rowkey VARCHAR, | cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |) WITH ( | 'connector.type' = 'hbase', | 'connector.version' = '2.1.0', | 'connector.table-name' = 'ods:user_hbase6', | 'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.zookeeper.znode.parent' = '/hbase', | 'connector.write.buffer-flush.max-size' = '1mb', | 'connector.write.buffer-flush.max-rows' = '1', | 'connector.write.buffer-flush.interval' = '0s' |) |""".stripMargin)
val statementSet = streamTableEnv.createStatementSet() val insertHbase = """ |insert into hbase_table |SELECT | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid, | ROW(sex, age, created_time ) as cf |FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table) | |""".stripMargin
statementSet.addInsertSql(insertHbase)
val insertHive = """ | |INSERT INTO odsCatalog.ods.hive_table |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH') |FROM kafka_table | |""".stripMargin statementSet.addInsertSql(insertHive)
statementSet.execute()
是因为参数'connector.write.buffer-flush.max-size' = '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下: Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1
并且,按照官网文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
设置参数也不识别,报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
看了一下源码, org.apache.flink.table.descriptors.HBaseValidator public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase"; public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0"; public static final String CONNECTOR_TABLE_NAME = "connector.table-name"; public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum"; public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval"; 参数还是老参数*来自志愿者整理的flink邮件归档
(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 BufferredMutator 做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval 设置为 0s 时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval 设置成 1s 应该就能看到数据了。
(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1][1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。