The code is as follows:
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeFilter
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps
// Use org.apache.flink.table.api.Types, not the Java Types
//import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row
object RowTest1 extends App {

  class MySource extends SourceFunction[Row] {
    val data = Array[String](
      """{"rider_id":10,"rider_name":"hb","city":"hangzhou","rowtime":1555984311000}""",
      """{"rider_id":10,"rider_name":"hb","city":"hangzhou","rowtime":1555984315000}""",
      """{"rider_id":10,"rider_name":"hb","city":"hangzhou","rowtime":1555984343000}"""
    )

    override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
      for (i <- data) {
        val r1 = JSON.parseObject(i)
        val rider_id = r1.getObject("rider_id", classOf[Int])
        val rider_name = r1.getObject("rider_name", classOf[String])
        val rowTime = r1.getObject("rowtime", classOf[java.sql.Timestamp])
        //println(rider_id, rider_name, rowTime)
        val row = Row.of(rider_id.asInstanceOf[Object], rider_name.asInstanceOf[Object], rowTime.asInstanceOf[Object])
        ctx.collect(row)
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = {}
  }

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val fieldNames = Array[String]("rider_id", "rider_name", "mytime.ROWTIME")
  val types = Array[TypeInformation[_]](Types.INT, Types.STRING, Types.SQL_TIMESTAMP)

  val rowSource = env.addSource(new MySource)(Types.ROW(types: _*))
    //rowSource.print()
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Row](Time.seconds(10)) {
      override def extractTimestamp(element: Row): Long = element.getField(2).asInstanceOf[java.sql.Timestamp].getTime
    })

  val table1 = rowSource.toTable(tEnv).as('rider_id, 'rider_name, 'mytime)
  table1.printSchema()
  tEnv.registerTable("t1", table1)

  tEnv.sqlQuery(
    """
      | select
      |   rider_id,
      |   count(*) as cnt
      | from t1
      | group by rider_id, TUMBLE(mytime, INTERVAL '10' SECOND)
      |
    """.stripMargin).toAppendStream[Row].print()

  env.execute()
}
Running it fails with:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
Question:
When converting a DataStream[Row] to a Table, how do I declare the event-time field?
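For reference, here is a minimal sketch of one way to declare the event-time attribute during the conversion (assuming the same rowSource above, which already has timestamps and watermarks assigned): list the fields explicitly in toTable and mark the time field with the .rowtime designator, instead of renaming the columns afterwards with as(...).

// Sketch: declare the third column as the event-time attribute while converting.
// With position-based field mapping, 'mytime.rowtime replaces the original timestamp column.
val table1 = rowSource.toTable(tEnv, 'rider_id, 'rider_name, 'mytime.rowtime)
table1.printSchema()  // mytime should now appear as a rowtime (time indicator) attribute
tEnv.registerTable("t1", table1)

With the attribute declared this way, TUMBLE(mytime, INTERVAL '10' SECOND) in the query above should be accepted.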
Required pom dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
Consume data from Kafka, convert it to a Table, and then run SQL queries on it.
When developing in Scala, you need the following imports; do not leave any out, or you will run into problems.
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
The complete code is below:
package com.ddxygq.bigdata.flink.sql
import java.util.Properties
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
/**
 * Table API & SQL demos: a batch join (demo2) and a streaming word count
 * that consumes from Kafka, registers the stream as a table, and queries it (demo).
 */
object TableDemo {
  case class WC(word: String, frequency: Long)

  def main(args: Array[String]): Unit = {
    demo
  }

  def demo2(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input: DataSet[WC] = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
    val input2: DataSet[WC] = env.fromElements(WC("hello", 1), WC("hello", 1))
    val table = input.toTable(tEnv, 'word, 'frequency)
    val table2 = input2.toTable(tEnv, 'word2, 'frequency2)

    // Note: use === (expression equality) rather than == in the Table API DSL
    val result = table.join(table2).where('word === 'word2).select('word, 'frequency)

    result.toDataSet[(String, Long)].print()
  }

  def demo: Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

    // Kafka connection settings
    val ZOOKEEPER_HOST = "qcloud-test-hadoop01:2181"
    val KAFKA_BROKERS = "qcloud-test-hadoop01:9092,qcloud-test-hadoop02:9092,qcloud-test-hadoop03:9092"
    val TRANSACTION_GROUP = "transaction"
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    val input = sEnv.addSource(
        new FlinkKafkaConsumer08[String]("flink-test", new SimpleStringSchema(), kafkaProps)
      )
      .flatMap(x => x.split(" "))
      .map(x => (x, 1L))

    sTableEnv.registerDataStream("Words", input, 'word, 'frequency)

    val result = sTableEnv
      .scan("Words")
      .groupBy("word")
      .select('word, 'frequency.sum as 'cnt)

    sTableEnv.toRetractStream[(String, Long)](result).print()
    sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)].print()

    sEnv.execute("TableDemo")
  }
}
Two things are worth noting here:
1. The example uses both the Table operators and standard SQL query syntax, in order to demonstrate the basic usage of Table.
val result = sTableEnv
  .scan("Words")
  .groupBy("word")
  .select('word, 'frequency.sum as 'cnt)
This group-by aggregation could equally be written as:
val result = sTableEnv.sqlQuery("select word,sum(frequency) as cnt from Words group by word")
// Print to the console
sTableEnv.toRetractStream[(String, Long)](result).print()
So how does this differ from the result of the query below?
sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)].print()
The difference is clear. Here we are consuming real-time data from Kafka, so Words is a dynamic (streaming) table to which data is continuously appended. The first query is a GROUP BY aggregation, so its result has to be continuously updated: if the current result is (hello,4) and another "hello" arrives, it must be updated to (hello,5), and a new word requires an insert. The second query, select * from Words, simply appends its results.
So the two statements differ only in how the results are emitted to the console: the former uses toRetractStream, while the latter uses toAppendStream.
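For intuition, here is a minimal sketch of the shape of the two output streams, assuming the sTableEnv and result defined in the demo above (the printed values are only illustrative):

// toRetractStream yields a DataStream[(Boolean, (String, Long))]: the Boolean flag
// marks whether the record is an insert (true) or a retraction of an earlier result (false).
// After the words "hello hello" arrive, the console output looks roughly like:
//   (true,(hello,1))
//   (false,(hello,1))
//   (true,(hello,2))
val retracting = sTableEnv.toRetractStream[(String, Long)](result)
retracting.print()

// toAppendStream yields a plain DataStream[(String, Long)]; rows are only ever appended:
//   (hello,1)
//   (hello,1)
val appending = sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)]
appending.print()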