开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问Flink这个语句哪里写的有问题么?运行不过去?

请问Flink这个语句哪里写的有问题么?运行不过去?CREATE TABLE dwd_czc_gnss_info ( id STRING, gps_time TIMESTAMP(3), WATERMARK FOR gps_time AS gps_time - INTERVAL '1' MINUTE ) WITH ( 'connector' = 'kafka', 'topic' = 'dwd_czc_gnss_info', 'properties.bootstrap.servers' = 'node3:9092,node2:9092,node4:9092', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'wuhan_czc_ads', 'format' = 'json' ); CREATE TABLE ts_taxi_area_empty_rate_real_d ( TUMBLE_START STRING, num INT ) WITH ( 'connector' = 'print' ); INSERT INTO ts_taxi_area_empty_rate_real_d SELECT
TUMBLE_START(gps_time, INTERVAL '2' MINUTE) as TUMBLE_START, COUNT(DISTINCT id) AS num FROM dwd_czc_gnss_info GROUP BY TUMBLE(gps_time, INTERVAL '2' MINUTE);

展开
收起
真的很搞笑 2023-06-11 22:03:32 99 0
4 条回答
写回答
取消 提交回答
  • 可能在INSERT INTO语句中,TUMBLE_START作为别名字段,但是在SELECT语句中应该直接使用TUMBLE_START(gps_time, INTERVAL '2' MINUTE)作为窗口聚合的一部分。

    调整语句参考如下:

    
              CREATE TABLE dwd_czc_gnss_info (
      id STRING,
      gps_time TIMESTAMP(3),
      WATERMARK FOR gps_time AS gps_time - INTERVAL '1' MINUTE  -- 检查这个语法是否正确
    ) WITH (
      connector = 'kafka',
      topic = 'dwd_czc_gnss_info',
      properties.bootstrap.servers = 'node3:9092,node2:9092,node4:9092',
      scan.startup.mode = 'latest-offset',
      properties.group.id = 'wuhan_czc_ads',
      format = 'json'
    );
    
    CREATE TABLE ts_taxi_area_empty_rate_real_d (
      TUMBLE_START TIMESTAMP(3),  -- 这里应该是TIMESTAMP类型
      num INT
    ) WITH (
      connector = 'print'
    );
    
    INSERT INTO ts_taxi_area_empty_rate_real_d
    SELECT
      TUMBLE_START(gps_time, INTERVAL '2' MINUTE) as TUMBLE_START,
      COUNT(DISTINCT id) AS num
    FROM dwd_czc_gnss_info
    GROUP BY TUMBLE(gps_time, INTERVAL '2' MINUTE);
    
    2024-01-25 20:23:51
    赞同 1 展开评论 打赏
  • 某政企事业单位运维工程师,主要从事系统运维及大数据开发工作,多次获得阿里云、华为云、腾讯云征文比赛一二等奖;CTF选手,白帽,全国交通行业网络安全大赛二等奖,全国数信杯数据安全大赛银奖,手握多张EDU、CNVD、CNNVD证书

    根据您提供的信息,Flink语句中有几个问题需要修正。

    首先,'format'属性的值应为'json',需要将其修改为'json',而不是'format' = 'jsion'。

    另外,INSERT INTO语句的SELECT子句中的TUMBLE_START函数应该被改为TUMBLE_START(gps_time, INTERVAL '2' MINUTE) as TUMBLE_START。

    修正后的Flink语句如下所示:

    CREATE TABLE dwd_czc_gnss_info (
      id STRING,
      gps_time TIMESTAMP(3),
      WATERMARK FOR gps_time AS gps_time - INTERVAL '1' MINUTE
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'dwd_czc_gnss_info',
      'properties.bootstrap.servers' = 'node3:9092,node2:9092,node4:9092',
      'scan.startup.mode' = 'latest-offset',
      'properties.group.id' = 'wuhan_czc_ads',
      'format' = 'json'
    );
    
    CREATE TABLE ts_taxi_area_empty_rate_real_d (
      TUMBLE_START STRING,
      num INT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ts_taxi_area_empty_rate_real_d
    SELECT
      TUMBLE_START(gps_time, INTERVAL '2' MINUTE) as TUMBLE_START,
      COUNT(DISTINCT id) AS num
    FROM
      dwd_czc_gnss_info
    GROUP BY
      TUMBLE(gps_time, INTERVAL '2' MINUTE);
    

    在运行此代码之前,确保您已经正确安装和配置了相关的Flink环境,并且所需的依赖项已经正确引入。

    2024-01-19 15:53:20
    赞同 展开评论 打赏
  • 该Flink SQL语句中存在一个问题,即在INSERT INTO子句中引用了TUMBLE_START字段,但在目标表ts_taxi_area_empty_rate_real_d的定义中,TUMBLE_START字段的数据类型是STRING,而在SELECT子句中,TUMBLE_START(gps_time, INTERVAL '2' MINUTE)返回的是一个TIMESTAMP类型。因此,数据类型不匹配导致了错误。

    为了解决这个问题,你需要将目标表ts_taxi_area_empty_rate_real_d中的TUMBLE_START字段类型改为TIMESTAMP,如下所示:

    CREATE TABLE dwd_czc_gnss_info (
        id STRING,
        gps_time TIMESTAMP(3),
        WATERMARK FOR gps_time AS gps_time - INTERVAL '1' MINUTE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dwd_czc_gnss_info',
        'properties.bootstrap.servers' = 'node3:9092,node2:9092,node4:9092',
        'scan.startup.mode' = 'latest-offset',
        'properties.group.id' = 'wuhan_czc_ads',
        'format' = 'json'
    );
    
    CREATE TABLE ts_taxi_area_empty_rate_real_d (
        TUMBLE_START TIMESTAMP,
        num INT
    ) WITH (
        'connector' = 'print'
    );
    
    INSERT INTO ts_taxi_area_empty_rate_real_d
    SELECT
        TUMBLE_START(gps_time, INTERVAL '2' MINUTE) as TUMBLE_START,
        COUNT(DISTINCT id) AS num
    FROM dwd_czc_gnss_info
    GROUP BY TUMBLE(gps_time, INTERVAL '2' MINUTE);
    

    这样修改后,INSERT INTO语句中的TUMBLE_START字段类型就和目标表中的定义一致了。

    2024-01-15 15:21:53
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    可能需要检查以下几点:

    1. 确保您已经正确安装并配置了Flink,以及相关的Kafka连接器。
    2. 检查您的Kafka集群地址是否正确,以及消费者组ID是否正确。
    3. 确保您的Flink作业中已经定义了dwd_czc_gnss_info表和ts_taxi_area_empty_rate_real_d表。
    4. 检查您的Flink版本是否支持您使用的SQL语法。Flink的SQL支持在1.11及更高版本中引入,所以如果您使用的是较旧的Flink版本,可能需要升级到最新版本。
    2024-01-12 21:44:59
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载