.NET Compact Framework下的进程间通信之MSMQ开发

简介:

上篇讲到WinCe下的MSMQ安装 ,这篇讲述一下MSMQ在.NET Compact Framework下的开发。

所谓MQ就是Message Queue,消息队列。消息队列可以作为不同应用程序之间,甚至不同机器之间通信的渠道。在消息队列下进行通信的内容称为消息(Message),在C#程序下Message就是对象。

MSMQ就是Microsoft公司提供的MQ服务程序。MQ服务程序负责管理消息队列,保证消息在消息队列这一渠道下能无误的发送到对端,MQ支持离线交易,有时候消息会缓存在MQ服务程序中,当接收方再线时候在提取消息。这一特性使得MQ可以广泛使用在移动领域,因为移动应用的网络不能保证7×24的长连接。

生成队列

在CF.net下开发MQ,需要引用System.Messaging库。

 

复制代码
    using  System.Messaging;
    
public class
 MQService
    {
     
private const string mMachinePrefix = @".\"
;
        
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\"
;
        
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$"
;
        
private
 MessageQueue mServiceQueue;
        
         
private void
 InitServiceQueue()
        {
            
// create the message queue

            try
            {
                
// check to make sure the message queue does not exist already
                if (! MessageQueue.Exists(mServiceQueuePath))
                {
                    
// create the new message queue and make it transactional

                    mServiceQueue =  MessageQueue.Create(mServiceQueuePath);
                    mServiceQueue.Close();
                }
                
else

                {
                    mServiceQueue 
= new  MessageQueue(mServiceQueuePath);
                }

                Type[] types = new Type[1
];
                types[0= typeof(string
);
                mServiceQueue.Formatter = new
 XmlMessageFormatter(types);
                mServiceQueue.ReceiveCompleted += new
 ReceiveCompletedEventHandler(MessageListenerEventHandler);

                
// Begin the asynchronous receive operation.

                mServiceQueue.BeginReceive();
                mServiceQueue.Close();
            }
            
// show message if we used an invalid message queue name;

            catch  (MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
            
return
;
        }
    }
复制代码

 

在建立Q之前先检查该Q是否存在,如果存在就生成Q的处理对象,如果不存在就先在队列管理器建立这个Q。建立Q的时候,输入参数为一个string,这个string可以为path(路径),FormatName或者Label。使用path的相对广泛,在例子中使用path作为输入参数。Path由MachineName and QueueName组成,建立的Q可以分为Public,Private,Journal和DeadLetter。使用广泛的是Public和Private,Public的Q由MachineName and QueueName组成,格式如MachineName\QueueName,而Private的Q的格式为MachineName\Private$\QueueName,比Public的Q多了一个标识Private$,在例子中使用了Private的Q。路径“.\”指的是本地机器。

Property Formatter十分重要,他定义了消息体的格式,所谓消息体的格式就是通过这个Q通信的消息的数据类型,一个Q可以传递多个不同的数据类型,需要在Type进行定义然后赋值给Formatter。

Event ReceiveCompleted用来注册接收处理函数,当Q接收到消息后,使用注册的函数进行处理。使用ReceiveCompleted注册处理函数以后,必须调用BeginReceive让这个Q进入异步接收状态。

下面讲述MQ应用中两种常见的应用模式,第一种为请求回应模式,第二种为注册广播模式。

请求回应模式

 

复制代码
public class  MQService
    {
        
private const string mMachinePrefix = @".\"
;
        
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\"
;
        
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$"
;
        
private
 System.Messaging.MessageQueue mServiceQueue;

        
private void
 InitServiceQueue()
        {
            
// create the message queue

            try
            {
                
// check to make sure the message queue does not exist already
                if (! System.Messaging.MessageQueue.Exists(mServiceQueuePath))
                {
                    
// create the new message queue and make it transactional

                    mServiceQueue =  System.Messaging.MessageQueue.Create(mServiceQueuePath);
                    mServiceQueue.Close();
                }
                
else

                {
                    mServiceQueue 
= new  System.Messaging.MessageQueue(mServiceQueuePath);
                }

                Type[] types = new Type[1
];
                types[0= typeof(string
);
                mServiceQueue.Formatter = new
 System.Messaging.XmlMessageFormatter(types);
                mServiceQueue.ReceiveCompleted += new
 System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);

                
// Begin the asynchronous receive operation.

                mServiceQueue.BeginReceive();
                mServiceQueue.Close();
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
            
return
;
        }

        
private void MessageListenerEventHandler(object
 sender, System.Messaging.ReceiveCompletedEventArgs e)
        {
            
try

            {
                
// Connect to the queue.
                System.Messaging.MessageQueue mq =  (System.Messaging.MessageQueue)sender;

                
// End the asynchronous receive operation.

                System.Messaging.Message msg =  mq.EndReceive(e.AsyncResult);
                
if (msg.Body.ToString() == "mq_reques_1"
)
                {
                    msg.ResponseQueue.Send("mq_respond_1"
);
                }
                
else if (msg.Body.ToString() == "mq_reques_2"
)
                {
                    msg.ResponseQueue.Send(true
);
                }

                
// Restart the asynchronous receive operation.

                mq.BeginReceive();
            }
            
catch
 (System.Messaging.MessageQueueException ex)
            {
                
// Handle sources of MessageQueueException.

                Console.WriteLine(ex.Message);
            }
            
return
;
        }
    }


    
public class
 MQClient
    {
        
private const string mMachinePrefix = @".\"
;
        
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\"
;
        
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$"
;
        
private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$"
;
        
private
 System.Messaging.MessageQueue mServiceQueue;
        
private
 System.Messaging.MessageQueue mClientQueue;

        
public void
 InitQueues()
        {
            
// create the message queue

            try
            {
                mServiceQueue 
= new  System.Messaging.MessageQueue(mServiceQueuePath);

                
// check to make sure the message queue does not exist already

                if (! System.Messaging.MessageQueue.Exists(mClientQueuePath))
                {
                    
// create the new message queue and make it transactional

                    mClientQueue =  System.Messaging.MessageQueue.Create(mClientQueuePath);
                    mClientQueue.Close();
                }
                
else

                {
                    mClientQueue 
= new  System.Messaging.MessageQueue(mClientQueuePath);
                }

                Type[] types = new Type[2
];
                types[0= typeof(string
);
                types[1= typeof(bool
);
                mClientQueue.Formatter = new
 System.Messaging.XmlMessageFormatter(types);
                mClientQueue.Close();
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
            
return
;
        }

        
private void
 SendRequest()
        {
            
try

            {
                System.Messaging.Message message 
= new System.Messaging.Message("mq_reques_1" );
                message.ResponseQueue =
 mClientQueue;
                mClientQueue.Purge();
                mServiceQueue.Send(message);

                System.Messaging.Message msg = mClientQueue.Receive(new TimeSpan(004
));

                
//handle the result.

                Console.WriteLine(msg.Body.ToString());
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
        }
    }
复制代码

 

MQService是服务程序,负责服务队列".\Private$\MQServiceQueue$"的建立和管理,当有新消息发送到该服务队列时MessageListenerEventHandler函数就会callback,取出消息进行分析处理和发送返回,返回是通过client原先建立的Q进行返回,不是通过原服务Q返回,因为MQ的队列是单向的。MQClient负责客户端队列".\Private$\MQClientQueue$"的建立,在发送请求的时候把客户端队列赋值到properties ResponseQueue里,让服务程序可以返回到这个客户端的队列里面,同时在等待返回的时候有超时控制。

注册广播模式

注册广播模式是Observer模式的一种应用,Observer模式可见实用设计模式之一--Observer模式

客户端可以往服务端注册关心的消息,服务端通过MQ自动广播消息到客户端。

复制代码
    public class  MQService
    {
     
private const string mMachinePrefix = @".\"
;
        
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\"
;
        
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$"
;
        
private
 System.Messaging.MessageQueue mServiceQueue;
        
private Dictionary<string, MessageQueue> mmClientQueues = new Dictionary<string, MessageQueue>
();
        
        
private void
 InitServiceQueue()
        {
            
// create the message queue

            try
            {
                
// check to make sure the message queue does not exist already
                if (! System.Messaging.MessageQueue.Exists(mServiceQueuePath))
                {
                    
// create the new message queue and make it transactional

                    mServiceQueue =  System.Messaging.MessageQueue.Create(mServiceQueuePath);
                    mServiceQueue.Close();
                }
                
else

                {
                    mServiceQueue 
= new  System.Messaging.MessageQueue(mServiceQueuePath);
                }

                Type[] types = new Type[1
];
                types[0= typeof(string
);
                mServiceQueue.Formatter = new
 System.Messaging.XmlMessageFormatter(types);
                mServiceQueue.ReceiveCompleted += new
 System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);

                
// Begin the asynchronous receive operation.

                mServiceQueue.BeginReceive();
                mServiceQueue.Close();
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
            
return
;
        }
        
        
private void MessageListenerEventHandler(object
 sender, System.Messaging.ReceiveCompletedEventArgs e)
        {
            
try

            {
                
// Connect to the queue.
                System.Messaging.MessageQueue mq =  (System.Messaging.MessageQueue)sender;

                
// End the asynchronous receive operation.

                System.Messaging.Message msg =  mq.EndReceive(e.AsyncResult);
                
if(msg.Body.ToString() == "mq_register_1"
)
                {
                    mmClientQueues.Add(msg.Label, msg.ResponseQueue);
                }
                
else if(msg.Body.ToString() == "mq_unregister_1"
)
                {
                    mmClientQueues[msg.Label].Purge();
                    mmClientQueues.Remove(msg.Label);
                }
                
                
// Restart the asynchronous receive operation.

                mq.BeginReceive();
            }
            
catch
 (System.Messaging.MessageQueueException ex)
            {
                
// Handle sources of MessageQueueException.

                Console.WriteLine(ex.Message);
            }
            
return

        }
        
        
private void Notify(string
 str)
        {
            
if (mmClientQueues.Count > 0
)
            {
                
foreach(MessageQueue mq in
 mmClientQueues.Values)
                {
                    mq.Send(str);
                }
            }
        }
    }
    
    
    
public class
 MQClient
    {
         
private const string mMachinePrefix = @".\"
;
        
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\"
;
        
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$"
;
        
private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$"
;
        
private
 System.Messaging.MessageQueue mServiceQueue;
        
private
 System.Messaging.MessageQueue mClientQueue;
        
        
public void
 InitQueues()
        {
            
// create the message queue

            try
            {
                mServiceQueue 
= new  System.Messaging.MessageQueue(mServiceQueuePath);

                
// check to make sure the message queue does not exist already

                if (! System.Messaging.MessageQueue.Exists(mClientQueuePath))
                {
                    
// create the new message queue and make it transactional

                    mClientQueue =  System.Messaging.MessageQueue.Create(mClientQueuePath);
                    mClientQueue.Close();
                }
                
else

                {
                    mClientQueue 
= new  System.Messaging.MessageQueue(mClientQueuePath);
                }

                Type[] types = new Type[2
];
                types[0= typeof(string
);
                types[1= typeof(bool
);
                mClientQueue.Formatter = new
 System.Messaging.XmlMessageFormatter(types);
                
                
//
Initiate the asynchronous receive operation by telling the Message
                
//
 Queue to begin receiving messages and notify the event handler 
                
// when finished

                mClientQueue.ReceiveCompleted +=
                       
new  System.Messaging.ReceiveCompletedEventHandler(ClientQueueReceiveCompleted);
                mClientQueue.BeginReceive();
                
                mClientQueue.Close();
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
            
return
;
        }
        
        
private void
 RegisterService()
        {
            
try

            {
                System.Messaging.Message message 
= new System.Messaging.Message("mq_register_1" );
                message.Label = "client1"
;
                message.ResponseQueue =
 mClientQueue;
                mServiceQueue.Send(message);
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
        }

        
private void
 UnregisterService()
        {
            
try

            {
                System.Messaging.Message message 
= new System.Messaging.Message("mq_unregister_1" );
                message.Label = "client1"
;
                mServiceQueue.Send(message);
                Thread.Sleep(500
);
                mClientQueue.Purge();
            }
            
// show message if we used an invalid message queue name;

            catch  (System.Messaging.MessageQueueException MQException)
            {
                Console.WriteLine(MQException.Message);
            }
        }
        
        
private void
 ClientQueueReceiveCompleted(Object source,
                        ReceiveCompletedEventArgs asyncResult)
        {
            
try

            {
                
// End the Asynchronous Receive Operation
                Message message =
                    mClientQueue.EndReceive(asyncResult.AsyncResult);

                
if (message.Body is string )
                {
                   Console.WriteLine(message.Body.ToString());
                }
                
            }
            
catch
 (MessageQueueException e)
            {
                Console.WriteLine
                    (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                                            
"Failed to receive Message: {0} "
, e.ToString()));
            }
            
//Begin the next Asynchronous Receive Operation

            mClientQueue.BeginReceive();
        }
}
复制代码

和请求回应模式相比MQService使用容器保存所有注册的客户端的Q,当需要notify的时候遍历所有客户端Q进行广播。MQClient建立广播Q,然后注册函数ClientQueueReceiveCompleted处理广播事件。MQ的应用能把Oberver模式应用跨进程和跨系统,消息订阅广播机制可以借助MQ和observer模式来实现。


参考文献

MessageQueue Class 

MessageQueue.Formatter Property



    本文转自Jake Lin博客园博客,原文链接:http://www.cnblogs.com/procoder/archive/2009/03/23/1419440.html,如需转载请自行联系原作者


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
API C++ Windows
Visual C++运行库、.NET Framework和DirectX运行库的作用及常见问题解决方案,涵盖MSVCP140.dll丢失、0xc000007b错误等典型故障的修复方法
本文介绍Visual C++运行库、.NET Framework和DirectX运行库的作用及常见问题解决方案,涵盖MSVCP140.dll丢失、0xc000007b错误等典型故障的修复方法,提供官方下载链接与系统修复工具使用指南。
647 2
|
2月前
|
开发框架 安全 .NET
Microsoft .NET Framework 3.5、4.5.2、4.8.1,适用于 Windows 版本的 .NET,Microsoft C Runtime等下载
.NET Framework是Windows平台的开发框架,包含CLR和FCL,支持多种语言开发桌面、Web应用。常用版本有3.5、4.5.2、4.8.1,系统可同时安装多个版本,确保软件兼容运行。
661 0
Microsoft .NET Framework 3.5、4.5.2、4.8.1,适用于 Windows 版本的 .NET,Microsoft C Runtime等下载
|
3月前
|
C++
提示缺少.NET Framework 3.5 安装错误:0x80070002、0x800F0950\0x80004002
.NET Framework常见问题及解决方法汇总,
512 0
|
9月前
|
Linux 数据库 Perl
【YashanDB 知识库】如何避免 yasdb 进程被 Linux OOM Killer 杀掉
本文来自YashanDB官网,探讨Linux系统中OOM Killer对数据库服务器的影响及解决方法。当内存接近耗尽时,OOM Killer会杀死占用最多内存的进程,这可能导致数据库主进程被误杀。为避免此问题,可采取两种方法:一是在OS层面关闭OOM Killer,通过修改`/etc/sysctl.conf`文件并重启生效;二是豁免数据库进程,由数据库实例用户借助`sudo`权限调整`oom_score_adj`值。这些措施有助于保护数据库进程免受系统内存管理机制的影响。
|
监控 Linux 应用服务中间件
探索Linux中的`ps`命令:进程监控与分析的利器
探索Linux中的`ps`命令:进程监控与分析的利器
420 13
|
9月前
|
Linux Shell
Linux 进程前台后台切换与作业控制
进程前台/后台切换及作业控制简介: 在 Shell 中,启动的程序默认为前台进程,会占用终端直到执行完毕。例如,执行 `./shella.sh` 时,终端会被占用。为避免不便,可将命令放到后台运行,如 `./shella.sh &`,此时终端命令行立即返回,可继续输入其他命令。 常用作业控制命令: - `fg %1`:将后台作业切换到前台。 - `Ctrl + Z`:暂停前台作业并放到后台。 - `bg %1`:让暂停的后台作业继续执行。 - `kill %1`:终止后台作业。 优先级调整:
723 5
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
500 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
|
算法 Linux 调度
探索进程调度:Linux内核中的完全公平调度器
【8月更文挑战第2天】在操作系统的心脏——内核中,进程调度算法扮演着至关重要的角色。本文将深入探讨Linux内核中的完全公平调度器(Completely Fair Scheduler, CFS),一个旨在提供公平时间分配给所有进程的调度器。我们将通过代码示例,理解CFS如何管理运行队列、选择下一个运行进程以及如何对实时负载进行响应。文章将揭示CFS的设计哲学,并展示其如何在现代多任务计算环境中实现高效的资源分配。

热门文章

最新文章