SpringBoot接入Kafka

简介: 相信大家在遇到洪峰流量的时候都是考虑过消息队列这个东西的,市面上有很多主流的消息队列模型,今天呢,我就使用SpringBoot项目简单接入一下主流的Kafka消息队列。
相信大家在遇到洪峰流量的时候都是考虑过消息队列这个东西的,市面上有很多主流的消息队列模型,今天呢,我就使用SpringBoot项目简单接入一下主流的Kafka消息队列。
1.安装Kafka消息队列

这个Kafka的安装我这里就不过多的介绍了,我之前有发布过一篇安装Kafka3.0的文章,有需要的可以点击文章去查阅一下。

2.新建项目导入依赖
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3.application.yml配置Kafka参数
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:8848
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 1
      acks: 1
      batch-size: 65535
      properties:
        linger:
          ms: 0
      buffer-memory: 33554432
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
      auto-commit-interval: 1000ms
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            ms: 120000
        request:
          timeout:
            ms: 180000
    listener:
      missing-topics-fatal: false
4.生产者生产消息

Kafka接入SpringBoot提供了kafkaTemplate模版,直接调用API发送消息即可。

package cn.youhaveme.controller;

import cn.hutool.core.util.IdUtil;
import cn.youhaveme.kafka.constant.TopicConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/sms")
@RestController
@Slf4j
public class SmsController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/send")
    public void send(@RequestBody String requestParam) {
        try {
            kafkaTemplate.send(TopicConst.SMS, IdUtil.getSnowflakeNextIdStr(), "1");
        } catch (Exception e) {
            log.error("Kafka发送消息异常", e);
        }
    }
}
5.消费者消费消息

@KafkaListener这个注解就是消费者注解,可以消费指定主题的消息,相对而言比代码消费要简单一些,一个注解即可搞定实现消息消费。

package cn.youhaveme.kafka.consumer;

import cn.youhaveme.kafka.constant.TopicConst;
import cn.youhaveme.service.SmsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
@Slf4j
public class SmsConsumer {
    @Autowired
    private SmsService smsService;

    @KafkaListener(id = "smsConsumer", topics = TopicConst.SMS, groupId = "sms")
    public void smsConsumer(ConsumerRecord<String, String> record) {
        try {
            log.info("开始消费:{}", record.value());
        } catch (Exception e) {
            log.error("【Kafka消费短信异常】:{}--{}--{}", record.topic(), record.partition(), record.key(), e);
        }
    }
}

至此,通过SpringBoot接入Kafka消息队列的过程就此完成了,这只是一个简单的接入,目的是让大家大致了解KafkaApi的使用,更深层次的讲解下次再会。

目录
相关文章
|
23天前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
37 3
|
1月前
|
消息中间件 Java Kafka
|
1月前
|
消息中间件 Java Kafka
|
20天前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
57 0
|
1月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
|
2月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
33 8
|
1月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
29 0
|
2月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
消息中间件 Java Kafka
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
17329 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
|
4月前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
82 0
spring boot 集成kafka