[译]RabbitMQ教程C#版 - 远程过程调用(RPC)

简介:

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助

如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

在第 [教程[2]](https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html) 中,我们学习了如何使用工作队列在多个工作单元之间分配耗时任务。

但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用RPC

在本篇教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有什么耗时任务值得分发,那干脆就创建一个返回斐波那契数列的虚拟 RPC 服务吧。

客户端接口

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。该类将暴露一个名为Call的方法,用来发送 RPC 请求并且保持阻塞状态,直到接收到应答为止。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

关于 RPC 的说明

尽管 RPC 在计算机中是一种很常见的模式,但它经常受到批评。问题出现在当程序员不知道一个函数是本地调用还是一个耗时的 RPC 请求。这样的混淆,会导致系统不可预测,以及给调试增加不必要的复杂性。误用 RPC 可能会导致不可维护的混乱代码,而不是简化软件。

牢记这些限制,请考虑如下建议:

  • 确保可以明显区分哪些函数是本地调用,哪些是远程调用。
  • 为您的系统编写文档,明确组件之间的依赖关系。
  • 捕获异常,当 RPC 服务长时间宕机时客户端该如何应对。

当有疑问的时候可以先避免使用 RPC。如果可以的话,考虑使用异步管道 - 而不是类似 RPC 的阻塞,其会将结果以异步的方式推送到下一个计算阶段。

回调队列

一般来讲,基于 RabbitMQ 进行 RPC 通信是非常简单的,客户端发送一个请求消息,然后服务端用一个响应消息作为应答。为了能接收到响应,我们需要在发送请求过程中指定一个'callback'队列地址。

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

// ... then code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1 协议在消息中预定义了一个包含 14 个属性的集合,大多数属性很少使用,但以下情况除外:
Persistent:将消息标记为持久的(值为2)或者瞬时的(其他值),可以参考 [教程[2]](https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html)

DeliveryMode:熟悉 AMQP 协议的人可以选择此属性而不是熟悉协议的人可以选择使用此属性而不是Persistent,它们控制的东西是一样的。
ContentType:用于描述编码的 mime 类型。例如,对于经常使用的 JSON 编码,将此属性设置为:application/json是一种很好的做法。
ReplyTo:通常用于命名回调队列。
CorrelationId:用于将 RPC 响应与请求相关联。

关联ID

在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列,但是这种方式效率低。幸运的是我们有一种更好的方式,那就是为每个客户端创建一个独立的回调队列。

这种方式会引出一个新的问题,在收到响应的回调队列中,它无法区分响应属于哪一个请求,此时便是CorrelationId属性的所用之处。我们将为每个请求的CorrelationId设置一个唯一值。之后当我们在回调队列接收到响应的时候,再去检查下这个属性是否和请求中的值匹配,如此一来,我们就可以把响应和请求关联起来了。如果出现一个未知的CorrelationId值,我们可以安全的销毁这个消息,因为这个消息不属于我们的请求。

你可能会问,为什么我们应该忽略回调队列中的未知的消息,而不是用错误来标识失败呢?这是因为于服务器端可能存在竞争条件。虽然不太可能,但是 RPC 服务器可能在仅发送了响应消息而未发送消息确认的情况下挂掉,如果出现这种情况,RPC 服务器重启之后将会重新处理该请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,并且理想情况下 RPC 应该是幂等的。

总结

我们的 RPC 会是这样工作:

  • 客户端启动时,会创建一个匿名的独占回调队列。
  • 对于 RPC 请求,客户端发送带有两个属性的消息:ReplyTo(设置为回调队列)和CorrelationId(为每个请求设置唯一值)。
  • 请求被发送到rpc_queue队列。
  • RPC 工作线程(或者叫:服务器)正在等待该队列上的请求。当出现请求时,它会执行该作业,并使用ReplyTo属性设置的队列将带有结果的消息发送回客户端。
  • 客户端等待回调队列上的数据。出现消息时,它会检查CorrelationId属性。如果它与请求中的值匹配,则返回对应用程序的响应。

组合在一起

斐波纳契 任务:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

我们宣布我们的斐波那契函数。并假定只允许有效的正整数输入。 (不要期望这个适用于大数字,它可能是最慢的递归实现)。

我们的 RPC 服务端代码 RPCServer.cs 看起来如下所示:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// 

    /// Assumes only valid positive integer input.
    /// Don't expect this one to work for big numbers, and it's
    /// probably the slowest recursive implementation possible.
    /// 
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

服务端代码非常简单:

  • 像往常一样,首先建立连接,通道和声明队列。
  • 我们可能希望运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要设置channel.BasicQos中的prefetchCount值。
  • 使用BasicConsume访问队列,然后注册一个交付处理程序,并在其中完成工作并发回响应。

我们的 RPC 客户端 RPCClient.cs 代码:

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

public RpcClient()
{
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); ;
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}

客户端代码稍微复杂一些:

  • 建立连接和通道,并为响应声明一个独有的 'callback' 队列。
  • 订阅这个 'callback' 队列,以便可以接收到 RPC 响应。
  • Call方法用来生成实际的 RPC 请求。
  • 在这里,我们首先生成一个唯一的CorrelationId编号并保存它,while 循环会使用该值来捕获匹配的响应。
  • 接下来,我们发布请求消息,其中包含两个属性:ReplyToCorrelationId
  • 此时,我们可以坐下来稍微一等,直到指定的响应到来。
  • while 循环做的工作非常简单,对于每个响应消息,它都会检查CorrelationId是否是我们正在寻找的那一个。如果是这样,它就会保存该响应。
  • 最后,我们将响应返回给用户。

客户发出请求:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

现在是查看 RPCClient.csRPCServer.cs 的完整示例源代码(包括基本异常处理)的好时机哦。

像往常一样设置(请参见 [教程[1]](https://www.cnblogs.com/esofar/p/rabbitmq-hello-world.html)]):

我们的 RPC 服务现已准备就绪,现在可以启动服务端:

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

要请求斐波纳契数,请运行客户端:

cd RPCClient
dotnet run
# => [x] Requesting fib(30)

这里介绍的设计并不是 RPC 服务的唯一可能实现,但它仍具有一些重要优势:

  • 如果 RPC 服务器太慢,您可以通过运行另一个服务器来扩展。尝试在新开一个控制台,运行第二个 RPCServer。
  • 在客户端,RPC 只需要发送和接收一条消息。不需要像QueueDeclare一样同步调用。因此,对于单个 RPC 请求,RPC 客户端只需要一次网络往返。

我们的代码很简单,也并没有尝试去解决更复杂(但很重要)的问题,比如就像:

  • 如果服务端没有运行,客户端应该如何反应?
  • 客户端是否应该为 RPC 设置某种超时机制?
  • 如果服务端出现故障并引发异常,是否应将其转发给客户端?
  • 在处理之前防止无效的传入消息(例如:检查边界、类型)。

如果您想进行实验,您可能会发现 管理 UI 对于查看队列非常有用。

写在最后

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
2月前
|
开发框架 NoSQL MongoDB
C#/.NET/.NET Core开发实战教程集合
C#/.NET/.NET Core开发实战教程集合
|
7月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
2月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
31 0
rabbitmq基础教程(ui,java,springamqp)
|
7月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
3月前
|
设计模式 C# 开发者
C#设计模式入门实战教程
C#设计模式入门实战教程
|
4月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
215 2
|
4月前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
281 0
|
5月前
|
机器学习/深度学习 算法 搜索推荐
一个开源且全面的C#算法实战教程
一个开源且全面的C#算法实战教程
100 0
|
7月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。