问一下Flink:我这边使用pyflink将json的流数据写入kafka后,消费得到的数据却变成字段个数统计。如上面两图所示。请问应该如何解决,使得消费得到的数据和写入数据一致?使用的是canal-json格式
可以使用如下处理流程:
1、初始化运行环境。
# 初始化
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
# 加载 flink 读取 kafka 的 jar 包
env.add_jars("file:////jars/flink-sql-connector-kafka-1.16.0.jar")
2、定义 kafka source
# 源 kafka配置
kafka_servers = "node:9092"
source_topic = "test"
consume_group_id = "test_group"
sink_topic = "test1"
# 使用 SimpleStringSchema 反序列模式,因为测试数据源为非结构化数据
source = KafkaSource.builder().set_bootstrap_servers(kafka_servers) \
.set_topics(source_topic) \
.set_group_id(consume_group_id) \
.set_value_only_deserializer(SimpleStringSchema()) \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_property("partition.discovery.interval.ms", "10000") \
.build()
# 这里不使用水印了,来一条处理一条无需上下关联
data_source = env.from_source(source=source,
watermark_strategy=WatermarkStrategy.no_watermarks(),
source_name="kafka source")
3、定义转换函数,将普通非结构化数据转换为 json
def standard_log(line):
"""
标准化字符串
:param line:
:return:
"""
data = []
try:
if line:
data = [json.dumps({"key": line.upper()}], ensure_ascii=False)]
except Exception as ex:
data = [json.dumps({"key": 1}, ensure_ascii=False)]
finally:
yield from data
ds_standard = data_source.flat_map(standard_log, Types.STRING())
# 标准化日志测试打印
ds_standard.print()
4、定义 sink
# 订阅写入
sink = KafkaSink.builder() \
.set_bootstrap_servers(kafka_servers) \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic(sink_topic)
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.build()
ds_standard.sink_to(sink)
# 执行
env.execute()
5、校验数据
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
——参考链接。
当您使用PyFlink将JSON格式的数据写入Kafka,而在消费时却发现数据变成了字段个数统计,这很可能是因为在消费端对数据的解码处理不正确导致的。为了确保写入和消费数据的一致性,尤其是当您提及使用的是Canal JSON格式时,需要注意以下几点:
生产端编码:
database
、table
、type
等Canal协议所需的基本字段以及对应的实际数据字段。序列化器设置:
SimpleStringSchema
可能只会把对象转换为其字符串表示,而不是JSON格式。您需要确保使用类似JsonRowSerializationSchema
或者其他能够正确处理JSON格式的序列化器。消费端解码:
JsonDeserializer
或者自定义反序列化逻辑;如果是其他语言的消费者,也需要相应的JSON解析模块。举例说明如何在PyFlink中使用JSON序列化器:
from pyflink.table import DataTypes, StreamTableEnvironment, TableConfig
from pyflink.table.descriptors import Schema, Kafka, Json
# 创建表环境
table_env = StreamTableEnvironment.create(environment_settings=..., table_config=TableConfig())
# 定义表结构,假设与Canal JSON格式匹配
schema = Schema()
# ... 添加列定义 ...
# 设置Kafka生产者配置,使用JSON格式
table_env.connect(Kafka()
.version('universal') # 根据实际情况选择版本
.topic('your_topic')
.property('bootstrap.servers', 'kafka_broker:9092')
.property('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer') # 或使用JSONSerializer
.with_schema(Json().json_schema(schema.json())))
# 将DataStream转换为Table,并注册为一个临时表
table = table_env.from_data_stream(stream)
table_env.create_temporary_view('source_table', table)
# 将表写入Kafka
table_env.execute_sql("""
INSERT INTO kafka_table
SELECT * FROM source_table
""")
# 确保SELECT查询出来的每一行数据都被转换为合法的JSON字符串写入Kafka
消费端则需要确保按照同样的逻辑解码JSON字符串为JSON对象,具体做法取决于您使用的消费者框架和语言。如果使用的是Flink Kafka Consumer,同样需要配置合适的JSON Deserializer。
如果您的问题是发生在非Flink的Kafka消费者身上,请检查消费者配置以确保它正确处理JSON格式的消息。
使用PyFlink将JSON流数据写入Kafka,但在消费者端接收的数据变成了字段数量计数。这种情况的原因有很多可能性,请允许我对其中的一些进行简短探讨:
这个问题是由于在将 JSON 数据写入 Kafka 时,Flink 使用了默认的编码(canal-json),导致某些字段被截断或不正确。要解决这个问题,您可以在写入 Kafka 时指定自定义的编码。
以下是一个使用 PyFlink 的示例,展示了如何将 JSON 数据写入 Kafka 并自定义编码:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
env = ExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = BatchTableEnvironment.create(env, t_config)
t_env.connect(FileSystem().path('output.json')) \
.with_format(OldCsv()
.field_delimiter(',')
.field("field1", DataTypes.STRING())
.field("field2", DataTypes.STRING())
.field("field3", DataTypes.STRING())) \
.with_schema(Schema()
.field("field1", DataTypes.STRING())
.field("field2", DataTypes.STRING())
.field("field3", DataTypes.STRING())) \
.register_table_sink("Results")
data = [{"field1": "value1", "field2": "value2", "field3": "value3"},
{"field1": "value4", "field2": "value5", "field3": "value6"}]
t_env.from_elements(data, DataTypes.JSON()) \
.select("field1", "field2", "field3") \
.insert_into("Results")
t_env.execute("write_kafka_json")
CopyCopy
在这个示例中,我们使用了 OldCsv 格式来指定自定义编码。您还可以根据需要修改 field_delimiter、field 和 with_schema 方法来调整编码。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。