[译]RabbitMQ教程C#版 - 发布订阅

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 先决条件本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。从哪里获得帮助如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们。

先决条件
本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

发布/订阅

(使用.NET Client)

教程[2] 中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个Worker。那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者。这种模式被称为“发布/订阅”。

为了说明、体现这种模式,我们将会建一个简单的日志系统。它将会包含两个程序 - 第一个用来发送日志消息,第二个用来接收并打印它们。

在我们建立的日志系统中,每个接收程序的运行副本都会收到消息。这样我们就可以运行一个接收程序接收消息并将日志写入磁盘;同时运行另外一个接收程序接收消息并将日志打印到屏幕上。

实质上,发布的日志消息将会被广播给所有的接收者。

交换器

在教程的前几部分,我们是发送消息到队列并从队列中接收消息。现在是时候介绍Rabbit中完整的消息传递模型了。

让我们快速回顾一下前面教程中的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

在RabbitMQ中,消息传递模型的核心理念是生产者从来不会把任何消息直接发送到队列,其实,通常生产者甚至不知道消息是否会被分发到任何队列中。

然而,生产者只能把消息发送给交换器。交换器非常简单,一方面它接收来自生产者的消息,另一方面又会把接收的消息推送到队列中。交换器必须明确知道该如何处理收到的消息,应该追加到一个特定队列中?还是应该追加到多个队列中?或者应该把它丢弃?这些规则都被定义在交换器类型中。

Exchanges

目前交换器类型有这几种:directtopicheadersfanout。我们先来重点关注最后一个fanout,我们创建一个这种类型的交换器,将其命名为logs

channel.ExchangeDeclare("logs", "fanout");

fanout类型交换器非常简单,正如您可能从名字中猜出的那样,它会把收到的所有消息广播到它已知的所有队列中。这恰巧是我们的日志系统目前所需要的。

列举交换器
要列举出服务器上的交换器,您可以使用非常有用的rabbitmqctl命令行工具:

sudo rabbitmqctl list_exchanges

执行上述命令后,出现的列表中将会有一些amq.*交换器和默认(未命名)交换器。这些是默认创建的,不过目前您可能用不到它们。

默认交换器
在教程的前些部分,我们对交换器这一概念还一无所知,但仍然可以把消息发送到队列。之所以这样,是因为我们使用了一个用空字符串("")标识的默认交换器。

回顾一下我们之前如何发布消息:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "hello",
                     basicProperties: null,
                     body: body);

第一个参数就是交换器的名称,空字符串表示默认或匿名交换器:将消息路由到routingKey指定的队列(如果存在)中。

现在,我们可以把消息发布到我们指定的交换器:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

临时队列

您是否还记得之前我们使用过的队列,它们都有一个特定的名称(记得应该是hellotask_queue吧)。给队列命名对我们来说是至关重要的 -- 因为我们可能需要多个Worker指向同一个队列;当您想要在生产者和消费者之间共享队列时,给队列一个名称也是非常重要的。

但是,我们创建的日志系统并不希望如此。我们希望监听所有的日志消息,而不仅仅是其中一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。为解决这个问题,我们需要做好两件事。

首先,我们无论何时连接Rabbit,都需要一个新的、空的队列。要做到这一点,我们可以使用随机名称来创建队列,或许,甚至更好的方案是让服务器为我们选择一个随机队列名称。

其次,一旦我们与消费者断开连接,与之相关的队列应该被自动删除。

在.NET客户端中,如果不向QueueDeclare()方法提供任何参数,实际上就是创建了一个非持久化、独占、且自动删除的随机命名队列:

var queueName = channel.QueueDeclare().QueueName;

您可以在队列指南中了解更多关于exclusive参数和其他队列属性的信息。

此时,queueName包含一个随机队列名称。例如,它看起来可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

Bindings

我们已经创建好了一个fanout交换器和一个队列。现在我们需要告诉交换器把消息发送到我们的队列。而交换器和队列之间的关系就称之为绑定

// 把一个队列绑定到指定交换器。
channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

从现在起,logs交换器会把消息追加到我们的队列中。

列举绑定
您可以使用(您或许已经猜到了),列举出现有的绑定。

sudo rabbitmqctl list_bindings

组合在一起

Putting it all together

生产者程序负责分发消息,这与之前的教程看起来没有太大区别。

最重要的变化是我们现在想把消息发布到我们的logs交换器,而不是匿名交换器。在发送时我们需要提供一个路由键routingKey,但是对于fanout交换器,它的值可以被忽略。这里是EmitLog.cs文件的代码:

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

EmitLog.cs源码)

如你所见,在建立连接后,我们声明了交换器。这一步非常有必要,因为发布消息到一个不存在的交换器,这种情况是被禁止的。

如果没有队列绑定到交换器上,消息将会丢失,但这对我们来说并没有什么没问题;如果没有消费者正在监听,我们是可以放心地把消息丢弃的。

ReceiveLogs.cs的代码:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

ReceiveLogs.cs源码)

按照教程[1]中的设置说明生成EmitLogsReceiveLogs项目。

如果您想把日志保存到文件中,只需打开一个控制台并输入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log

如果你想在屏幕上看到日志,我可以新开一个终端并运行:

cd ReceiveLogs
dotnet run

当然,分发日志需要输入:

cd EmitLog
dotnet run

使用rabbitmqctl list_bindings命令,您可以验证代码是否真正创建了我们想要的绑定和队列。当有两个ReceiveLogs.cs程序运行时,您应该看到如下所示的内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对执行结果的解释简洁明了:来自logs交换器的数据转发到了两个由服务器随机分配名称的队列。这正是我们期待的结果。

想要了解如何监听消息的这一块内容,让我们继续阅读教程[4]

写在最后

本文翻译自RabbitMQ官方教程C#版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
69 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
4月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
14天前
|
设计模式 C# 开发者
C#设计模式入门实战教程
C#设计模式入门实战教程
|
4月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
1月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
126 2
|
27天前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
76 0
|
2月前
|
机器学习/深度学习 算法 搜索推荐
一个开源且全面的C#算法实战教程
一个开源且全面的C#算法实战教程
|
4月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
3月前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
|
4月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。