Flink SQL join with a Hive dimension table
OK, right after asking I got it working myself: the Hive table has to be created through Flink, and the lookup.join.cache.ttl property must be specified at table-creation time.
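For context, lookup.join.cache.ttl controls how long the dimension table's rows stay in the lookup-join cache before Flink reloads them from Hive (the default is 60 minutes). Pulled out of the full listing below, the table-creation step on its own looks like this (a minimal sketch using the same TestJoin5 table as the code that follows; the DDL has to run under the hive dialect):

    // Create the dimension table via Flink's Hive dialect, declaring the
    // lookup cache TTL in the table properties at creation time
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
    streamTableEnv.executeSql(
      """create table TestJoin5 (
        |  dwid  String,
        |  dwmc  String,
        |  name2 String
        |) stored as parquet tblproperties (
        |  'lookup.join.cache.ttl' = '10s'
        |)""".stripMargin)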
package Test.Flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.config.TableConfigOptions

/**
  * Created by dzm on 2020/7/21.
  */
object TestHiveJoin {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    // Create a streaming table environment backed by the Blink planner
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val streamTableEnv = StreamTableEnvironment.create(env, settings)
    streamTableEnv.getConfig.getConfiguration.setString("lookup.join.cache.ttl", "10s")

    // Register a Hive catalog; the conf dir must contain hive-site.xml
    val catalog = new HiveCatalog("myHiveCatalog", "default",
      "D:\\ideaProject\\hualu\\TestFlinkSQL\\src\\main\\resources\\", "1.2.1")
    streamTableEnv.registerCatalog("myHiveCatalog", catalog)
    streamTableEnv.useCatalog("myHiveCatalog")

    // /*+ OPTIONS(...) */ hints are rejected unless this switch is on
    streamTableEnv.getConfig.getConfiguration
      .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

    streamTableEnv.executeSql("drop table if exists user_log")
    streamTableEnv.executeSql("drop table if exists TestJoin5")
    streamTableEnv.executeSql("drop table if exists flink_sql_sink1")
    streamTableEnv.executeSql(FlinkSQLUtils.kafkaSqlLocal)

    // Create the Hive dimension table under the Hive dialect, declaring
    // the lookup cache TTL in its table properties
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
    streamTableEnv.executeSql(
      """create table TestJoin5 (
        |  dwid  String,
        |  dwmc  String,
        |  name2 String
        |) stored as parquet tblproperties (
        |  'lookup.join.cache.ttl' = '10s'
        |)""".stripMargin)

    // Continuously monitor the Hive table for new partitions/files
    val aa = streamTableEnv.sqlQuery(
      "select * from TestJoin5 " +
        "/*+ OPTIONS('streaming-source.enable' = 'true', 'streaming-source.monitor-interval' = '15 s') */")
    streamTableEnv.toAppendStream[Row](aa).print().setParallelism(1)

    // Switch back to the default dialect for the lookup join itself
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "default")

    // Alternative (not used here): build a temporal table function over a
    // proctime-enriched table and join against that instead:
    //   val procTable = stream1.toTable(streamTableEnv,
    //     '_1 as 'dwid, '_2 as 'dwmc, '_3 as 'name2, 'procTime.proctime)
    //   streamTableEnv.createTemporaryView("orcTableWithProcessTime", procTable)
    //   val temFunc = procTable.createTemporalTableFunction('procTime, 'dwid)
    //   streamTableEnv.registerFunction("Rates", temFunc)

    // Reference: https://baijiahao.baidu.com/s?id=1678429138706485764&wfr=spider&for=pc
    try {
      // Processing-time lookup join against the Hive dimension table;
      // the hint overrides lookup.join.cache.ttl for this query only
      val ssss = streamTableEnv.sqlQuery(
        "select a.name, CAST(a.id as INT), CAST(a.age as INT), a.behavior, b.dwmc, " +
          "CAST('1' as INT), b.name2, CAST(a.userProctime as BIGINT) " +
          "from user_log a " +
          "LEFT JOIN TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '15 s') */ " +
          "FOR SYSTEM_TIME AS OF a.userProctime as b " +
          "on a.age = b.dwid where b.dwmc is not null")
      streamTableEnv.toAppendStream[Row](ssss).print().setParallelism(1)
    } catch {
      case e: Exception => e.printStackTrace()
    }

    env.execute("cccc")
  }
}
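Two details are easy to miss here: the /*+ OPTIONS(...) */ hints are rejected unless table.dynamic-table-options.enabled is switched on (hence the TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED line above), and an option passed through such a hint takes precedence over the value declared in the table's tblproperties, but only for that one query.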