使用 RocketMQ Binder的底层处理原理是什么?
"1. @EnableBinding 对应的两个接口属性 Source 和 Sink 是 SCS 内部提供的。 SCS 内部会基于 Source 和 Sink 构造 BindableProxyFactory,且对应的 output 和 input 方法返回的 MessageChannel 是 DirectChannel。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。 public interface Source { String OUTPUT = ""output""; @Output(Source.OUTPUT) MessageChannel output();}public interface Sink { String INPUT = ""input""; @Input(Sink.INPUT) SubscribableChannel input(); } 配置文件里 bindings 的 name 为 output 和 input,对应 Source 和 Sink 接口的方法上的注解里的 value: spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output.content-type=text/plain spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group spring.cloud.stream.bindings.input.destination=test-topic spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.input.group=test-group1 2. 构造 CommandLineRunner,程序启动的时候会执行 CustomRunner 的 run 方法。 3. 调用 Source 接口里的 output 方法获取 DirectChannel,并发送消息到这个消息通道中。这里跟之前 Spring Integration 章节里的代码一致。 Source 里的 output 发送消息到 DirectChannel 消息通道之后会被 AbstractMe ssageChannelBinder#SendingHandler 这个 MessageHandler 处理,然后它会委托给 AbstractMessageChannelBinder#createProducerMessageHandler 创建的 MessageHandler 处理(该方法由不同的消息中间件实现); 不同的消息中间件对应的 AbstractMessageChannelBinder#createProducerMess ageHandler 方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker; 4. 使用 @StreamListener 进行消息的订阅。请注意,注解里的 Sink.input 对应的值是 ""input"",会根据配置文件里 binding 对应的 name 为 input 的值进行配置: 不同的消息中间件对应的 AbstractMessageChannelBinder#createConsumerEnd point 方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message; 消息转换之后会把 Spring Message 发送至 name 为 input 的消息通道中; @StreamListener 对应的 StreamListenerMessageHandler 订阅了 name 为 input 的消息通道,进行了消息的消费; 这本电子书收录开发者藏经阁,下载连接:https://developer.aliyun.com/topic/download?id=1216"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。