开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

pyflink 连接kafka 不定字段的json反序列化,有没有比较好的写法?

pyflink 连接kafka 不定字段的json反序列化,有没有比较好的写法?

展开
收起
wenti 2023-02-06 16:24:58 246 0
2 条回答
写回答
取消 提交回答
  • 使用 PyFlink 连接 Kafka 并处理不固定字段的 JSON 数据,有以下几种比较好的写法:

    1. 使用 Apache Flink 的 JsonSchema

    JsonSchema 是 Apache Flink 提供的一个类,它可以帮助你解析 JSON 数据,即使字段是不固定的。示例代码如下:

    from pyflink.datastream import DataStream
    from pyflink.datastream.connectors import FlinkKafkaConsumer
    from pyflink.table import Schema, DataTypes
    from pyflink.table.sources import TableSource
    
    class UnboundedJsonSource(TableSource):
    
        def __init__(self, topic, bootstrap_servers, group_id):
            self.topic = topic
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id
    
        def get_data_stream(self, context):
            json_schema = Schema.new_builder() \
                .column("name", DataTypes.STRING()) \
                .column("age", DataTypes.INT()) \
                .column("city", DataTypes.STRING()) \
                .column("occupation", DataTypes.STRING()) \
                .build()
    
            kafka_consumer = FlinkKafkaConsumer(
                topics=self.topic,
                deserialization_schema=json_schema,
                properties={'bootstrap.servers': self.bootstrap_servers,
                            'group.id': self.group_id})
    
            return context.create_data_stream(kafka_consumer)
    
    # 创建 TableSource
    json_source = UnboundedJsonSource(topic="my-topic", bootstrap_servers="localhost:9092", group_id="my-group")
    
    # 创建 TableEnvironment
    t_env = TableEnvironment.create()
    
    # 注册 TableSource
    t_env.register_table_source("json_table", json_source)
    
    # 查询 Table
    result_table = t_env.scan("json_table")
    
    # 输出结果
    result_table.execute().print()
    

    2. 使用第三方库,如 jsonschema

    jsonschema 是一个流行的 Python 库,用于验证和解析 JSON 数据。它可以帮助你动态地创建 Python 类,这些类可以根据给定的 JSON 模式解析 JSON 数据。示例代码如下:

    import jsonschema
    
    from pyflink.datastream import DataStream
    from pyflink.datastream.connectors import FlinkKafkaConsumer
    from pyflink.table import Schema, DataTypes
    from pyflink.table.sources import TableSource
    
    class UnboundedJsonSource(TableSource):
    
        def __init__(self, topic, bootstrap_servers, group_id, json_schema):
            self.topic = topic
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id
            self.json_schema = json_schema
    
        def get_data_stream(self, context):
            json_validator = jsonschema.Draft7Validator(self.json_schema)
    
            def deserialize_json(message):
                try:
                    json_validator.validate(message)
                    return message
                except jsonschema.ValidationError:
                    return None
    
            kafka_consumer = FlinkKafkaConsumer(
                topics=self.topic,
                deserialization_schema=deserialize_json,
                properties={'bootstrap.servers': self.bootstrap_servers,
                            'group.id': self.group_id})
    
            return context.create_data_stream(kafka_consumer)
    
    # 创建 JSON 模式
    json_schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "age": {"type": "integer"},
            "city": {"type": "string"},
            "occupation": {"type": "string"}
        }
    }
    
    # 创建 TableSource
    json_source = UnboundedJsonSource(topic="my-topic", bootstrap_servers="localhost:9092", group_id="my-group", json_schema=json_schema)
    
    # 创建 TableEnvironment
    t_env = TableEnvironment.create()
    
    # 注册 TableSource
    t_env.register_table_source("json_table", json_source)
    
    # 查询 Table
    result_table = t_env.scan("json_table")
    
    # 输出结果
    result_table.execute().print()
    

    3. 使用自定义反序列化函数

    你还可以编写自己的自定义反序列化函数来解析不固定字段的 JSON 数据。示例代码如下:

    import json
    
    from pyflink.datastream import DataStream
    from pyflink.datastream.connectors import FlinkKafkaConsumer
    from pyflink.table import Schema, DataTypes
    from pyflink.table.sources import TableSource
    
    class UnboundedJsonSource(TableSource):
    
        def __init__(self, topic, bootstrap_servers, group_id):
            self.topic = topic
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id
    
        def get_data_stream(self, context):
            def deserialize_json(message):
                data = json.loads(message)
                return (data.get("name"), data.get("age"), data.get("city"), data.get("occupation"))
    
            kafka_consumer = FlinkKafkaConsumer(
                topics=self.topic,
                deserialization_schema=deserialize_json,
                properties={'bootstrap.servers': self.bootstrap_servers,
                            'group.id': self.group_id})
    
            return context.create_data_stream(kafka_consumer)
    
    # 创建 TableSource
    json_source = UnboundedJsonSource(topic="my-topic", bootstrap_servers="localhost:9092", group_id="my-group")
    
    # 创建 TableEnvironment
    t_env = TableEnvironment.create()
    
    # 注册 TableSource
    t_env.register_table_source("json_table", json_source)
    
    # 查询 Table
    result_table = t_env.scan("json_table")
    
    # 输出结果
    result_table.execute().print()
    

    选择哪种方法取决于你的具体需求和偏好。JsonSchema 是 Apache Flink 提供的一个方便的选项,但它可能不适合所有情况。第三方库(如 jsonschema)提供了更多的灵活性,但需要额外的依赖项。自定义反序列化函数给你最大的控制权,但可能需要更多的编码工作。

    2024-02-27 18:00:36
    赞同 展开评论 打赏
  • 用于埋点数据清洗和加工——该回答整理自钉群“Flink CDC 社区”

    2023-02-06 19:23:15
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载