Flink 1.14.0 消费 kafka 数据自定义反序列化类

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在最近发布的 Flink 1.14.0 版本中对 Source 接口进行了重构,细节可以参考 FLIP-27: Refactor Source Interface重构之后 API 层面的改动还是非常大的,那在使用新的 API 消费 kafka 数据的时候如何自定义序列化类呢?

在最近发布的 Flink 1.14.0 版本中对 Source 接口进行了重构,细节可以参考 FLIP-27: Refactor Source Interface


重构之后 API 层面的改动还是非常大的,那在使用新的 API 消费 kafka 数据的时候如何自定义序列化类呢?


Kafka Source


KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");


KafkaSourceBuilder 类提供了两个方法来反序列数据,分别是 setDeserializer 和 setValueOnlyDeserializer 从名字上就应该可以看出这两者的区别,前者是反序列化完整的 ConsumerRecord,后者只反序列化 ConsumerRecord 的 value.然后我们来看一下底层的源码


KafkaSourceBuilder 源码


/**
 * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
 * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
 *
 * @param recordDeserializer the deserializer for Kafka {@link
 *     org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
 * @return this KafkaSourceBuilder.
 */
public KafkaSourceBuilder<OUT> setDeserializer(
        KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
    this.deserializationSchema = recordDeserializer;
    return this;
}
/**
 * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
 * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given
 * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The
 * other information (e.g. key) in a ConsumerRecord will be ignored.
 *
 * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization.
 * @return this KafkaSourceBuilder.
 */
public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
        DeserializationSchema<OUT> deserializationSchema) {
    this.deserializationSchema =
            KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
    return this;
}


可以看到这两个方法实际上是一样的,虽然两个方法的参数不同,setDeserializer 方法参数类型是 KafkaRecordDeserializationSchema 而 setValueOnlyDeserializer 方法的参数类型是 DeserializationSchema 那这两种参数类型有什么区别和联系呢?下面会进一步解释, 但是这两个方法最后返回的都是 KafkaRecordDeserializationSchema 对象,我们继续来看 KafkaRecordDeserializationSchema 的源码


先来看一下 DeserializationSchema 的部分源码


DeserializationSchema 源码


@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}
    T deserialize(byte[] message) throws IOException;
    @PublicEvolving
    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T deserialize = deserialize(message);
        if (deserialize != null) {
            out.collect(deserialize);
        }
    }
    boolean isEndOfStream(T nextElement);
}


KafkaRecordDeserializationSchema 源码


/** An interface for the deserialization of Kafka records. */
public interface KafkaRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    @PublicEvolving
    default void open(DeserializationSchema.InitializationContext context) throws Exception {}
    @PublicEvolving
    void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;
    static <V> KafkaRecordDeserializationSchema<V> of(
            KafkaDeserializationSchema<V> kafkaDeserializationSchema) {
        return new KafkaDeserializationSchemaWrapper<>(kafkaDeserializationSchema);
    }
    static <V> KafkaRecordDeserializationSchema<V> valueOnly(
            DeserializationSchema<V> valueDeserializationSchema) {
        return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
    }
    static <V> KafkaRecordDeserializationSchema<V> valueOnly(
            Class<? extends Deserializer<V>> valueDeserializerClass) {
        return new KafkaValueOnlyDeserializerWrapper<>(
                valueDeserializerClass, Collections.emptyMap());
    }
    static <V, D extends Configurable & Deserializer<V>>
            KafkaRecordDeserializationSchema<V> valueOnly(
                    Class<D> valueDeserializerClass, Map<String, String> config) {
        return new KafkaValueOnlyDeserializerWrapper<>(valueDeserializerClass, config);
    }
}


顾名思义,这两个都是反序列接口,并且都继承了 Serializable, ResultTypeQueryable 这两个接口。不同点是,deserialize 方法的参数不一样,KafkaDeserializationSchema 接口很明显是为反序列化 kafka 数据而生的。DeserializationSchema 接口可以反序列化任意二进制数据,更加具有通用性。所以这两个是同一级接口


如果你想要获取 kafka 的元数据信息选择实现 KafkaDeserializationSchema 接口就可以了,KafkaDeserializationSchema 接口还有 4 个静态方法,其中的 of 方法就是用来反序列化 ConsumerRecord 的,剩下的 3 个 valueOnly 是用来反序列化 kafka 消息中的 value 的.


到这里就非常清楚了,如果我们要自定义序列化类,实现 DeserializationSchema 和 KafkaRecordDeserializationSchema 任何一个都是可以的.下面就以 KafkaRecordDeserializationSchema 接口为例,实现一个简单的反序列化类.


MyKafkaDeserialization 自定义序列化类


package flink.stream.deserialization;
import bean.Jason;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
public class MyKafkaDeserialization implements KafkaDeserializationSchema<Jason> {
    private static final Logger log = Logger.getLogger(MyKafkaDeserialization.class);
    private final String encoding = "UTF8";
    private boolean includeTopic;
    private boolean includeTimestamp;
    public MyKafkaDeserialization(boolean includeTopic, boolean includeTimestamp) {
        this.includeTopic = includeTopic;
        this.includeTimestamp = includeTimestamp;
    }
    @Override
    public TypeInformation<Jason> getProducedType() {
        return TypeInformation.of(Jason.class);
    }
    @Override
    public boolean isEndOfStream(Jason nextElement) {
        return false;
    }
    @Override
    public Jason deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        if (consumerRecord != null) {
            try {
                String value = new String(consumerRecord.value(), encoding);
                Jason jason = JSON.parseObject(value, Jason.class);
                if (includeTopic) jason.setTopic(consumerRecord.topic());
                if (includeTimestamp) jason.setTimestamp(consumerRecord.timestamp());
                return jason;
            } catch (Exception e) {
                log.error("deserialize failed : " + e.getMessage());
            }
        }
        return null;
    }
}


整个实现是非常简单的,这样就可以把消费到的数据反序列化成自己想要的格式,虽然 Flink 1.14.0 重构了 Source 接口,但是反序列化接口几乎没变,只不过在原有的基础上增加了几个方法而已.


使用


KafkaSource<Jason> source = KafkaSource.<Jason>builder()
        .setProperty("security.protocol", "SASL_PLAINTEXT")
        .setProperty("sasl.mechanism", "PLAIN")
        .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";")
        // discover new partitions per 10 seconds
        .setProperty("partition.discovery.interval.ms", "10000")
        .setBootstrapServers(broker)
        .setTopics(topic)
        .setGroupId(group_id)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserialization(true, true)))
        // 只反序列化 value
        .setValueOnlyDeserializer(new MyDeSerializer())
        .build();


setDeserializer 和 setValueOnlyDeserializer 只用设置一个即可.

相关文章
|
27天前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
26天前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
1月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
22天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
27天前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
1月前
|
Java API 数据处理
实时计算 Flink版产品使用问题之遇到org.codehaus.janino.CompilerFactory类找不到,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
65 9
|
1月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
52 3