开发者社区> 问答> 正文

如何在pyflink1.15下将kafka内容写入到mysql?

各位大佬,请教一个问题: 我通过pyflink1.15,加载机器学习模型后得到了预测数据,并发送到了kafka,结果形式如下: {"f0":478,"f1":17.546555} {"f0":475,"f1":13.629357} {"f0":223,"f1":28.003633} {"f0":334,"f1":25.130732}

其中f0为INT类型,f1为float类型,现在想把结果写入到mysql里面,代码如下,但是报错,请教应该如何写?

env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings)

###从TOPIC-yyyy获取预测结果信息 table_env.execute_sql(""" CREATE TABLE source ( f0 INT, f1 float ) WITH ( 'connector' = 'kafka', 'topic' = 'yyyy', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) """)

source_table = table_env.from_path("source")

###写入到MYSQL的print_table表 table_env.execute_sql(""" CREATE TABLE sink ( qqq INT, www float ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://192.168.1.5:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8', 'username'='root', 'driver' = 'com.mysql.cj.jdbc.Driver', 'password'='123456', 'table-name' = 'print_table'

)

""")

source_table.execute_insert("sink").wait()

执行却一直报错,请教大佬如何处理?

展开
收起
侠客张 2023-06-16 20:59:30 135 0
1 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    要将 Kafka 中的数据写入 MySQL 中,您需要使用 Flink 的 Table API 和 SQL API,以及相应的 Flink Connectors 来连接 Kafka 和 MySQL。

    下面是一个简单的示例代码,演示了如何使用 Flink 将 Kafka 中的数据写入 MySQL 中:

    scheme Copy from pyflink.table import * from pyflink.table.descriptors import * from pyflink.table.types import DataTypes

    env_settings = EnvironmentSettings
    .new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()

    t_env = TableEnvironment.create(env_settings)

    kafka_props = {'bootstrap.servers': 'kafka-broker:9092', 'group.id': 'group1'}

    t_env
    .connect( Kafka() .version('universal') .topic('input-topic') .start_from_earliest() .properties(kafka_props))
    .with_format( Json() .json_schema('{"type":"object","properties":{"f0":{"type":"integer"},"f1":{"type":"float"}}}') .fail_on_missing_field(True))
    .with_schema( Schema() .field('f0', DataTypes.INT()) .field('f1', DataTypes.FLOAT()))
    .create_temporary_table('input_table')

    t_env
    .connect( JDBC() .url('jdbc:mysql://localhost:3306/mydb') .table_name('output_table') .driver_name('com.mysql.jdbc.Driver') .username('user') .password('password'))
    .with_schema( Schema() .field('f0', DataTypes.INT()) .field('f1', DataTypes.FLOAT()))
    .create_temporary_table('output_table')

    t_env
    .from_path('input_table')
    .insert_into('output_table')

    t_env.execute('kafka_to_mysql') 在上面的示例中,我们首先创建了一个 Kafka source,使用 Json() 格式来解析 Kafka 中的数据。然后我们将解析得到的数据插入到一个 MySQL sink 中,使用 JDBC() 连接器来连接 MySQL 数据库。

    需要注意的是,您需要根据实际情况来修改代码中的连接参数和表名等信息,以及将 Json() 格式中的 json_schema 参数修改为适合您数据格式的 JSON Schema。

    2023-06-17 09:03:23
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像