When joining a Kafka stream against a Hive dimension table, Flink loads the Hive table once at job startup, but rows inserted into the Hive table afterwards are never picked up by the join.
The official docs and the Chinese community both point to the lookup.join.cache.ttl option. I tried setting it on the TableEnv configuration, and also as a table hint after the table name, but neither had any effect.
Has anyone gotten this working, or seen a working example? Many thanks.
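For reference, the two attempts that did not work for me looked roughly like this (a sketch; streamTableEnv is the Blink-planner StreamTableEnvironment from the full code further down):

// Attempt 1: set the TTL on the TableEnvironment configuration -- no effect
streamTableEnv.getConfig.getConfiguration.setString("lookup.join.cache.ttl", "10s")

// Attempt 2: pass the TTL as a dynamic table options hint after the table name -- no effect
// (requires table.dynamic-table-options.enabled = true for hints to be parsed at all)
streamTableEnv.sqlQuery(
  "select * from TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '10s') */")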
Well, right after asking I figured it out. The Hive table has to be created through Flink, with the lookup.join.cache.ttl property specified in the CREATE TABLE statement. Full test code below.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
package Test.Flink

import org.apache.flink.connectors.hive.{HiveTableFactory, HiveTableSource}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.factories.TableSourceFactory
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.config.TableConfigOptions

/**
  * Created by dzm on 2020/7/21.
  */
object TestHiveJoin {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // Create the stream/table environment with the Blink planner
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val streamTableEnv = StreamTableEnvironment.create(env, settings)
    // Setting the TTL globally on the config had no effect (see question above)
    streamTableEnv.getConfig.getConfiguration.setString("lookup.join.cache.ttl", "10s")
    // RetryingMetaStoreClient
    val catalog = new HiveCatalog("myHiveCatalog", "default", "D:\\ideaProject\\hualu\\TestFlinkSQL\\src\\main\\resources\\", "1.2.1")
    // catalog.getTable(new ObjectPath("",""))
    streamTableEnv.registerCatalog("myHiveCatalog", catalog)
    streamTableEnv.useCatalog("myHiveCatalog")
    // Dynamic table options must be enabled for /*+ OPTIONS(...) */ hints to work
    streamTableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
    // TableSourceFactory
    // streamTableEnv.getConfig.getConfiguration.setString("table.sql-dialect","default")
    // streamTableEnv.getConfig.getConfiguration.setString("table.dynamic-table-options.enabled","true")
    // streamTableEnv.executeSql("drop table TestJoin9")
    streamTableEnv.executeSql("drop table if exists user_log")
    streamTableEnv.executeSql("drop table if exists TestJoin5")
    streamTableEnv.executeSql("drop table if exists flink_sql_sink1")
    streamTableEnv.executeSql(FlinkSQLUtils.kafkaSqlLocal)
    // Switch to the Hive dialect to create the dimension table in Hive
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
    // HiveTableSource
    // Key fix: create the table through Flink with the TTL declared in tblproperties
    streamTableEnv.executeSql("" +
      "create table TestJoin5(" +
      " dwid String," +
      " dwmc String," +
      " name2 String" +
      ") stored as parquet tblproperties (" +
      " 'lookup.join.cache.ttl' = '10s'" +
      ")")
    // val fac = new HiveTableFactory()
    // Streaming read of the Hive table; new data is picked up every 15 s
    val aa = streamTableEnv.sqlQuery("select * from TestJoin5 /*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.monitor-interval' = '15 s') */")
    streamTableEnv.toAppendStream[Row](aa).print().setParallelism(1)
    // Switch back to the default dialect for the join query
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "default")
    // HBaseConfiguration
    // streamTableEnv.executeSql(FlinkSQLUtils.mysqlSinkSql1)
    // streamTableEnv.executeSql("insert into TestJoin1 select * from TestJoin")
    // val tableTestJoin = streamTableEnv.sqlQuery("select * from TestJoin")
    // val ssss = streamTableEnv.toAppendStream[(String,String,String)](tableTestJoin)
    // ssss.print().setParallelism(1)
    // val table = streamTableEnv.sqlQuery("select * from TestJoin")
    // val stream1 = streamTableEnv.toAppendStream[(String,String,String)](table)
    // val procTable = stream1.toTable(streamTableEnv, '_1 as "dwid", '_2 as "dwmc", '_3 as "name2", 'procTime.proctime())
    // streamTableEnv.createTemporaryView("orcTableWithProcessTime", procTable)
    // val temFunc = procTable.createTemporalTableFunction('procTime, 'dwid)
    // streamTableEnv.registerFunction("Rates", temFunc)
    // Hint variant: /*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.monitor-interval' = '15 s','lookup.join.cache.ttl'='15 s') */
    // https://baijiahao.baidu.com/s?id=1678429138706485764&wfr=spider&for=pc
    try {
      // Lookup join: FOR SYSTEM_TIME AS OF the stream's proctime attribute,
      // with the cache TTL overridden per query via a dynamic table options hint
      val ssss = streamTableEnv.sqlQuery(
        // "insert into flink_sql_sink1 " +
        "select a.name,CAST(a.id as INT),CAST(a.age as INT),a.behavior,b.dwmc,CAST('1' as INT),b.name2,CAST(a.userProctime as BIGINT) " +
          "from user_log a LEFT JOIN TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '15 s') */ " +
          "FOR SYSTEM_TIME AS OF a.userProctime as b " +
          "on a.age = b.dwid where b.dwmc is not null")
      // alternative hint I also tried: /*+ OPTIONS('lookup.join.cache.ttl' = '10s') */
      streamTableEnv.toAppendStream[Row](ssss).print().setParallelism(1)
    } catch {
      case e: Exception => e.printStackTrace()
    }
    // stream1.print().setParallelism(1)
    env.execute("cccc")
  }
}
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
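Boiled down, the whole fix is just these two steps (a minimal sketch distilled from the code above; table and column names are from my test, and exact option support depends on your Flink and Hive versions -- I was on Hive 1.2.1):

// 1) Hive dialect: create the dimension table through Flink,
//    with the cache TTL declared in tblproperties
streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
streamTableEnv.executeSql(
  "create table TestJoin5(dwid String, dwmc String, name2 String) " +
  "stored as parquet tblproperties ('lookup.join.cache.ttl' = '10s')")

// 2) Default dialect: temporal (lookup) join on the stream's proctime attribute;
//    the TTL can also be overridden per query via a hint
streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "default")
val joined = streamTableEnv.sqlQuery(
  "select a.name, b.dwmc, b.name2 " +
  "from user_log a " +
  "left join TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '15 s') */ " +
  "for system_time as of a.userProctime as b " +
  "on a.age = b.dwid")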