----------写在前面----------
近些年微服务越来越火,让我也忍不住想去一窥微服务究竟,讲到微服务,就离不开分布式,而分布式,也离不开消息队列,在消息队列中,RabbitMQ可以说是比较具有代表性的一款。
这里是一篇介绍消息队列以及各种消息队列产品对比的文章,讲得很好,有兴趣的可以看一看。
https://cloud.tencent.com/developer/article/1006035
在讲RabbitMQ之前,首先需要在电脑上安装和配置RabbitMQ,网络上已经有很多这类文章,如果懒得去搜索,可以看看这篇介绍如何安装配置RabbitMQ的文章。
https://blog.csdn.net/weixin_39735923/article/details/79288578
其中,在安装RabbitMQ的过程中,遇到了一个坑,在启用RabbltMQ的管理界面执行
rabbitmq-plugins enable rabbitmq_management
命令时,出现了以下这样的报错
可以在该指令前加上 .\ 即
.\rabbitmq-plugins enable rabbitmq_management
祝安装顺利 !!
-------正文------
基本概念
下面是在.Net中使用RabbitMQ要明白的一些名词概念。
综上所诉,他们之间的关系可以用我下面的 丑图 表示。
在图中,没有吧Routing key画出。Producer每一次发送消息,除了发出消息本身,还会随着消息带上一个routingKey,而且每一次将Exchange和Queue绑定,大体需要三个参数,
string queueName, string exchangeName, string routingKey
其中也有一个routingKey,但此RoutingKey非彼Routingkey。
大白话
对这个过程,我们可以理解为国家给灾区发送救灾物资,国家给当地政府划拨物资的时候,会规定,谁才能拿到这批物资,如(房子倒了的.家里有人受伤了的.家庭经济困难的)。
而当地政府在分配这批物资之前,为了方便物资的分配,会给每个家庭贴上一个标签,如
家庭A 经济困难
家庭B 房子倒了.经济困难
家庭C 家庭富有.房子倒了
家庭D 房子倒了的.家里有人受伤了的.家庭经济困难的
所以,发送消息时候的routingKey就是国家规定的那批物质分配规则。
而Exchange和Queue绑定时的RoutingKey可以理解为当地政府给每个家庭贴上的一个标签。
Exchange(交换机)转发消息的规则也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。
我们可以把Exchange理解为分配这批物质的政府,现在国家规定了宏观的分配方向(发送消息时的routingKey),每个家庭也有了家庭情况的标签(绑定Exchange时的routingKey),但是这个物资具体怎么分,还是当地政府说了算。
Direct 严格按照国家规定来,只有房子倒了的,家里有人受伤了的而且家庭经济困难的才能分到救灾物资。 家庭D能分到
Fanout 只要是灾区的居民都能分到, 不管家庭情况如何。 家庭A\B\C\D都能分到
Topic 主题匹配: 只要家庭情况在国家规定分配规则内的,都能分到物资,但是家庭C分不到,因为他家太有钱了,这个条件不在国家的分配规则里。家庭A\B\D能分到
所以,我们在声明一个Exchange(交换机)的同时,还要指定该交换机的类型,即(当地政府怎么来分救灾物资)
其实,用这个例子,我是想说,生产者和消费者之间,就像国家与难民之间一样,国家只知道,我要帮助难民,但是难民有谁,物资能不能分到难民手里,还得当地政府说了算,你就说我这个例子恰不恰当吧!哈哈
好了,懂了概念,我们再来结合具体例子看看。
Fanout
Producer.cs的代码
using System; using System.Text; using RabbitMQ.Client; namespace _2_Publish { class Program { static void Main(string[] args) { //创建连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //创建连接 using (var connection = factory.CreateConnection()) { //创建会话 using (var chancel = connection.CreateModel()) { //生命交换机 chancel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout); string readMsg = "helloWorld"; while (readMsg.ToLower() != "exit") { var body = Encoding.UTF8.GetBytes(readMsg); //给交换机发送消息 chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body); Console.WriteLine($"成功发送消息{readMsg}"); Console.WriteLine("请输入要发送的内容!"); readMsg = Console.ReadLine(); } } } } } }
Customer.cs代码
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace _2_Receiver { class Program { static void Main(string[] args) { //创建连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //创建连接 using (var connection = factory.CreateConnection()) { //创建会话 using (var channel = connection.CreateModel()) { //声明一个Fanout类型的交换机 channel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout); //声明一个消息队列并获取它的名字 var queueName = channel.QueueDeclare().QueueName; //把消息队列和交换机绑定 channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: ""); //创建消费者 var consume = new EventingBasicConsumer(channel); //把消费者和队列绑定 channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume); consume.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息{message}"); }; Console.ReadLine(); } } } } }
在上面的代码中,无论是在生产者的发送消息里
//给交换机发送消息
chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);
还是消费者所在的Queue的绑定里
//把消息队列和交换机绑定
channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");
我们都没有制定routingKey,因为没个人都能获取消息,所以此处,声明routingKey就没有意义了。
我们看看运行效果。
运行了三个消费者,当生产者发出消息时,三个消费者都收到了相同的消息。可以理解为广播模式。(Customer单词拼写错了,图片修改不方便,就不改了,大家将就一下)
Direct
Direct时严格匹配的,只有队列绑定的RoutingKey与生产者发送消息时指定的RoutingKey完全相同,才能接收成功。
Producer.cs
using System; using System.Text; using RabbitMQ.Client; namespace _2_Publish { class Program { static void Main(string[] args) { //创建连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //创建连接 using (var connection = factory.CreateConnection()) { //创建会话 using (var chancel = connection.CreateModel()) { //生命交换机 chancel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct); string readMsg = "helloWorld"; while (readMsg.ToLower() != "exit") { var body = Encoding.UTF8.GetBytes(readMsg); //给交换机发送消息 chancel.BasicPublish(exchange: "DirectDemo", routingKey: "Direct.Key", body: body); Console.WriteLine($"成功发送消息{readMsg}"); Console.WriteLine("请输入要发送的内容!"); readMsg = Console.ReadLine(); } } } } } }
我把Exchange的类型更改为Direct类型,并且发送消息的routingKey设置为Direct.Key。
然后我们来定义消费者
Customer.CS
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace _2_Receiver { class Program { static void Main(string[] args) { //创建连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //创建连接 using (var connection = factory.CreateConnection()) { //创建会话 using (var channel = connection.CreateModel()) { //声明一个Fanout类型的交换机 channel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct); //声明一个消息队列并获取它的名字 var queueName = channel.QueueDeclare().QueueName; Console.WriteLine("请输入RoutingKey!"); var routingKey = Console.ReadLine(); //把消息队列和交换机绑定 channel.QueueBind(exchange: "DirectDemo", queue: queueName, routingKey: routingKey); //创建消费者 var consume = new EventingBasicConsumer(channel); //把消费者和队列绑定 channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume); Console.WriteLine("开始监听消息"); consume.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息{message}"); }; Console.ReadLine(); } } } } }
消费者的RoutingKey再控制台输入,
运行效果如下:
可以看到,只有RoutingKey为Direct.Key的消费者才收到了生产者发出的消息。
Topic
RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键:
*(星号)可以代替一个单词.
# (井号) 可以代替0个或多个单词.
比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列)
在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“..”。
然后在使用 CapSubscribe
绑定的时候,Q1绑定为 CapSubscribe["*.orange.*"]
, Q2 绑定为CapSubscribe["*.*.rabbit"]
和 [CapSubscribe["lazy.#]
。
那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。同样名为 lazy.orange.elephant
的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即使它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,所以它将会被丢弃。
另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。
但是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。
我们结合代码来看一看。
Producer.cs
using System; using System.Text; using RabbitMQ.Client; namespace _2_Publish { class Program { static void Main(string[] args) { //创建连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //创建连接 using (var connection = factory.CreateConnection()) { //创建会话 using (var chancel = connection.CreateModel()) { //生命交换机 chancel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic); string readMsg = "helloWorld"; while (readMsg.ToLower() != "exit") { var body = Encoding.UTF8.GetBytes(readMsg); //给交换机发送消息 chancel.BasicPublish(exchange: "TopicDemo", routingKey: "Topic.Demo.Key", body: body); Console.WriteLine($"成功发送消息{readMsg}"); Console.WriteLine("请输入要发送的内容!"); readMsg = Console.ReadLine(); } } } } } }
我给发送消息的routingKey指定为Topic.Demo.Key
再来看看消费者
Cuustomer.cs
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace _2_Receiver { class Program { static void Main(string[] args) { //创建连接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //创建连接 using (var connection = factory.CreateConnection()) { //创建会话 using (var channel = connection.CreateModel()) { //声明一个Fanout类型的交换机 channel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic); //声明一个消息队列并获取它的名字 var queueName = channel.QueueDeclare().QueueName; Console.WriteLine("请输入RoutingKey!"); var routingKey = Console.ReadLine(); //把消息队列和交换机绑定 channel.QueueBind(exchange: "TopicDemo", queue: queueName, routingKey: routingKey); //创建消费者 var consume = new EventingBasicConsumer(channel); //把消费者和队列绑定 channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume); Console.WriteLine("开始监听消息"); consume.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息{message}"); }; Console.ReadLine(); } } } } }
其RoutingKey也是在外部输入。
我们看看运行效果
因为Producer发布消息的RoutingKey是Topic.Demo.Key
又因为#可以代表0个或者多个单词 ,*能代表一个单词
所以*.*.Key Topic.#与Topic.Demo.Key匹配,而其他两个*.Key和test.1.2当然是不匹配的,所以没有收到消息。
总结
对于上面的例子,我们可以总结出,编写一个生产者的过程如下:
创建连接工厂-》创建连接-》创建会话(Chanel)-》创建交换机(Exchange)-》发送消息
编写一个生产者的过程如下:
创建连接工厂-》创建连接-》创建会话(Chanel)-》创建交换机(Exchange)-》创建队列-》绑定队列和交换机-》创建消费者-》把消费者和队列绑定-》监听消息
掌握这个大的方向,不管交换机怎么分配,代码应该都会写了。
为什么在生产者中和消费者中都要创建交换机呢? 因为我们不确定是生产者先执行还是消费者先执行,所以提前创建一下,避免连接时发现没有创建交换机,出现错误,如果交换机已经创建了,那么默认不会再次创建的。
另外,交换机创建后,同一名称的交换机使用完不会自动删除,但是第二次如果创建的名称和上次一样,但是交换机类型不一样了,那么便会出现报错。
这里总结的是一些RabbitMQ的基础知识,后面还会继续写一些更深入的使用技巧,如果不想错过精彩信息,点击关注一下吧(๑¯◡¯๑)!