(十七) 整合spring cloud云架构 -消息驱动 Spring Cloud Stream

简介: springboot

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:

  1. 简介:

Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

  1. 使用工具:

rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了

  1. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖

    <dependency>  
       <groupId>org.springframework.cloud</groupId>  
       <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  
    </dependency></span>  
    1. 在yml文件里面配置rabbit mq
     port: 5666  
    spring:  
     application:  
       name: commonservice-mq-producer  
     profiles:   
       active: dev  
     cloud:  
       config:  
         discovery:   
           enabled: true  
           service-id: commonservice-config-server  
     <span style="color: #ff0000;"># rabbitmq和kafka都有相关配置的默认值,如果修改,可以再次进行配置  
       stream:  
         bindings:  
           mqScoreOutput:   
             destination: honghu_exchange  
             contentType: application/json  
               
     rabbitmq:  
        host: localhost  
        port: 5672  
        username: honghu  
        password: honghu</span>  
    eureka:   
     client:  
       service-url:  
         defaultZone: http://honghu:123456@localhost:8761/eureka  
     instance:  
       prefer-ip-address: true</span>  
    1. 定义接口ProducerService
<span style="font-size: 16px;">package com.honghu.cloud.producer;  
  
import org.springframework.cloud.stream.annotation.Output;  
import org.springframework.messaging.SubscribableChannel;  
  
public interface ProducerService {  
      
    String SCORE_OUPUT = "mqScoreOutput";  
      
    @Output(ProducerService.SCORE_OUPUT)  
    SubscribableChannel sendMessage();  
}</span>  
  1. 定义绑定
<span style="font-size: 16px;">package com.honghu.cloud.producer;  
  
import org.springframework.cloud.stream.annotation.EnableBinding;  
  
@EnableBinding(ProducerService.class)  
public class SendServerConfig {  
  
}</span>  
  1. 定义发送消息业务ProducerController
<span style="font-size: 16px;">package com.honghu.cloud.controller;  
  
  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.integration.support.MessageBuilder;  
import org.springframework.messaging.Message;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestBody;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RequestMethod;  
import org.springframework.web.bind.annotation.RestController;  
  
import com.honghu.cloud.common.code.ResponseCode;  
import com.honghu.cloud.common.code.ResponseVO;  
import com.honghu.cloud.entity.User;  
import com.honghu.cloud.producer.ProducerService;  
  
import net.sf.json.JSONObject;  
  
@RestController  
@RequestMapping(value = "producer")  
public class ProducerController {  
      
    @Autowired  
    private ProducerService producerService;  
      
      
    /** 
     * 通过get方式发送</span>对象<span style="font-size: 16px;"> 
     * @param name 路径参数 
     * @return 成功|失败 
     */  
    @RequestMapping(value = "/sendObj", method = RequestMethod.GET)  
    public ResponseVO sendObj() {  
        User user = new User(1, "hello User");  
        <span style="color: #ff0000;">Message<User> msg = MessageBuilder.withPayload(user).build();</span>  
        boolean result = producerService.sendMessage().send(msg);  
        if(result){  
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  
        }  
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  
    }  
      
      
    /** 
     * 通过get方式发送字符串消息 
     * @param name 路径参数 
     * @return 成功|失败 
     */  
    @RequestMapping(value = "/send/{name}", method = RequestMethod.GET)  
    public ResponseVO send(@PathVariable(value = "name", required = true) String name) {  
        Message msg = MessageBuilder.withPayload(name.getBytes()).build();  
        boolean result = producerService.sendMessage().send(msg);  
        if(result){  
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  
        }  
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  
    }  
      
    /** 
     * 通过post方式发送</span>json对象<span style="font-size: 16px;"> 
     * @param name 路径参数 
     * @return 成功|失败 
     */  
    @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)  
    public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {  
        Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();  
        boolean result = producerService.sendMessage().send(msg);  
        if(result){  
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  
        }  
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  
    }  
}  
</span>  
  1. 创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖

    <!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->  
    <dependency>  
       <groupId>org.springframework.cloud</groupId>  
       <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  
    </dependency>  
  2. 在yml文件中配置:
server:  
  port: 5111  
spring:  
  application:  
    name: commonservice-mq-consumer1  
  profiles:   
    active: dev  
  cloud:  
    config:  
      discovery:   
        enabled: true  
        service-id: commonservice-config-server  
          
    <span style="color: #ff0000;">stream:  
      bindings:  
        mqScoreInput:  
          group: honghu_queue  
          destination: honghu_exchange  
          contentType: application/json  
            
  rabbitmq:  
     host: localhost  
     port: 5672  
     username: honghu  
     password: honghu</span>  
eureka:   
  client:  
    service-url:  
      defaultZone: http://honghu:123456@localhost:8761/eureka  
  instance:  
    prefer-ip-address: true 
  1. 定义接口ConsumerService
package com.honghu.cloud.consumer;  
  
import org.springframework.cloud.stream.annotation.Input;  
import org.springframework.messaging.SubscribableChannel;  
  
public interface ConsumerService {  
      
    <span style="color: #ff0000;">String SCORE_INPUT = "mqScoreInput";  
  
    @Input(ConsumerService.SCORE_INPUT)  
    SubscribableChannel sendMessage();</span>  
  
}  
  1. 定义启动类和消息消费
package com.honghu.cloud;  
  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;  
import org.springframework.cloud.stream.annotation.EnableBinding;  
import org.springframework.cloud.stream.annotation.StreamListener;  
  
import com.honghu.cloud.consumer.ConsumerService;  
import com.honghu.cloud.entity.User;  
  
@EnableEurekaClient  
@SpringBootApplication  
@EnableBinding(ConsumerService.class) //可以绑定多个接口  
public class ConsumerApplication {  
      
    public static void main(String[] args) {  
        SpringApplication.run(ConsumerApplication.class, args);  
    }  
      
    <span style="color: #ff0000;">@StreamListener(ConsumerService.SCORE_INPUT)  
    public void onMessage(Object obj) {  
        System.out.println("消费者1,接收到的消息:" + obj);  
    }</span>  
  
}  
  1. 分别启动commonservice-mq-producer、commonservice-mq-consumer1
  2. 通过postman来验证消息的发送和接收

image.png
image.png

image.png
image.png
image.png
image.png
可以看到接收到了消息,下一章我们介绍mq的集群方案。

到此,整个消息中心方案集成完毕(企业架构源码可以加求球:叁五三陆二肆柒二伍玖)!!

欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。

目录
相关文章
|
2月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
373 37
|
2月前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
1896 15
|
4月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
473 15
|
3月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
63 0
|
3月前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
4月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
109 3
|
4月前
|
消息中间件 Java Nacos
通用快照方案问题之通过Spring Cloud实现配置的自动更新如何解决
通用快照方案问题之通过Spring Cloud实现配置的自动更新如何解决
75 0
|
4月前
|
缓存 监控 Java
通用快照方案问题之Spring Boot Admin的定义如何解决
通用快照方案问题之Spring Boot Admin的定义如何解决
62 0
|
4月前
|
监控 NoSQL Java
通用快照方案问题之Martin Flower提出的微服务之间的通信如何解决
通用快照方案问题之Martin Flower提出的微服务之间的通信如何解决
42 0