相信大家在遇到洪峰流量的时候都是考虑过消息队列这个东西的,市面上有很多主流的消息队列模型,今天呢,我就使用SpringBoot项目简单接入一下主流的Kafka消息队列。
1.安装Kafka消息队列
这个Kafka的安装我这里就不过多的介绍了,我之前有发布过一篇安装Kafka3.0的文章,有需要的可以点击文章去查阅一下。
2.新建项目导入依赖
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.application.yml配置Kafka参数
spring:
kafka:
bootstrap-servers: 127.0.0.1:8848
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 1
acks: 1
batch-size: 65535
properties:
linger:
ms: 0
buffer-memory: 33554432
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: true
auto-commit-interval: 1000ms
auto-offset-reset: latest
properties:
session:
timeout:
ms: 120000
request:
timeout:
ms: 180000
listener:
missing-topics-fatal: false
4.生产者生产消息
Kafka接入SpringBoot提供了kafkaTemplate模版,直接调用API发送消息即可。
package cn.youhaveme.controller;
import cn.hutool.core.util.IdUtil;
import cn.youhaveme.kafka.constant.TopicConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/sms")
@RestController
@Slf4j
public class SmsController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/send")
public void send(@RequestBody String requestParam) {
try {
kafkaTemplate.send(TopicConst.SMS, IdUtil.getSnowflakeNextIdStr(), "1");
} catch (Exception e) {
log.error("Kafka发送消息异常", e);
}
}
}
5.消费者消费消息
@KafkaListener这个注解就是消费者注解,可以消费指定主题的消息,相对而言比代码消费要简单一些,一个注解即可搞定实现消息消费。
package cn.youhaveme.kafka.consumer;
import cn.youhaveme.kafka.constant.TopicConst;
import cn.youhaveme.service.SmsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
@Slf4j
public class SmsConsumer {
@Autowired
private SmsService smsService;
@KafkaListener(id = "smsConsumer", topics = TopicConst.SMS, groupId = "sms")
public void smsConsumer(ConsumerRecord<String, String> record) {
try {
log.info("开始消费:{}", record.value());
} catch (Exception e) {
log.error("【Kafka消费短信异常】:{}--{}--{}", record.topic(), record.partition(), record.key(), e);
}
}
}
至此,通过SpringBoot接入Kafka消息队列的过程就此完成了,这只是一个简单的接入,目的是让大家大致了解KafkaApi的使用,更深层次的讲解下次再会。