开发者社区> 问答> 正文

pyflink1.15下如何把kafka数据{"f0":478,"f1":17.5}写入mysql?

已解决

各位大佬,请教一个问题: 我通过pyflink1.15,加载机器学习模型后得到了预测数据,并发送到了kafka,结果形式如下: {"f0":478,"f1":17.546555} {"f0":475,"f1":13.629357} {"f0":223,"f1":28.003633} {"f0":334,"f1":25.130732}

其中f0为INT类型,f1为float类型,现在想把结果写入到mysql里面,代码如下,但是报错,请教应该如何写?

env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings)

###从TOPIC-yyyy获取预测结果信息 table_env.execute_sql(""" CREATE TABLE source ( f0 INT, f1 float ) WITH ( 'connector' = 'kafka', 'topic' = 'yyyy', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) """)

source_table = table_env.from_path("source")

###写入到MYSQL的print_table表 table_env.execute_sql(""" CREATE TABLE sink ( qqq INT, www float ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://192.168.1.5:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8', 'username'='root', 'driver' = 'com.mysql.cj.jdbc.Driver', 'password'='123456', 'table-name' = 'print_table'

)

""")

source_table.execute_insert("sink").wait()

执行却一直报错,请教大佬如何处理? 如何在pyflink1.15下将kafka内容写入到mysql? 麻烦提供完整的代码,谢谢

展开
收起
侠客张 2023-06-26 21:24:51 144 0
2 条回答
写回答
取消 提交回答
  • 采纳回答

    根据您提供的代码和问题描述,我注意到您在创建 sink 表时,表字段名与 source 表不一致,导致在执行 insert 操作时出现了错误。另外,您使用了 csv 格式来读取 Kafka 数据,但是 Kafka 中的数据格式是 JSON,因此需要使用 json 格式来读取。

    下面是一份修改后的代码示例,可以将 Kafka 中的 JSON 数据写入到 MySQL 中:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings
    
    env_settings = EnvironmentSettings.in_streaming_mode()
    env_settings.use_blink_planner()
    stream_env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(stream_env, environment_settings=env_settings)
    
    # 从 Kafka 中读取 JSON 数据
    table_env.execute_sql("""
        CREATE TABLE source (
            f0 INT,
            f1 FLOAT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'yyyy',
            'properties.bootstrap.servers' = '192.168.15.111:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        )
    """)
    
    # 写入到 MySQL 中
    table_env.execute_sql("""
        CREATE TABLE sink (
            f0 INT,
            f1 FLOAT
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://192.168.1.5:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8',
            'table-name' = 'print_table',
            'driver' = 'com.mysql.cj.jdbc.Driver',
            'username' = 'root',
            'password' = '123456'
        )
    """)
    
    # 执行 insert 操作
    table_env.execute_sql("""
        INSERT INTO sink
        SELECT f0, f1 FROM source
    """).wait()
    

    在这份代码中,我们使用了 json 格式来读取 Kafka 中的 JSON 数据,然后将数据写入到 MySQL 中。注意,在创建 sink 表时,表字段名与 source 表一致,这样才能正确地进行 insert 操作。

    2023-06-27 08:32:22
    赞同 1 展开评论 打赏
  • 在PyFlink 1.15中,你可以使用JDBC连接器将Kafka数据写入MySQL。以下是你可以尝试的修改后的代码:

    env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings)

    从TOPIC-yyyy获取预测结果信息

    table_env.execute_sql(""" CREATE TABLE source ( f0 INT, f1 FLOAT ) WITH ( 'connector' = 'kafka', 'topic' = 'yyyy', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ) """)

    source_table = table_env.from_path("source")

    写入到MySQL的print_table表

    table_env.execute_sql(""" CREATE TABLE sink ( qqq INT, www FLOAT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.1.5:3306/index?useSSL=false', 'table-name' = 'print_table', 'username' = 'your_username', 'password' = 'your_password', 'driver' = 'com.mysql.cj.jdbc.Driver' ) """)

    table_env.from_path("source").insert_into("sink")

    table_env.execute("Sink to MySQL")

    请注意以下几点修改:

    1. 在源表和接收器表的创建语句中,将连接器的格式('format')从 'csv' 修改为 'json',以匹配你的Kafka数据的JSON格式。

    2. 在接收器表的创建语句中,添加了额外的连接器属性,如 'table-name''username''password''driver'。请替换 'your_username''your_password' 为你的MySQL数据库的用户名和密码。还要确保你已经添加了相应的MySQL驱动程序。

    3. 在将源表插入接收器表之前,你需要调用 from_path() 方法创建源表,并使用 insert_into() 方法将数据插入到接收器表中。

    4. 最后,调用 execute() 方法执行作业。

    请根据你的实际配置和要求进行相应的调整。如果仍然遇到问题,请提供完整的错误消息以便进一步帮助。

    2023-06-26 22:25:43
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像