SpringCloud学习笔记(五)-SpringCloudStream集成kafka(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

五、发送消息到输出通道


/**
 * kafka消息发送器
 * @author dbq
 * @date 2019/9/26 17:50
 */
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;
    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }
    /**
     * 消息发送到告警通道:告警通道对应告警主题
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}


注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。


六、从输入通道订阅消息


@EnableBinding(value = EsChannel.class)
public class EsStreamListener {
    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
    }
    /**
     * 从告警通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("订阅告警消息:" + message);
    }
}


从不同的通道实现消息的订阅。


七、这样完整的消息系统就搭建好了,定义Controller发送消息测试


@ApiOperation(value = "test1", httpMethod = "POST")
    @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
    public void test1(String message, HttpServletRequest request,
                             HttpServletResponse response) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }
    @ApiOperation(value = "test", httpMethod = "POST")
    @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
    public void test2(String message, HttpServletRequest request,
                      HttpServletResponse response) {
        sender.sendToAlarmChannel(message);
    }


test1:发送消息的缺省消息通道


test2:发送消息到告警消息通道


八、并发性测试


如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1


实例1

2019-09-30 11:13:14------start--------默认消息...
2019-09-30 11:13:24------end--------默认消息


实例2

2019-09-30 11:13:14------start--------默认消息:...
2019-09-30 11:13:24------end--------默认消息
2019-09-30 11:13:24------start--------默认消息:...
2019-09-30 11:13:34------end--------默认消息
2019-09-30 11:13:34------start--------默认消息:...
2019-09-30 11:13:44------end--------默认消息


通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。


如果将concurrency修改为2.


日志如下


实例1

2019-09-30 11:31:13------start--------:...
2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------start--------默认消息:...
2019-09-30 11:31:33------end--------默认消息


实例2

2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------


从日志可以看出,实例1中实现了两个线程的并发消费。


相关文章
|
3月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
191 1
|
21天前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
55 8
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
88 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
71 1
|
5月前
|
jenkins 持续交付
jenkins学习笔记之六:共享库方式集成构建工具
jenkins学习笔记之六:共享库方式集成构建工具
|
5月前
|
Java jenkins Shell
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
|
5月前
|
jenkins 持续交付
jenkins学习笔记之九:jenkins认证集成github
jenkins学习笔记之九:jenkins认证集成github
|
5月前
|
安全 jenkins 持续交付
jenkins学习笔记之八:jenkins认证集成gitlab
jenkins学习笔记之八:jenkins认证集成gitlab
|
5月前
|
jenkins Devops 持续交付
jenkins学习笔记之七:jenkins集成LDAP用户认证
jenkins学习笔记之七:jenkins集成LDAP用户认证
|
5月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
113 0