开发者社区> 问答> 正文

flink1.7.1 写es6外部表没数据问题

用的是flink1.7.1,测试代码为

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Rowtime, Schema}
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

case class Person(id: Int, name: String, value: String, timestamp: Long)
object EsTableSinkExample extends App {

  val esSink: Elasticsearch = new Elasticsearch()
    .version("6")
    .host("192.168.1.160", 9200, "http")
    .index("test-myuser")
    .documentType("_doc")  // es6 type一定要设置成"_doc"
  
  val json = new Json().jsonSchema(
    """
      |{
      | "type": "object",
      | "properties": {
      |   "id": {
      |     "type": "number"
      |   },
      |   "name": {
      |     "type": "string"
      |   },
      |   "value": {
      |     "type": "string"
      |   },
      |   "timestamp": {
      |   "type": "number"
      |   }
      |  }
      |}
    """.stripMargin
  )

  val schema: Schema = new Schema()
    .field("id", Types.INT )
    .field("name", Types.STRING )
    .field("value", Types.STRING )
    .field("timestamp", Types.LONG )

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = TableEnvironment.getTableEnvironment(env)
  tableEnv
    .connect(esSink)
    .withFormat(json)
    .withSchema(schema)
    .inUpsertMode()
    .registerTableSink("my-user-tableSink")

  val source = env.fromElements("1,hb,100,1547121997000", "2,fsl,99,1547121999000")
    .map(_.split(",")).map(x => Person(x(0).toInt, x(1), x(2), x(3).toLong))
  source.print()

  tableEnv.registerDataStream("source", source)
  val t1 = tableEnv.sqlQuery("select * from source ")
  t1.printSchema()
  //val t2 = tableEnv.toRetractStream[Row](t1)
  //t2.print
  t1.insertInto("my-user-tableSink")

  env.execute()
}

在idea里执行,成功,没有报错.
但是在es里查结果时,发现只有

  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "test-myuser",
        "_type": "_doc",
        "_id": "dzDWOmgBDxFAsMLp_YNp",
        "_score": 1
      },
      {
        "_index": "test-myuser",
        "_type": "_doc",
        "_id": "eTDXOmgBDxFAsMLpKoNJ",
        "_score": 1
      }
    ]
  }

只有随机id,没有实际记录写入.

问题:
怎样把完整的表写入es,并指定主键, 比如代码里的这个表 val t1 = tableEnv.sqlQuery("select * from source ").

展开
收起
冷丰 2019-01-11 10:59:35 4402 0
2 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载