浅谈.NET下的多线程和并行计算(十一).NET异步编程模型基础下

简介: 上次我们说了,要进行多线程编程,我们可以使用最原始的方式也是最灵活的方式进行,那就是Thread(ThreadPool)+信号量+锁+Control.Invoke。.NET的异步编程模型给我们提供了一种基于IAsyncResult的编程模式,它尤其适用于处理下面的应用场景: 1) 被阻止,正在等待...

上次我们说了,要进行多线程编程,我们可以使用最原始的方式也是最灵活的方式进行,那就是Thread(ThreadPool)+信号量+锁+Control.Invoke。.NET的异步编程模型给我们提供了一种基于IAsyncResult的编程模式,它尤其适用于处理下面的应用场景:

1) 被阻止,正在等待一个 IAsyncResult

2) 被阻止,正在等待多个 IAsyncResult 对象

3) 轮询 IAsyncResult 上的完成情形

.NET还提供了基于事件的异步编程模式(EAP),它能够提供:

1) 后台执行耗时任务(例如下载和数据库操作),但不会中断应用程序

2) 同时执行多个操作,每个操作完成时都会接到通知

下面是一个符合基于事件的异步编程模式的类:

public class AsyncExample
{
    public int Method1(string param);
    public void Method2(double param);

    public void Method1Async(string param);
    public void Method1Async(string param, object userState);
    public event Method1CompletedEventHandler Method1Completed;

    public void Method2Async(double param);
    public void Method2Async(double param, object userState);
    public event Method2CompletedEventHandler Method2Completed;

    public void CancelAsync(object userState);

    public bool IsBusy { get; }
}

我们看到它的操作一般提供同步和异步两种模式,异步操作提供一个完成事件,还提供了取消异步操作的方法。对于某些更高级的组件还提供了汇报进度的功能,通过一个进度汇报事件来完成,此事件通常将叫做 ProgressChanged 或 方法名称ProgressChanged,它对应的事件处理程序会带有一个 ProgressChangedEventArgs 参数。ProgressChanged 事件的事件处理程序可以检查 ProgressChangedEventArgs.ProgressPercentage 属性来确定异步任务完成的百分比。此属性的范围是 0 到 100,可用来更新 ProgressBar 的 Value 属性。

说到这里您可能想到了,BackgroundWorker就是遵从这种模式的组件。那么我们在设计组件的时候如何来选择实现基于事件的APM还是基于IAsyncResult的APM呢,MSDN上有这么一段指导原则:

1) 将基于事件的模式用作默认API 以公开类的异步行为。

2) 当类主要用在客户端应用程序(例如 Windows 窗体)中时,不要公开 IAsyncResult 模式。 (比如PictureBox的LoadAsync 方法以及LoadCompleted 事件

3) 仅在必须公开 IAsyncResult 模式才能满足要求时公开该模式。例如,需要与现有 API 兼容时可能需要公开 IAsyncResult 模式。

4) 不要在不公开基于事件的模式时公开 IAsyncResult 模式。 如果必须公开 IAsyncResult 模式,应将其作为高级选项公开。例如,如果生成一个代理对象,则应默认生成基于事件的模式,其中具有一个生成 IAsyncResult 模式的选项。

5) 在 IAsyncResult 模式实现上生成基于事件的模式实现。

6) 避免在同一个类上同时公开基于事件的模式和 IAsyncResult 模式。在“较高级别”的类上公开基于事件的模式,在“较低级别”的类上公开 IAsyncResult 模式。例如,比较 WebClient 组件上的基于事件的模式与 HttpRequest 类上的 IAsyncResult 模式。

来看一个WebClient的例子:

WebClient wc = new WebClient();
wc.Encoding = Encoding.UTF8;
wc.DownloadStringCompleted += new DownloadStringCompletedEventHandler(wc_DownloadStringCompleted);
wc.DownloadProgressChanged += new DownloadProgressChangedEventHandler(wc_DownloadProgressChanged);
wc.DownloadStringAsync(new Uri("http://www.cnblogs.com"), "test");
Console.WriteLine(DateTime.Now.ToString("mm:ss"));
Console.ReadLine();

进度更新事件处理方法:

static void wc_DownloadProgressChanged(object sender, DownloadProgressChangedEventArgs e)
{
    Console.WriteLine("{0} downloaded {1} of {2} bytes. {3} % complete...", (string)e.UserState, e.BytesReceived, e.TotalBytesToReceive, e.ProgressPercentage);
}

完成下载事件处理方法:

static void wc_DownloadStringCompleted(object sender, DownloadStringCompletedEventArgs e)
{
    Console.WriteLine(DateTime.Now.ToString("mm:ss"));
    Console.WriteLine(e.Result.Substring(0, 300));
}

程序输出结果:

image

我们可以看到WebClient的DownloadStringAsync方法在内部使用了WebRequest:

public void DownloadStringAsync(Uri address, object userToken)
{
    if (Logging.On)
    {
        Logging.Enter(Logging.Web, this, "DownloadStringAsync", address);
    }
    if (address == null)
    {
        throw new ArgumentNullException("address");
    }
    this.InitWebClientAsync();
    this.ClearWebClientState();
    AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(userToken);
    this.m_AsyncOp = asyncOp;
    try
    {
        WebRequest request = this.m_WebRequest = this.GetWebRequest(this.GetUri(address));
        this.DownloadBits(request, null, new CompletionDelegate(this.DownloadStringAsyncCallback), asyncOp);
    }
    catch (Exception exception)
    {
        if (((exception is ThreadAbortException) || (exception is StackOverflowException)) || (exception is OutOfMemoryException))
        {
            throw;
        }
        if (!(exception is WebException) && !(exception is SecurityException))
        {
            exception = new WebException(SR.GetString("net_webclient"), exception);
        }
        this.DownloadStringAsyncCallback(null, exception, asyncOp);
    }
    catch
    {
        Exception exception2 = new WebException(SR.GetString("net_webclient"), new Exception(SR.GetString("net_nonClsCompliantException")));
        this.DownloadStringAsyncCallback(null, exception2, asyncOp);
    }
    if (Logging.On)
    {
        Logging.Exit(Logging.Web, this, "DownloadStringAsync", "");
    }
}

而且,使用了WebRequest的基于IAsyncResult的APM,可以看看DownloadBits的定义:

private byte[] DownloadBits(WebRequest request, Stream writeStream, CompletionDelegate completionDelegate, AsyncOperation asyncOp)
{
    WebResponse response = null;
    DownloadBitsState state = new DownloadBitsState(request, writeStream, completionDelegate, asyncOp, this.m_Progress, this);
    if (state.Async)
    {
        request.BeginGetResponse(new AsyncCallback(WebClient.DownloadBitsResponseCallback), state);
        return null;
    }
    response = this.m_WebResponse = this.GetWebResponse(request);
    int bytesRetrieved = state.SetResponse(response);
    while (!state.RetrieveBytes(ref bytesRetrieved))
    {
    }
    state.Close();
    return state.InnerBuffer;
}

在这里BeginGetResponse(),DownloadBitsResponseCallback回调方法如下:

private static void DownloadBitsResponseCallback(IAsyncResult result)
{
    DownloadBitsState asyncState = (DownloadBitsState)result.AsyncState;
    WebRequest request = asyncState.Request;
    Exception exception = null;
    try
    {
        WebResponse webResponse = asyncState.WebClient.GetWebResponse(request, result);
        asyncState.WebClient.m_WebResponse = webResponse;
        asyncState.SetResponse(webResponse);
    }
    catch (Exception exception2)
    {
        if (((exception2 is ThreadAbortException) || (exception2 is StackOverflowException)) || (exception2 is OutOfMemoryException))
        {
            throw;
        }
        exception = exception2;
        if (!(exception2 is WebException) && !(exception2 is SecurityException))
        {
            exception = new WebException(SR.GetString("net_webclient"), exception2);
        }
        AbortRequest(request);
        if ((asyncState != null) && (asyncState.WriteStream != null))
        {
            asyncState.WriteStream.Close();
        }
    }
    finally
    {
        if (exception != null)
        {
            asyncState.CompletionDelegate(null, exception, asyncState.AsyncOp);
        }
    }
}

很显然,在WebClient.GetWebResponse中我们会进行EndGetResponse()操作:

protected virtual WebResponse GetWebResponse(WebRequest request, IAsyncResult result)
{
    WebResponse response = request.EndGetResponse(result);
    this.m_WebResponse = response;
    return response;
}

那么继续看看SetResponse:

internal int SetResponse(WebResponse response)
       {
           this.ContentLength = response.ContentLength;
           if ((this.ContentLength == -1L) || (this.ContentLength > 0x10000L))
           {
               this.Length = 0x10000L;
           }
           else
           {
               this.Length = this.ContentLength;
           }
           if (this.WriteStream == null)
           {
               if (this.ContentLength > 0x7fffffffL)
               {
                   throw new WebException(SR.GetString("net_webstatus_MessageLengthLimitExceeded"), WebExceptionStatus.MessageLengthLimitExceeded);
               }
               this.SgBuffers = new ScatterGatherBuffers(this.Length);
           }
           this.InnerBuffer = new byte[(int)this.Length];
           this.ReadStream = response.GetResponseStream();
           if (this.Async && (response.ContentLength >= 0L))
           {
               this.Progress.TotalBytesToReceive = response.ContentLength;
           }
           if (this.Async)
           {
               if ((this.ReadStream == null) || (this.ReadStream == Stream.Null))
               {
                   WebClient.DownloadBitsReadCallbackState(this, null);
               }
               else
               {
                   this.ReadStream.BeginRead(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset, new AsyncCallback(WebClient.DownloadBitsReadCallback), this);
               }
               return -1;
           }
           if ((this.ReadStream != null) && (this.ReadStream != Stream.Null))
           {
               return this.ReadStream.Read(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset);
           }
           return 0;
       }

关注下ReadStream.BeginRead()的回调方法:

private static void DownloadBitsReadCallback(IAsyncResult result)
{
    DownloadBitsState asyncState = (DownloadBitsState)result.AsyncState;
    DownloadBitsReadCallbackState(asyncState, result);
}

继续看看DownloadBitsReadCallbackState:

private static void DownloadBitsReadCallbackState(DownloadBitsState state, IAsyncResult result)
{
    Stream readStream = state.ReadStream;
    Exception exception = null;
    bool flag = false;
    try
    {
        int bytesRetrieved = 0;
        if ((readStream != null) && (readStream != Stream.Null))
        {
            bytesRetrieved = readStream.EndRead(result);
        }
        flag = state.RetrieveBytes(ref bytesRetrieved);
    }
    catch (Exception exception2)
    {
        flag = true;
        if (((exception2 is ThreadAbortException) || (exception2 is StackOverflowException)) || (exception2 is OutOfMemoryException))
        {
            throw;
        }
        exception = exception2;
        state.InnerBuffer = null;
        if (!(exception2 is WebException) && !(exception2 is SecurityException))
        {
            exception = new WebException(SR.GetString("net_webclient"), exception2);
        }
        AbortRequest(state.Request);
        if ((state != null) && (state.WriteStream != null))
        {
            state.WriteStream.Close();
        }
    }
    finally
    {
        if (flag)
        {
            if (exception == null)
            {
                state.Close();
            }
            state.CompletionDelegate(state.InnerBuffer, exception, state.AsyncOp);
        }
    }
}

在这里EndRead(),再来看看再来看看RetrieveBytes()方法:

internal bool RetrieveBytes(ref int bytesRetrieved)
{
    if (bytesRetrieved > 0)
    {
        if (this.WriteStream != null)
        {
            this.WriteStream.Write(this.InnerBuffer, 0, bytesRetrieved);
        }
        else
        {
            this.SgBuffers.Write(this.InnerBuffer, 0, bytesRetrieved);
        }
        if (this.Async)
        {
            this.Progress.BytesReceived += (long)bytesRetrieved;
        }
        if (this.Offset != this.ContentLength)
        {
            if (this.Async)
            {
                this.WebClient.PostProgressChanged(this.AsyncOp, this.Progress);
                this.ReadStream.BeginRead(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset, new AsyncCallback(WebClient.DownloadBitsReadCallback), this);
            }
            else
            {
                bytesRetrieved = this.ReadStream.Read(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset);
            }
            return false;
        }
    }
    if (this.Async)
    {
        if (this.Progress.TotalBytesToReceive < 0L)
        {
            this.Progress.TotalBytesToReceive = this.Progress.BytesReceived;
        }
        this.WebClient.PostProgressChanged(this.AsyncOp, this.Progress);
    }
    if (this.ReadStream != null)
    {
        this.ReadStream.Close();
    }
    if (this.WriteStream != null)
    {
        this.WriteStream.Close();
    }
    else if (this.WriteStream == null)
    {
        byte[] dst = new byte[this.SgBuffers.Length];
        if (this.SgBuffers.Length > 0)
        {
            BufferOffsetSize[] buffers = this.SgBuffers.GetBuffers();
            int dstOffset = 0;
            for (int i = 0; i < buffers.Length; i++)
            {
                BufferOffsetSize size = buffers[i];
                Buffer.BlockCopy(size.Buffer, 0, dst, dstOffset, size.Size);
                dstOffset += size.Size;
            }
        }
        this.InnerBuffer = dst;
    }
    return true;
}

WebClient的PostProgressChanged方法,在汇报进度的时候调用了AsyncOperation的Post方法:

private void PostProgressChanged(AsyncOperation asyncOp, ProgressData progress)
{
    if ((asyncOp != null) && ((progress.BytesSent + progress.BytesReceived) > 0L))
    {
        int num;
        if (progress.HasUploadPhase)
        {
            if ((progress.TotalBytesToReceive < 0L) && (progress.BytesReceived == 0L))
            {
                num = (progress.TotalBytesToSend < 0L) ? 0 : ((progress.TotalBytesToSend == 0L) ? 50 : ((int)((50L * progress.BytesSent) / progress.TotalBytesToSend)));
            }
            else
            {
                num = (progress.TotalBytesToSend < 0L) ? 50 : ((progress.TotalBytesToReceive == 0L) ? 100 : ((int)(((50L * progress.BytesReceived) / progress.TotalBytesToReceive) + 50L)));
            }
            asyncOp.Post(this.reportUploadProgressChanged, new UploadProgressChangedEventArgs(num, asyncOp.UserSuppliedState, progress.BytesSent, progress.TotalBytesToSend, progress.BytesReceived, progress.TotalBytesToReceive));
        }
        else
        {
            num = (progress.TotalBytesToReceive < 0L) ? 0 : ((progress.TotalBytesToReceive == 0L) ? 100 : ((int)((100L * progress.BytesReceived) / progress.TotalBytesToReceive)));
            asyncOp.Post(this.reportDownloadProgressChanged, new DownloadProgressChangedEventArgs(num, asyncOp.UserSuppliedState, progress.BytesReceived, progress.TotalBytesToReceive));
        }
    }
}

MSDN中有这么一段描述:为了使类正确运行,应当使用给定应用程序模型(包括 ASP.NET 和 Windows 窗体应用程序)的适当线程或上下文调用客户端事件处理程序,这一点很重要。我们提供了两个重要的帮助器类,以确保您的异步类在任何应用程序模型中都能正确运行,这两个帮助器类是 AsyncOperation 和 AsyncOperationManager。AsyncOperationManager 提供了 CreateOperation 方法,该方法会返回一个 AsyncOperation。方法名称Async 方法调用 CreateOperation,类使用返回的 AsyncOperation 跟踪异步任务的生存期。若要向客户端报告进度、增量结果和完成,请调用 AsyncOperation 的 Post 和 OperationCompleted 方法。AsyncOperation 负责将对客户端事件处理程序的调用封送到适当的线程和上下文。

7) 当为了提供兼容性需要在同一个类上公开基于事件的模式和 IAsyncResult 模式时,同时公开这两种模式。例如,如果已经释放了一个使用 IAsyncResult 模式的 API,则需要保留 IAsyncResult 模式以提供向后兼容性。

8)如果得到的对象模型复杂性方面的优点大于分开实现的优点,则在同一个类上实现基于事件的模式和 IAsyncResult 模式。在一个类上同时公开两种模式比避免公开基于事件的模式效果更好。

9) 如果必须在同一个类上同时公开基于事件的模式和 IAsyncResult 模式,可使用设置为 Advanced 的 EditorBrowsableAttribute 将 IAsyncResult 模式实现标记为高级功能。这指示设计环境(如 Visual Studio IntelliSense)不显示 IAsyncResult 属性和方法。这些属性和方法仍然是完全可用的,但使用 IntelliSense 的开发人员能够更清楚地查看 API。

 

我们可以发现从自己实现异步编程,到使用基于IAsyncResult的APM到使用基于事件的APM,使用上越来越简单,封装的也越来越厉害(自己实现一个WebClient的工作量也很大)。之所以有APM这种模式是因为.NET的BCL不管是基于线程池的异步操作还是基于IOCP的异步操作都遵从了APM,对于开发人员来说不需要过多学习就可以掌握大多数提供了异步能力的类库的使用。而对于组件或类库设计师来说就更要遵循这个模式,并且要根据需要选择提供基于IAsyncResult的APM还是基于事件的APM还是两者都提供。通过这两篇文章的介绍您应该已经知道了如何去使用基于APM的类库,在之后高级系列文章中我们会介绍如何去实现自己的APM组件。

在这十篇文章中我们介绍了线程/线程池/线程同步/基于多线程的组件/异步编程模式,这些文章中用到的例子可以在这里下载(VS2008解决方案)。

作者: lovecindywang
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关实践学习
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
本场景将自定义告警信息同时分发至多个通知渠道的需求,例如短信、电子邮件及钉钉群组等。通过采用轻量消息队列(原 MNS)的主题模型的HTTP订阅方式,并结合应用实时监控服务提供的自定义集成能力,使得您能够以简便的配置方式实现上述多渠道同步通知的功能。
相关文章
|
6月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
364 2
|
11月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
11月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
缓存 NoSQL 中间件
Redis的线程模型
Redis采用单线程模型确保操作的原子性,每次只执行一个操作,避免并发冲突。它通过MULTI/EXEC事务机制、Lua脚本和复合指令(如MSET、GETSET等)保证多个操作要么全成功,要么全失败,确保数据一致性。Redis事务在EXEC前失败则不执行任何操作,EXEC后失败不影响其他操作。Pipeline虽高效但不具备原子性,适合非热点时段的数据调整。Redis 7引入Function功能,支持函数复用,简化复杂业务逻辑。总结来说,Redis的单线程模型简单高效,适用于高并发场景,但仍需合理选择指令执行方式以发挥其性能优势。
324 6
|
人工智能 自动驾驶 数据可视化
D1net阅闻 | ChatGPT支持所有用户使用搜索功能之时,谷歌也开放了最强模型
D1net阅闻 | ChatGPT支持所有用户使用搜索功能之时,谷歌也开放了最强模型
|
人工智能 搜索推荐 机器人
D1net阅闻|据悉微软致力于在365 Copilot产品中添加非OpenAI模型
D1net阅闻|据悉微软致力于在365 Copilot产品中添加非OpenAI模型
|
人工智能 机器人 量子技术
D1net阅闻 | 李飞飞团队训练出媲美DeepSeek R1的推理模型 云计算费用不到50美元
D1net阅闻 | 李飞飞团队训练出媲美DeepSeek R1的推理模型 云计算费用不到50美元
|
人工智能 5G 数据库
D1net阅闻|谷歌被曝正使用Anthropic的Claude模型来改进其Gemini AI
D1net阅闻|谷歌被曝正使用Anthropic的Claude模型来改进其Gemini AI
|
开发框架 Java .NET
.net core 非阻塞的异步编程 及 线程调度过程
【11月更文挑战第12天】本文介绍了.NET Core中的非阻塞异步编程,包括其基本概念、实现方式及应用示例。通过`async`和`await`关键字,程序可在等待I/O操作时保持线程不被阻塞,提高性能。文章还详细说明了异步方法的基础示例、线程调度过程、延续任务机制、同步上下文的作用以及如何使用`Task.WhenAll`和`Task.WhenAny`处理多个异步任务的并发执行。
394 1
|
Java API 调度
【JavaEE】——多线程(join阻塞,计算,引用,状态)
【JavaEE】——多线程,join,sleep引起的线程阻塞,多线程提升计算效率,如何获取线程的引用和状态