开发者社区> 问答> 正文

请问如何将pyflink机器学习的结果进行输出?

已解决

各位大佬,有个问题困扰了很久。用flink实现机器学习实时计算,输入数据通过kafka传来的特征数据,计算通过MapFunction,结果为数据的序号和预测结果,输出一方面希望到kafka供后续调用,给前端实时显示预测结果,一方面希望到mysql落地。 在最后数据输出一直不成功,希望通过kafka接受,但是总是失败: Query schema: [f0: RAW('[B', '...')] Sink schema: [a: INT, b: FLOAT]

大概是说我出来的结构是RAW,而实际sink是INT,FLOAT.具体代码如下,请各位大神指导。

import pickle from sklearn.linear_model import LinearRegression from pyflink.common.serialization import JsonRowSerializationSchema from pyflink.datastream import StreamExecutionEnvironment, FilterFunction from pyflink.datastream.connectors import NumberSequenceSource from pyflink.datastream.functions import RuntimeContext, MapFunction,CoMapFunction

载入离线训练好的模型,特征为value[1],value[2],value[3],

####预测结果为yy_pre_predict[0],数据的序号为value[0]。序号很重要,需要和预测结果一起出现 class PreMap(MapFunction): def open(self, runtime_context: RuntimeContext): self.load_model=pickle.load(open("/home/flinkdata/lr.pkl","rb")) def flat_map(self, value): xx_pre= pd.DataFrame({'RM':[value[1]], 'LSTAT':[value[2]], 'PTRATIO':[value[3]]}) yy_pre_predict=self.load_model.predict(xx_pre) return value[0],yy_pre_predict[0] ###构建环境 env = StreamExecutionEnvironment.get_execution_environment() #env.set_parallelism(1) env.add_jars("file:///home/flinkdata/flink-sql-connector-kafka-1.15.0.jar") t_env = StreamTableEnvironment.create(stream_execution_environment=env)

###source来自kafka

t_env.execute_sql(""" CREATE TABLE source ( MYNO INT, RM float, LSTAT float, PTRATIO float ) WITH ( 'connector' = 'kafka', 'topic' = 'bostonsource', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) """)

###载入source re=t_env.from_path('source') preds=t_env.to_append_stream(re,type_info=Types.TUPLE([Types.INT(),Types.FLOAT(),Types.FLOAT(),Types.FLOAT()]))

###调用PreMap实现预测生成流 final_stream = preds.map(PreMap(),result_type=Types.ROW([Types.INT(),Types.FLOAT()]))

如何将结果final_stream输出?我使用的办法是 t_env.execute_sql(""" CREATE TABLE my_sink ( MYNO INT, PRE FLOAT) WITH ( 'connector' = 'kafka',

'topic' = 'bostonresult',

'properties.bootstrap.servers' = '192.168.15.111:9092',

'scan.startup.mode' = 'earliest-offset',

'format' = 'csv' ) """)

final_table=t_env.from_data_stream(final_stream) final_table.insert_into("my_sink") t_env.execute()

出现了前面提到的报错:

Query schema: [f0: RAW('[B', '...')] Sink schema: [a: INT, b: FLOAT]

我尝试直接sink到kafka

from pyflink.common.serialization import SimpleStringSchema from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink from pyflink.datastream import DeliveryGuarantee

brokers = "localhost:9092" topic = "sinktest"

stream = env.from_collection([(1, "Hello"), (2, "World")])

sink = KafkaSink.builder()
.set_bootstrap_servers(brokers)
.set_record_serializer( KafkaRecordSerializationSchema.builder() .set_topic(topic) .set_value_serialization_schema(SimpleStringSchema()) .build() )
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()

stream.add_sink(sink)

env.execute()

但是报错from pyflink.datastream import DeliveryGuarantee 无法加载

一直没有好的办法对预测数据的SINK,请各位大神赐教,并附完整的代码,谢谢

展开
收起
侠客张 2023-06-07 21:49:22 137 0
2 条回答
写回答
取消 提交回答
  • 采纳回答

    这个问题涉及到PyFlink中的序列化和反序列化问题。看起来你的代码中的问题在于你的模型预测生成的数据类型(RAW)和你设置的Kafka数据流接收的数据类型(INT, FLOAT)之间的不匹配。在这个问题上,我有两个建议。

    第一个建议是检查一下你的模型预测的输出数据类型。从你的代码中可以看出,你的模型预测的输出应该是一个浮点数,但是报错信息指出输出类型是RAW。你可以尝试在PreMap类的flat_map函数中,明确地将yy_pre_predict[0]转换为float类型,如下所示:

    return value[0], float(yy_pre_predict[0])
    

    第二个建议是在创建Kafka数据流的时候,使用正确的序列化和反序列化模式。你可以使用JsonRowSerializationSchema来定义Kafka数据流的序列化模式,这样可以确保数据流接收到的数据类型与你的模型预测的输出类型匹配。以下是一个使用JsonRowSerializationSchema的示例:

    from pyflink.common.serialization import JsonRowSerializationSchema, JsonRowDeserializationSchema
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
    
    env = StreamExecutionEnvironment.get_execution_environment()
    
    deserialization_schema = JsonRowDeserializationSchema.builder()\
        .type_info(type_info=Types.ROW([Types.INT(), Types.FLOAT()]))\
        .build()
    
    serialization_schema = JsonRowSerializationSchema.builder()\
        .type_info(type_info=Types.ROW([Types.INT(), Types.FLOAT()]))\
        .build()
    
    kafka_consumer = FlinkKafkaConsumer(
        topics='bostonsource',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '192.168.15.111:9092', 'group.id': 'test_group'}
    )
    
    kafka_producer = FlinkKafkaProducer(
        topic='bostonresult',
        serialization_schema=serialization_schema,
        properties={'bootstrap.servers': '192.168.15.111:9092'}
    )
    
    stream = env.add_source(kafka_consumer)
    # Do your map operation here.
    stream.add_sink(kafka_producer)
    
    env.execute()
    

    以上代码定义了从Kafka中读取和写入数据的数据流。首先,创建一个JsonRowDeserializationSchemaJsonRowSerializationSchema,这两个对象定义了数据流中数据的数据类型。然后,创建一个FlinkKafkaConsumerFlinkKafkaProducer,并将前面定义的反序列化和序列化模式应用于这两个对象。

    2023-06-07 23:49:07
    赞同 展开评论 打赏
  • 您可以使用 Flink 的 KafkaSink 将预测结果输出到 Kafka 中,然后使用 Flink 的 JDBC Connector 将数据写入 MySQL 数据库。

    关于您提到的报错,可能是因为您的 Flink 版本不支持 DeliveryGuarantee,您可以尝试使用 Flink 1.13 版本或更高版本,该版本已经支持 DeliveryGuarantee。

    下面是一个简单的示例代码,演示如何将 Flink 的机器学习模型预测结果输出到 Kafka 和 MySQL 中:

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.table import TableEnvironment, DataTypes
    from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, FileSystem
    from pyflink.table.udf import udf
    
    from sklearn.linear_model import LinearRegression
    import numpy as np
    import json
    
    # 定义预测函数
    def predict(features):
        # 加载模型
        model = LinearRegression()
        model.load('model.pkl')
        # 进行预测
        features = np.array(features)
        features = features.reshape(1, -1)
        prediction = model.predict(features)
        return prediction[0]
    
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    t_env = TableEnvironment.create(env)
    
    # 定义输入数据格式
    input_schema = Schema()
    input_schema.field("event_time", DataTypes.TIMESTAMP())
    input_schema.field("id", DataTypes.BIGINT())
    input_schema.field("feature1", DataTypes.DOUBLE())
    input_schema.field("feature2", DataTypes.DOUBLE())
    input_schema.field("feature3", DataTypes.DOUBLE())
    
    # 定义输出数据格式
    output_schema = Schema()
    output_schema.field("id", DataTypes.BIGINT())
    output_schema.field("prediction", DataTypes.DOUBLE())
    
    # 定义输入表
    t_env.connect(
        Kafka()
        .version("universal")
        .topic("input_topic")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ).with_format(
        Json()
        .fail_on_missing_field(True)
        .json_schema(
            "{" + 
                "\"type\": \"object\", " +
                "\"properties\": {" +
                    "\"event_time\": {\"type\": \"string\", \"format\": \"date-time\"}, " +
                    "\"id\": {\"type\": \"integer\"}, " +
                    "\"feature1\": {\"type\": \"number\"}, " +
                    "\"feature2\": {\"type\": \"number\"}, " +
                    "\"feature3\": {\"type\": \"number\"}" +
                "}" +
            "}"
        )
        .timestamp_format("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "UTC")
        .for_proctime()
    ).with_schema(input_schema).create_temporary_table("input_table")
    
    # 定义输出表
    t_env.connect(
        Kafka()
        .version("universal")
        .topic("output_topic")
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ).with_format(
        Json()
        .fail_on_missing_field(True)
        .json_schema(
            "{" + 
                "\"type\": \"object\", " +
                "\"properties\": {" +
                    "\"id\": {\"type\": \"integer\"}, " +
                    "\"prediction\": {\"type\": \"number\"}" +
                "}" +
            "}"
        )
    ).with_schema(output_schema).create_temporary_table("output_table")
    
    # 定义预测函数
    predict_udf = udf(predict, DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.DOUBLE(), 3))
    
    # 执行查询
    result = t_env.from_path("input_table") \
        .select("id, " + predict_udf("feature1", "feature2", "feature3").alias("prediction")) \
        .insert_into("output_table")
    
    # 输出到 Kafka
    kafka_producer = FlinkKafkaProducer(
        topic="output_topic",
        serialization_schema=SimpleStringSchema(),
        producer_config={
            "bootstrap.servers": "localhost:9092",
            "acks": "all"
        }
    )
    result.add_sink(kafka_producer)
    
    # 输出到 MySQL
    t_env.connect(
        FileSystem().path("/path/to/mysql-jdbc-connector.jar")
    ).with_format(
        "jdbc",
        driver="com.mysql.jdbc.Driver",
        url="jdbc:mysql://localhost:3306/mydb",
        table="output_table"
    ).with_schema(output_schema).create_temporary_table("mysql_output_table")
    
    t_env.insert_into("mysql_output_table", result)
    
    # 执行任务
    env.execute()
    

    在上面的代码中,我们首先定义了一个 predict 函数,用于加载模型并进行预测。然后,我们使用 Flink Table API 定义了输入和输出表的格式,并使用 UDF 将预测函数应用到输入表中的特征字段上,生成输出表。接着,我们使用 Flink Kafka Producer 将输出表中的数据写入到 Kafka 中,并使用 Flink JDBC Connector 将数据写入到 MySQL 数据库中。

    需要注意的是,如果您的预测函数返回的数据类型与输出表的数据类型不一致,可能会出现报错。因此,您需要根据实际情况调整输出表的数据类型,使其与预测函数的输出数据类型一致。

    2023-06-08 11:07:36
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载