开发者社区> 问答> 正文

Flink kafka自定义metrics在influxdb上解析失败怎么办?

大家好,Flink版本1.13.1。 我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。

但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where b.c='d'的SQL语句时,influxDB中的表可以成功被建出来; 但如果加上UDF,比如 insert into a select CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激! org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields unable to parse ''tablename', 'ODS_INFO',CaptTimeSlewTime(Record.CAPTURE_TIME, 'YEAR_,MONTH'),'data',CreateJsonInner(true,': missing fields)*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-02 14:55:57 772 0
1 条回答
写回答
取消 提交回答
  • 是通过什么方式拿到 sink table name 呢?从报错信息来看,拿到的可能并不是“a” 这样的 table name,而是一个 insert

    语句的 digest,因此 metric 表名会变得比较复杂,导致解析错误。

    当然也可以选择对 metric 表名进行转义。*来自志愿者整理的FLINK邮件归档

    2021-12-02 15:10:09
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像