概述
在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。
在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。
思路
首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。
以下是一个示例配置:
spring.kafka.consumer.bootstrap-servers=<Kafka服务器地址> spring.kafka.consumer.group-id=<消费者组ID>
接下来,可以创建一个Kafka消费者,使用@KafkaListener
注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { @KafkaListener(topics = "<Kafka主题>") public void receive(String message) { // 处理接收到的消息 } }
现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:
方法1:使用@KafkaListener注解的autoStartup属性
@KafkaListener注解具有一个名为autoStartup的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为true,表示自动启动。如果将其设置为false,则消费者将不会自动启动。
@KafkaListener(topics = "<Kafka主题>", autoStartup = "false") public void receive(String message) { // 处理接收到的消息 }
要在运行时动态启动消费者,你可以通过KafkaListenerEndpointRegistry
bean来手动启动:
@Autowired private KafkaListenerEndpointRegistry endpointRegistry; // 启动消费者 endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").start();
同样,你也可以使用stop()
方法来停止消费者:
// 停止消费者 endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").stop();
方法2:使用KafkaListenerEndpointRegistry
bean的pause()
和resume()
方法
KafkaListenerEndpointRegistry
bean提供了pause()
和resume()
方法,用于暂停和恢复消费者的监听。
@Autowired private KafkaListenerEndpointRegistry endpointRegistry; // 暂停消费者监听 endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").pause(); // 恢复消费者监听 endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").resume();
使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。
Code
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler; import org.springframework.kafka.listener.ContainerProperties; import java.util.HashMap; import java.util.Map; /** * @author artisan */ @Slf4j @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServer; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.consumer.enable-auto-commit}") private String enableAutoCommit; @Value("${spring.kafka.consumer.key-deserializer}") private String keyDeserializer; @Value("${spring.kafka.consumer.value-deserializer}") private String valueDeserializer; @Value("${spring.kafka.consumer.group-id}") private String group_id; @Value("${spring.kafka.consumer.max-poll-records}") private String maxPollRecords; @Value("${spring.kafka.consumer.max-poll-interval-ms}") private String maxPollIntervalMs; @Value("${spring.kafka.listener.concurrency}") private Integer concurrency; private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor"; /** * 消费者配置信息 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(32); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs); props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id); props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor ); return props; } /** * 消费者批量工厂 */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() { ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); factory.setBatchListener(true); factory.setConcurrency(concurrency); return factory; } /** * 异常处理器 * * @return */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() { return (message, exception, consumer) -> { // log.error("消息{} , 异常原因{}", message, exception.getMessage()); log.error("consumerAwareListenerErrorHandler called"); return null; }; } }
使用
@KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*", containerFactory = "batchFactory", errorHandler = "consumerAwareListenerErrorHandler", id = "attackConsumer") public void processMessage(List<String> records, Acknowledgment ack) { log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size()); try { List<AttackMessage> attackMessages = new ArrayList(); records.stream().forEach(record -> { messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages); }); if (!attackMessages.isEmpty()) { attackMessageESService.addDocuments(attackMessages, false); } } finally { ack.acknowledge(); } }
在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,
topicPattern参数指定了该消费者要监听的主题的模式,即以 KafkaTopicConstant.ATTACK_MESSAGE开头的所有主题。
containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。
errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。
在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。
在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。
最后,手动确认已经消费了这些消息。
【控制】
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController public class KafkaConsumerController { @Autowired private KafkaListenerEndpointRegistry registry; /** * 开启监听 */ @GetMapping("/start") public void start() { // 判断监听容器是否启动,未启动则将其启动 if (!registry.getListenerContainer("attackConsumer").isRunning()) { log.info("start "); registry.getListenerContainer("attackConsumer").start(); } // 将其恢复 registry.getListenerContainer("attackConsumer").resume(); log.info("resume over "); } /** * 关闭监听 */ @GetMapping("/pause") public void pause() { // 暂停监听 registry.getListenerContainer("attackConsumer").pause(); log.info("pause"); } }
扩展
KafkaListenerEndpointRegistry
KafkaListenerEndpointRegistry是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。
在 Spring Boot 应用程序中使用 @KafkaListener 注解时,Spring Kafka 会自动创建一个 KafkaListenerEndpointRegistry 实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。