开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC元数据我加上去了,但是在sink端写入的时候 还是无法作为字段写入?

Flink CDC元数据我加上去了,但是在sink端写入的时候 还是无法作为字段写入?image.png

展开
收起
真的很搞笑 2023-10-03 09:11:06 126 0
1 条回答
写回答
取消 提交回答
  • Flink CDC元数据是指从数据库中捕获的变更数据的一些额外信息,例如操作类型、时间戳、表名等。 这些元数据可以更好地理解和处理变更数据,例如根据操作类型进行过滤或者转换,或者根据时间戳进行排序或者窗口等。

    如果想在sink端写入的时候使用Flink CDC元数据作为字段写入,需要注意以下几点:

    • 需要在创建Flink CDC表的时候,指定需要的元数据列,并且给它们分配一个别名。例如,如果想获取操作类型和时间戳的元数据,可以这样创建表:
    CREATE TABLE mysql_binlog (
      id INT NOT NULL,
      name STRING,
      description STRING,
      weight DECIMAL(10, 3),
      op_type STRING METADATA FROM 'value.source.op' VIRTUAL, -- 操作类型
      op_ts TIMESTAMP(3) METADATA FROM 'value.source.ts_ms' VIRTUAL -- 时间戳
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'inventory',
      'table-name' = 'products'
    );
    
    • 需要在创建sink表的时候,也要包含相应的元数据列,并且保持和Flink CDC表一致的名称和类型。 例如,如果想将变更数据写入到Elasticsearch中,可以这样创建表:
    CREATE TABLE es_sink (
      id INT NOT NULL,
      name STRING,
      description STRING,
      weight DECIMAL(10, 3),
      op_type STRING, -- 操作类型
      op_ts TIMESTAMP(3) -- 时间戳
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://localhost:9200',
      'index' = 'products'
    );
    
    • 需要在执行插入语句的时候,也要包含相应的元数据列,并且保持和Flink CDC表一致的顺序。 例如,如果想将变更数据从Flink CDC表插入到Elasticsearch表中,可以这样执行语句:
    INSERT INTO es_sink
    SELECT id, name, description, weight, op_type, op_ts
    FROM mysql_binlog;
    
    2023-10-20 15:57:58
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载