开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):SQL语法过滤】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12388
SQL语法过滤
内容介绍:
一、绑定属性
二、限制条件
一、绑定属性
结构还是一个生产者和一个消费者,consumer 和 producer 主题改为filtersqltopic,再给消息绑定一个自定义的属性,
代码示例:
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主 题Topic、tog 和消息体
/**
*参数一:消息主题 Topic
*参数二:消息Tag
*参数三:消息内容
*/
Messagemsg=new Message(topic: “FilterSQLTopic”,taqs:"Tag1”,(“Hello World*+i).getBytes());
msg.putuerProperty( name:’’i’’string,valueof(i));
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
Sendstatus status=result.getSendstatus();
System.out.println("发送结果:"+result);
//线程睡一秒
TimeUnit.SECONDS.sleep(timeout1);
}
//16.关闭生产者 producer
pnoducer.shutdowm();
}
二、限制条件
启动消费者,限制条件为i>5,因此可以得到发送了i为6、7、8、9的消息,
public class consumer{
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,指定消费者组名
DefaultNOPushConsumerconsumer=newDefaultMQPushConsumer(consumerGroup: “group1”);
//2.指定 nameserver 地址
consumer.setNamesrvAddr("192.168.25 .135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(topic:"FiltersQLTopic”,
MessageSelector.bySql("i>5"));
//设置回调函数,处理消息
consumer.registerMessageListener((MessageListenerConcurrently)(msgs;context){
for (MessageExt msg :msgs){
System.out.println("consumeThreadm=”+Thread.currentThread()-getName() +","*+new string(msg.getbody()));
return ConsumeConcurrentlystatus.CONSUME_SUCCESS;
})
Consumer :
consuneThread=ConsumeMessageThread_1,Hello world 6
consumeThread=ConsumeMessageThread_2,Helloworld7 consumeThreed=ConsumeMessageThread 3 Hello world8
consumeThread=ConsumeMessageThread_4,Hello World9
虽然对方发送了十条,但根据用户的属性,过滤了消息,这就是由 sql 过滤得到
的。