C#的并发机制优秀在哪?

简介: 笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军。因此当时笔者对于这个.Net的项目态度比较敷衍了事,没有对其中一些优秀机制有很深的了解,在去年写《C和Java没那么香了,高并发时代谁能称王》时都没给.Net以一席之地,不过最近恰好机缘巧合,我又接手了一个Windows方面的项目,这也让我有机会重新审视一下自己关于.Net框架的相关知识。项目原型要实现的功能并不复杂,主要就是记录移动存储设备中文件拷出的记录,而且需要尽可能少的占用系统资源,而在开发过程中的一个现象令我颇我惊异,在使用Invoke方法记录文件拷出情况时,程序执行效率

笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军。因此当时笔者对于这个.Net的项目态度比较敷衍了事,没有对其中一些优秀机制有很深的了解,在去年写《C和Java没那么香了,高并发时代谁能称王》时都没给.Net以一席之地,不过最近恰好机缘巧合,我又接手了一个Windows方面的项目,这也让我有机会重新审视一下自己关于.Net框架的相关知识。

项目原型要实现的功能并不复杂,主要就是记录移动存储设备中文件拷出的记录,而且需要尽可能少的占用系统资源,而在开发过程中的一个现象令我颇我惊异,在使用Invoke方法记录文件拷出情况时,程序执行效率明显会更高。

一行看似没用的代码却大幅提升效率

由于我需要记录的文件拷出信息并没有回显在UI的需要,因此也就没考虑并发冲突的问题,在最初版本的实现中,我对于filesystemwatcher的回调事件,都是直接处理的,如下:

privatevoidDeleteFileHandler(object sender, FileSystemEventArgs e)         {             if(files.Contains(e.FullPath))             {                 files.Remove(e.FullPath);                //一些其它操作               }         }

这个程序的处理效率在普通的办公PC上如果同时拷出20个文件,那么在拷贝过程中,U盘监测程序的CPU使用率大约是0.7%。

image.png

但是一个非常偶然的机会,我使用了Event/Delegate的Invoke机制,结果发现这样一个看似的废操作,却让程序的CPU占用率下降到0.2%左右

privatevoidUdiskWather_Deleted(object sender, FileSystemEventArgs e)         {             if(this.InvokeRequired)             {                 this.Invoke(newDeleteDelegate(DeleteFileHandler),newobject[]{ sender,e });               }             else             {                 DeleteFileHandler(sender, e);             }         }

image.png

在我最初的认识中.net中的Delegate机制在调用过程中是要进行拆、装箱操作的,因此这不拖慢操作就不错了,但实际的验证结果却相反,这背后的原因引起了我的好奇。

看似没用的Invoke为何提高效率?

这里先给出结论,Invoke能提升程序执行效率,其关键还是在于线程在多核之间切换的消耗要远远高于拆、装箱的资源消耗,我们知道我们程序的核心就是操作files这个共享变量,每次在被检测的U盘目录中如果发生文件变动,其回调通知函数可能都运行在不同的线程,如下:

image.png

Invoke机制的背后其实就是保证所有对于files这个共享变量的操作,全部都是由一个线程执行完成的。

image.png

由于目前.Net的代码都开源的,下面我们大致讲解一下Invoke的调用过程,不管是BeginInvoke还是Invoke背后其实都是调用的MarshaledInvoke方法来完成的,如下:

public IAsyncResult BeginInvoke(Delegate method, params Object[] args){             using(newMultithreadSafeCallScope()){                 Control marshaler =FindMarshalingControl();                 return(IAsyncResult)marshaler.MarshaledInvoke(this, method, args,false);             }         }

MarshaledInvoke的主要工作是创建ThreadMethodEntry对象,并把它放在一个链表里进行管理,然后调用PostMessage将相关信息发给要通信的线程,如下:

private Object MarshaledInvoke(Control caller, Delegate method, Object[] args, bool synchronous){                   if(!IsHandleCreated){                 thrownewInvalidOperationException(SR.GetString(SR.ErrorNoMarshalingThread));             }                          ActiveXImpl activeXImpl =(ActiveXImpl)Properties.GetObject(PropActiveXImpl);             if(activeXImpl !=null){                 IntSecurity.UnmanagedCode.Demand();             }             // We don't want to wait if we're on the same thread, or else we'll deadlock.             // It is important that syncSameThread always be false for asynchronous calls.             //             bool syncSameThread =false;             int pid;// ignored             if(SafeNativeMethods.GetWindowThreadProcessId(newHandleRef(this, Handle), out pid)== SafeNativeMethods.GetCurrentThreadId()){                 if(synchronous)                     syncSameThread =true;             }             // Store the compressed stack information from the thread that is calling the Invoke()             // so we can assign the same security context to the thread that will actually execute             // the delegate being passed.             //             ExecutionContext executionContext =null;             if(!syncSameThread){                 executionContext = ExecutionContext.Capture();             }             ThreadMethodEntry tme =newThreadMethodEntry(caller,this, method, args, synchronous, executionContext);             lock(this){                 if(threadCallbackList ==null){                     threadCallbackList =newQueue();                 }             }             lock(threadCallbackList){                 if(threadCallbackMessage ==0){                     threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion +"_ThreadCallbackMessage");                 }                 threadCallbackList.Enqueue(tme);             }             if(syncSameThread){                 InvokeMarshaledCallbacks();             }  else{                 //                 UnsafeNativeMethods.PostMessage(newHandleRef(this, Handle), threadCallbackMessage, IntPtr.Zero, IntPtr.Zero);             }             if(synchronous){                 if(!tme.IsCompleted){                     WaitForWaitHandle(tme.AsyncWaitHandle);                 }                 if(tme.exception !=null){                     throw tme.exception;                 }                 return tme.retVal;             }             else{                 return(IAsyncResult)tme;             }         }

Invoke的机制就保证了一个共享变量只能由一个线程维护,这和  GO语言使用通信来替代共享内存的设计是暗合的,他们的理念都是 "让同一块内存在同一时间内只被一个线程操作" 。这和现代计算体系结构的多核CPU(SMP)有着密不可分的联系,

这里我们先来科普一下CPU之间的通信MESI协议的内容。我们知道现代的CPU都配备了高速缓存,按照多核高速缓存同步的MESI协议约定,每个缓存行都有四个状态,分别是E(exclusive)、M(modified)、S(shared)、I(invalid),其中:

M:代表该缓存行中的内容被修改,并且该缓存行只被缓存在该CPU中。这个状态代表缓存行的数据和内存中的数据不同。

E:代表该缓存行对应内存中的内容只被该CPU缓存,其他CPU没有缓存该缓存对应内存行中的内容。这个状态的缓存行中的数据与内存的数据一致。

I:代表该缓存行中的内容无效。

S:该状态意味着数据不止存在本地CPU缓存中,还存在其它CPU的缓存中。这个状态的数据和内存中的数据也是一致的。不过只要有CPU修改该缓存行都会使该行状态变成 I 。

四种状态的状态转移图如下:

image.png

我们上文也提到了,不同的线程是有大概率是运行在不同CPU核上的,在不同CPU操作同一块内存时,站在CPU0的角度上看,就是CPU1会不断发起remote write的操作,这会使该高速缓存的状态总是会在S和I之间进行状态迁移,而一旦状态变为I将耗费比较多的时间进行状态同步。

image.png

因此我们可以基本得出 this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[] { sender,e });   ;这行看似无关紧要的代码之后,无意中使files共享变量的维护操作,由多核多线程共同操作,变成了众多子线程向主线程通信,所有维护操作均由主线程进行,这也使最终的执行效率有所提高。

深度解读,为何要用两把锁

在当前使用通信替代共享内存的大潮之下,锁其实是最重要的设计。

我们看到在.Net的Invoke实现中,使用了两把锁lock (this) 与lock (threadCallbackList)

lock(this){                 if(threadCallbackList ==null){                     threadCallbackList =newQueue();                 }             }             lock(threadCallbackList){                 if(threadCallbackMessage ==0){                     threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion +"_ThreadCallbackMessage");                 }                 threadCallbackList.Enqueue(tme);             }

在.NET当中lock关键字的基本可以理解为提供了一个近似于CAS的锁(Compare And Swap)。CAS的原理不断地把"期望值"和"实际值"进行比较,当它们相等时,说明持有锁的CPU已经释放了该锁,那么试图获取这把锁的CPU就会尝试将"new"的值(0)写入"p"(交换),以表明自己成为spinlock新的owner。伪代码演示如下:

voidCAS(int p, int old,int new){    if*p != old         do nothing     else       *p ← new}

基于CAS的锁效率没问题,尤其是在没有多核竞争的情况CAS表现得尤其优秀,但CAS最大的问题就是不公平,因为如果有多个CPU同时在申请一把锁,那么刚刚释放锁的CPU极可能在下一轮的竞争中获取优势,再次获得这把锁,这样的结果就是一个CPU忙死,而其它CPU却很闲,我们很多时候诟病多核SOC“一核有难,八核围观”其实很多时候都是由这种不公平造成的。

为了解决CAS的不公平问题,业界大神们又引入了TAS(Test And Set Lock)机制,个人感觉还是把TAS中的T理解为Ticket更好记一些,TAS方案中维护了一个请求该锁的头尾索引值,由"head"和"tail"两个索引组成。

struct lockStruct{    int32 head;    int32 tail;};

"head"代表请求队列的头部,"tail"代表请求队列的尾部,其初始值都为0。

最一开始时,第一个申请的CPU发现该队列的tail值是0,那么这个CPU会直接获取这把锁,并会把tail值更新为1,并在释放该锁时将head值更新为1。

在一般情况下当锁被持有的CPU释放时,该队列的head值会被加1,当其他CPU在试图获取这个锁时,锁的tail值获取到,然后把这个tail值加1,并存储在自己专属的寄存器当中,然后再把更新后的tail值更新到队列的tail当中。接下来就是不断地循环比较,判断该锁当前的"head"值,是否和自己存储在寄存器中的"tail"值相等,相等时则代表成功获得该锁。

TAS这类似于用户到政务大厅去办事时,首先要在叫号机取号,当工作人员广播叫到的号码与你手中的号码一致时,你就获取了办事柜台的所有权。

但是TAS却存在一定的效率问题,根据我们上文介绍的MESI协议,这个lock的头尾索引其实是在各个CPU之间共享的,因此tail和head频繁更新,还是会引发调整缓存不停的invalidate,这会极大的影响效率。

因此我们看到在.Net的实现中干脆就直接引入了threadCallbackList的队列,并不断将tme(ThreadMethodEntry)加入队尾,而接收消息的进程,则不断从队首获取消息

image.png

lock(threadCallbackList){                 if(threadCallbackMessage ==0){                     threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion +"_ThreadCallbackMessage");                 }                 threadCallbackList.Enqueue(tme);             }

当队首指向这个tme时,消息才被发送,其实是一种类似于MAS的实现,当然MAS实际是为每个CPU都建立了一个专属的队列,和Invoke的设计略有不同,不过基本的思想是一致的。

很多时候年少时不是品不出很多东西背后味道的,这也让我错过了很多非常值得总结的技术要点,因此在春节假期总结一下最近使用C#的心得,以飨读者,顺祝大家新春愉快!


相关文章
|
2月前
|
SQL 传感器 开发框架
今天我们聊聊C#的并发和并行
今天我们聊聊C#的并发和并行
61 1
|
5月前
|
数据库 C#
[C#] 在异步请求并发情况下,dbcontext的安全问题
摘要: 在多线程异步环境中,偶发的数据库修改失败可能因并发的`dbContext`操作引起,当一个线程的修改未保存时,另一线程尝试相同操作会导致错误。另外,单次执行成功但随后失败的情况可能源于`dbContext`的瞬时生命周期。若`saveChangesAsync()`在刷新页面请求到来前未完成,新的请求可能会尝试在写操作期间读取数据,从而引发问题。
|
安全 C#
C#并发实战Parallel.ForEach使用
C#并发实战Parallel.ForEach使用 前言:最近给客户开发一个伙食费计算系统,大概需要计算2000个人的伙食。需求是按照员工的预定报餐计划对消费记录进行检查,如有未报餐有刷卡或者有报餐没刷卡的要进行一定的金额扣减等一系列规则。
1119 0
|
安全 C#
C#并发处理-锁OR线程安全?
每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客! 当然,题外话说多了,咱进入正题! 背景 基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。
2330 0
|
存储 SQL 数据库连接
C# 模拟并发
每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客! 当然,题外话说多了,咱进入正题! 在处理大数据的时候,经常会发生并发,并发的情况发生后,会出现数据污读,从而产生脏数据。 首先通过一段程序进行说明、。
1193 0
|
安全 程序员 C#
C# 集合-并发处理-锁OR线程
每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客!当然,希望将来的一天,某位老板看到此博客,给你的程序员职工加点薪资吧!因为程序员的世界除了苦逼就是沉默。我眼中的程序员大多都不爱说话,默默承受着编程的巨大压力,除了技术上的交流外,他们不愿意也不擅长和别人交流,更不乐意任何人走进他们的内心!    最近悟出来一个道理,在这儿分享给大家:学历代表你的过去,能力代表你的现在,学习代表你的将来。
1223 0
|
存储 C# 数据库
C# 数据库并发的解决方案(通用版、EF版)
还是那句老话:十年河东,十年河西,莫欺骚年穷!~_~ 打错个字,应该是莫欺少年穷! 学历代表你的过去,能力代表你的现在,学习代表你的将来。 学无止境,精益求精。 自ASP.NET诞生以来,微软提供了不少控制并发的方法,在了解这些控制并发的方法前,我们先来简单介绍下并发! 并发:同一时间或者同一时刻多个访问者同时访问某一更新操作时,会产生并发! 针对并发的处理,又分为悲观并发处理和乐观并发处理 所谓悲观/乐观并发处理,可以这样理解: 悲观者认为:在程序的运行过程中,并发很容易发生滴,因此,悲观者提出了他们的处理模式:在我执行一个方法时,不允许其他访问者介入这个方法。
3206 0
|
监控 测试技术 C#
C#高性能大容量SOCKET并发(一):IOCP完成端口例子介绍
原文:C#高性能大容量SOCKET并发(一):IOCP完成端口例子介绍 例子主要包括SocketAsyncEventArgs通讯封装、服务端实现日志查看、SCOKET列表、上传、下载、远程文件流、吞吐量协议,用于测试SocketAsyncEventArgs的性能和压力,最大连接数支持65535个长连接,最高命令交互速度达到250MB/S(使用的是127.0.0.1的方式,相当于千兆网卡1Gb=125MB/S两倍的吞吐量)。
3249 0
|
C# 缓存
C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装
原文:C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装 1、SocketAsyncEventArgs介绍 SocketAsyncEventArgs是微软提供的高性能异步Socket实现类,主要为高性能网络服务器应用程序而设计,主要是为了避免在在异步套接字 I/O 量非常大时发生重复的对象分配和同步。
3536 0