各位大佬,请教一个问题: 我通过pyflink1.15,加载机器学习模型后得到了预测数据,并发送到了kafka,结果形式如下: {"f0":478,"f1":17.546555} {"f0":475,"f1":13.629357} {"f0":223,"f1":28.003633} {"f0":334,"f1":25.130732}
其中f0为INT类型,f1为float类型,现在想把结果写入到mysql里面,代码如下,但是报错,请教应该如何写?
env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings)
###从TOPIC-yyyy获取预测结果信息 table_env.execute_sql(""" CREATE TABLE source ( f0 INT, f1 float ) WITH ( 'connector' = 'kafka', 'topic' = 'yyyy', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) """)
source_table = table_env.from_path("source")
###写入到MYSQL的print_table表 table_env.execute_sql(""" CREATE TABLE sink ( qqq INT, www float ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://192.168.1.5:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8', 'username'='root', 'driver' = 'com.mysql.cj.jdbc.Driver', 'password'='123456', 'table-name' = 'print_table'
)
""")
source_table.execute_insert("sink").wait()
执行却一直报错,请教大佬如何处理? 如何在pyflink1.15下将kafka内容写入到mysql? 麻烦提供完整的代码,谢谢
根据您提供的代码和问题描述,我注意到您在创建 sink 表时,表字段名与 source 表不一致,导致在执行 insert 操作时出现了错误。另外,您使用了 csv
格式来读取 Kafka 数据,但是 Kafka 中的数据格式是 JSON,因此需要使用 json
格式来读取。
下面是一份修改后的代码示例,可以将 Kafka 中的 JSON 数据写入到 MySQL 中:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
env_settings.use_blink_planner()
stream_env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(stream_env, environment_settings=env_settings)
# 从 Kafka 中读取 JSON 数据
table_env.execute_sql("""
CREATE TABLE source (
f0 INT,
f1 FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'yyyy',
'properties.bootstrap.servers' = '192.168.15.111:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
""")
# 写入到 MySQL 中
table_env.execute_sql("""
CREATE TABLE sink (
f0 INT,
f1 FLOAT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.5:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'print_table',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '123456'
)
""")
# 执行 insert 操作
table_env.execute_sql("""
INSERT INTO sink
SELECT f0, f1 FROM source
""").wait()
在这份代码中,我们使用了 json
格式来读取 Kafka 中的 JSON 数据,然后将数据写入到 MySQL 中。注意,在创建 sink 表时,表字段名与 source 表一致,这样才能正确地进行 insert 操作。
在PyFlink 1.15中,你可以使用JDBC连接器将Kafka数据写入MySQL。以下是你可以尝试的修改后的代码:
env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(""" CREATE TABLE source ( f0 INT, f1 FLOAT ) WITH ( 'connector' = 'kafka', 'topic' = 'yyyy', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ) """)
source_table = table_env.from_path("source")
table_env.execute_sql(""" CREATE TABLE sink ( qqq INT, www FLOAT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.1.5:3306/index?useSSL=false', 'table-name' = 'print_table', 'username' = 'your_username', 'password' = 'your_password', 'driver' = 'com.mysql.cj.jdbc.Driver' ) """)
table_env.from_path("source").insert_into("sink")
table_env.execute("Sink to MySQL")
请注意以下几点修改:
在源表和接收器表的创建语句中,将连接器的格式('format'
)从 'csv'
修改为 'json'
,以匹配你的Kafka数据的JSON格式。
在接收器表的创建语句中,添加了额外的连接器属性,如 'table-name'
、'username'
、'password'
和 'driver'
。请替换 'your_username'
和 'your_password'
为你的MySQL数据库的用户名和密码。还要确保你已经添加了相应的MySQL驱动程序。
在将源表插入接收器表之前,你需要调用 from_path()
方法创建源表,并使用 insert_into()
方法将数据插入到接收器表中。
最后,调用 execute()
方法执行作业。
请根据你的实际配置和要求进行相应的调整。如果仍然遇到问题,请提供完整的错误消息以便进一步帮助。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。