开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC 我flink 读取kafka数据然后入库到starrocks不知道问题出在哪了?

Flink CDC 各位我flink 读取kafka数据然后入库到starrocks
CREATE TABLE mall_activity (
id bigint,
create_time TIMESTAMP,
create_by bigint,
create_by_name string,
update_time TIMESTAMP,
update_by bigint,
update_by_name string,
activity_name string,
appointment_start_time TIMESTAMP,
end_time TIMESTAMP,
exchange_end_time TIMESTAMP,
exchange_start_time TIMESTAMP,
pick_end_time TIMESTAMP,
pick_start_time TIMESTAMP,
term string,
year int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://devdata2:9030',
'load-url' = 'devdata2:8030',
'database-name' = 'service_geek_coin',
'table-name' = 'mall_activity',
'username' = 'xxxx',
'password' = 'xxxxx',
'sink.semantic' = 'exactly-once',
'sink.label-prefix' = 'mall_activity_20231118',
'sink.properties.partial_update' = 'true'
);
不知道问题出在哪了?

展开
收起
真的很搞笑 2023-11-22 09:33:30 211 0
4 条回答
写回答
取消 提交回答
  • 排查下看看日志的log有没有异常输出,如果没有看下自己的任务是不是全量同步(全量同步如果资源不足, 任务也会卡住,不报错,也不写数据到Starrocks),此回答整理自钉群“Flink CDC 社区”

    2023-12-01 08:32:03
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    看起来您是在创建一个外部表,用于将来自 Kafka 主题的数据插入到 StarRocks 中。但是,您似乎没有指定外部表的源位置,即 Kafka 主题的位置。
    您需要将 Kafka 主题的地址作为外部表的一部分进行指定。以下是一个示例,展示了如何指定 Kafka 主题的位置:

    WITH (
        'connector' = 'kafka',
        'topic' = 'your-topic-name',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset',
        'sink.semantic' = 'at-least-once',
        'sink.buffer-flush.max-rows' = '1'
    )
    

    您还可以根据自己的实际情况修改上述代码中的其他属性,例如 formatscan.startup.modesink.semantic 等。

    2023-11-29 13:33:02
    赞同 展开评论 打赏
  • 根据您提供的Flink CDC配置,问题可能出在以下几个方面:

    1. JDBC URL、Load URL、Database Name、Table Name、Username和Password的配置不正确。请确保这些参数的值与您的StarRocks实例相匹配。

    2. Sink语义设置为'exactly-once',这意味着Flink CDC会尝试确保每个变更事件只被处理一次。如果这个设置导致问题,您可以尝试将其更改为'at-least-once'或'upto-date',以允许部分更新或增量更新。

    3. 'partial_update'属性设置为'true',这意味着Flink CDC将尝试执行部分更新操作。如果您的表有主键约束,并且某些记录已经存在于表中,那么这个设置可能会导致问题。您可以尝试将其设置为'false',以便禁用部分更新功能。

    4. 检查您的Kafka主题和Flink CDC源之间的数据格式是否匹配。确保Kafka消息包含正确的字段和数据类型,以便Flink CDC可以正确解析它们并将其插入到StarRocks表中。

    5. 检查您的StarRocks表结构是否与Flink CDC配置中的表结构相匹配。确保表的主键列和字段名称与配置中的值相匹配。

    6. 查看Flink CDC的日志以获取更多详细信息。这可以帮助您诊断问题并找到解决方案。

    2023-11-29 12:01:12
    赞同 展开评论 打赏
  • Flink CDC 创建 starrocks 表定义看起来有些混乱,检查以下几个方面:

    1. 主键约束:在定义 PRIMARY KEY 的时候不要使用 NOT ENFORCED 关键字。正确的语法应该是 PRIMARY KEY(id)
    2. 数据库名称和表名称的格式:确保 database-nametable-name 属性的格式正确,不应带引号。
    3. 子字符串匹配:检查是否为正则表达式或字符串过滤器设置了适当的参数。
    4. JDBC URL 不正确或缺失:确保 jdbc-url 属性指向正确的 StarRocks 服务器地址和端口。
    2023-11-22 16:51:37
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载