SpringBoot集成阿里云Kafka 示例

简介: 消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。本文主要演示如何使用SpringBoot 通过公网方式集成阿里云Kakfa。

Step By Step

1、kafka控制台创建公网类型实例
2、创建SpringBoot项目集成阿里云Kafka
3、发送接收测试


一、kafka控制台创建公网类型实例

1.1 Kafka控制台创建实例

图片.png

1.2 获取认证参数

图片.png

图片.png

二、创建SpringBoot项目集成阿里云Kafka

2.1 创建Spring Boot(2.5.2)项目

图片.png

2.2 依赖

 <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.3 Sender.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        this.template.sendDefault("my_msg", msg);
        System.out.println("send message:" + msg);
    }
}

2.4 Receiver.class

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    
    @KafkaListener(topics = { "taro_topic" }) // 参数配置要监听的Topic
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("Receive Message");
        System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
    }
}

2.5 KafkaController.class


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private Sender sender;

    @PostMapping("/send/{msg}") // 发送消息测试,注意此处为Post
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}

2.6 application.yml

spring:
  kafka:
    template:
      default-topic: <topic>
    bootstrap-servers: <SSL接入点>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <用户名>
        password: <密码>
    consumer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
      group-id: <group>
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
kafka.client.truststore.jks 下载 地址,证书下载后直接放在C盘根目录下。

2.7 项目结构

图片.png

三、发送接收测试

3.1 启动项目,使用PostMan发送Post请求

图片.png

3.2 项目日志

图片.png

3.3 控制台消息监控查看

图片.png

更多参考

SSL接入点PLAIN机制收发消息

相关文章
|
10天前
|
XML Java API
Spring Boot集成MinIO
本文介绍了如何在Spring Boot项目中集成MinIO,一个高性能的分布式对象存储服务。主要步骤包括:引入MinIO依赖、配置MinIO属性、创建MinIO配置类和服务类、使用服务类实现文件上传和下载功能,以及运行应用进行测试。通过这些步骤,可以轻松地在项目中使用MinIO的对象存储功能。
|
12天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
43 5
|
1月前
|
安全 Java API
【三方服务集成】最新版 | 阿里云短信服务SMS使用教程(包含支持单双参数模板的工具类,拿来即用!)
阿里云短信服务提供API/SDK和控制台调用方式,支持验证码、通知、推广等短信类型。需先注册阿里云账号并实名认证,然后在短信服务控制台申请资质、签名和模板,并创建AccessKey。最后通过Maven引入依赖,使用工具类发送短信验证码。
【三方服务集成】最新版 | 阿里云短信服务SMS使用教程(包含支持单双参数模板的工具类,拿来即用!)
|
14天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
22 1
|
16天前
|
存储 Prometheus 运维
在云原生环境中,阿里云ARMS与Prometheus的集成提供了强大的应用实时监控解决方案
在云原生环境中,阿里云ARMS与Prometheus的集成提供了强大的应用实时监控解决方案。该集成结合了ARMS的基础设施监控能力和Prometheus的灵活配置及社区支持,实现了全面、精准的系统状态、性能和错误监控,提升了应用的稳定性和管理效率。通过统一的数据视图和高级查询功能,帮助企业有效应对云原生挑战,促进业务的持续发展。
25 3
|
1月前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
104 1
|
15天前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
27 0
|
2月前
|
安全 Java 编译器
springboot 整合表达式计算引擎 Aviator 使用示例详解
本文详细介绍了Google Aviator 这款高性能、轻量级的 Java 表达式求值引擎
168 6
|
1月前
|
JSON Java API
springboot集成ElasticSearch使用completion实现补全功能
springboot集成ElasticSearch使用completion实现补全功能
38 1
|
1月前
|
XML 存储 Java
SpringBoot集成Flowable:构建强大的工作流引擎
在企业级应用开发中,工作流管理是核心功能之一。Flowable是一个开源的工作流引擎,它提供了BPMN 2.0规范的实现,并且与SpringBoot框架完美集成。本文将探讨如何使用SpringBoot和Flowable构建一个强大的工作流引擎,并分享一些实践技巧。
80 0