Apache Kafka - ConsumerInterceptor 实战(2)

简介: Apache Kafka - ConsumerInterceptor 实战(2)

20191116123525638.png



Pre

Apache Kafka - ConsumerInterceptor 实战 (1) 用代码的方式实现了ConsumerInterceptor , 接下来我们用 配置的方式来实现一下 。


思路

如何找配置类

KafkaProperties

5b0110c7e372455ea98d692ededf89e8.png


有些属性是很明显的有的,其他没有的一般都在 Map里


8859f6b86d47461fa81376f2ff8bd6d5.png


那map的 key value 从哪里找呢?

找原生的配置 Kafka Consumer的 都在 ConsumerConfig


979eab0aec934f2e89a8b6ab7915278e.png


找到

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";


c81113a14b844dd897e09de4b6d27424.png

OK,继续


示例

配置文件



652452561ed74b6b945782037ca78041.png


自定义 拦截器

package net.zf.module.system.kafka.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @author artisan
 */
@Slf4j
@Component
public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> {
    /**
     * 消息消费前的拦截处理
     *
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        // TODO
        log.info("FailureRateInterceptor#onConsume");
        // 根据设定的规则计算失败率,并进行判断是否跳过消息的消费
        // 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象 (ConsumerRecords.EMPTY)
        return consumerRecords;
    }
    /**
     * 消息提交前进行拦截处理
     *
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        log.info("FailureRateInterceptor#onCommit");
    }
    /**
     * 拦截器关闭前进行拦截处理(如果有的话)
     */
    @Override
    public void close() {
        log.info("FailureRateInterceptor#close");
    }
    /**
     * 初始化配置(如果有的话)
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        log.info("FailureRateInterceptor#configure");
    }
}


使用


ccf608e13a1c4cb9b23b27875ee4ea1d.png


测试

启动服务,发送消息,进行消费


405a54138cf049c18878ff2cb3079aa6.png

小结

在Spring Boot中配置Kafka消费者的拦截器需要进行以下步骤:


首先,创建一个拦截器类,实现Kafka的ConsumerInterceptor接口,定义拦截器的逻辑。

在应用的配置文件(例如application.properties或application.yml)中,添加拦截器相关的配置项,其中包括设置interceptor.class属性为拦截器类的全限定名。

下面是一个示例,演示如何在Spring Boot中配置Kafka消费者的拦截器:


创建拦截器类:

@Slf4j
@Component
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        // 在消息消费前的处理逻辑
        // ...
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 在消息提交前的处理逻辑
        // ...
    }
    @Override
    public void close() {
        // 拦截器关闭前的处理逻辑
        // ...
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化配置的处理逻辑
        // ...
    }
}


  1. 在应用的配置文件中设置拦截器相关的配置项:
spring.kafka.consumer.properties.interceptor.classes=com.example.MyConsumerInterceptor


或者在application.yml文件中:

spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.MyConsumerInterceptor


这样配置之后,Spring Boot会自动创建Kafka消费者,并将指定的拦截器应用于消费者。在消费者处理消息的过程中,拦截器的方法将会被调用,可以在这些方法中编写自定义的逻辑来处理消息或拦截操作。

相关文章
|
2月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
137 7
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
103 5
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
74 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
58 1
|
2月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
2月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
56 0
|
24天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
312 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
888 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
113 3
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

热门文章

最新文章

推荐镜像

更多