Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。
SpringCloudStream应用模型
一、引入依赖包
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
二、自定义信息通道
官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。
@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
如下我们自定义信息通道EsChannel
/** * 自定义信息通道 * @author dbq * @date 2019/9/26 14:54 */ public interface EsChannel { /** * 缺省发送消息通道名称 */ String ES_DEFAULT_OUTPUT = "es_default_output"; /** * 缺省接收消息通道名称 */ String ES_DEFAULT_INPUT = "es_default_input"; /** * 告警发送消息通道名称 */ String ES_ALARM_OUTPUT = "es_alarm_output"; /** * 告警接收消息通道名称 */ String ES_ALARM_INPUT = "es_alarm_input"; /** * 缺省发送消息通道 * @return channel 返回缺省信息发送通道 */ @Output(ES_DEFAULT_OUTPUT) MessageChannel sendEsDefaultMessage(); /** * 告警发送消息通道 * @return channel 返回告警信息发送通道 */ @Output(ES_ALARM_OUTPUT) MessageChannel sendEsAlarmMessage(); /** * 缺省接收消息通道 * @return channel 返回缺省信息接收通道 */ @Input(ES_DEFAULT_INPUT) MessageChannel recieveEsDefaultMessage(); /** * 告警接收消息通道 * @return channel 返回告警信息接收通道 */ @Input(ES_ALARM_INPUT) MessageChannel recieveEsAlarmMessage(); }
三、@EnableBinding使应用程序连接到消息代理
@EnableDiscoveryClient @SpringBootApplication @EnableFeignClients @EnableHystrix @MapperScan(basePackages = "com.es.mapper") @EnableBinding(EsChannel.class) public class EsOnenetApplication { public static void main(String[] args) { SpringApplication.run(EsOnenetApplication.class, args); } }
四、SpringCloudStream及kafka配置
#============================================================== #spring-cloud-stream-Kafka配置 开始 #============================================================== #是否开启kafka(非spring-cloud-stream配置) spring.kafka.enabled=false #缺省的输入、输出通道 spring.cloud.stream.bindings.es_default_input.destination=es_default_topic spring.cloud.stream.bindings.es_default_input.binder=kafka spring.cloud.stream.bindings.es_default_input.group=es_default_group spring.cloud.stream.bindings.es_default_output.destination=es_default_topic spring.cloud.stream.bindings.es_default_output.binder=kafka #入站消费者的并发性 spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2 #告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义) spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic spring.cloud.stream.bindings.es_alarm_input.binder=kafka spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic spring.cloud.stream.bindings.es_alarm_output.binder=kafka #kafka配置 spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092 spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181 spring.cloud.stream.kafka.binder.requiredAcks=1 #============================================================== #spring-cloud-stream-Kafka配置 结束 #==============================================================
从上面配置可以看出
1、定义了通道名称及分组,binder代表绑定实现的标识名称(如kafka或者rabbit),与3中的定义名称相对应。
2、定义了入站消费者的并发性,指在一个实例内的并发性,不同实例之间本身就是并发的,默认值为1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
3、定义了kafka连接信息
如果未配置autoCommitOffset,默认自动提交偏移量
详细参数配置可参考官网