.Net并行编程(一)-TPL之数据并行

简介: 前言 许多个人计算机和工作站都有多个CPU核心,可以同时执行多个线程。利用硬件的特性,使用并行化代码以在多个处理器之间分配工作。 应用场景 文件批量上传并行上传单个文件。也可以把一个文件拆成几段分开上传,加快上传速度。

前言

许多个人计算机和工作站都有多个CPU核心,可以同时执行多个线程。利用硬件的特性,使用并行化代码以在多个处理器之间分配工作。
.NetFramework并行编程架构图

应用场景

  • 文件批量上传

并行上传单个文件。也可以把一个文件拆成几段分开上传,加快上传速度。

  • 数据分批计算

如几百万数据可以拆成许多无关联的部分,并行计算处理。最后聚合。

  • 数据推送

也是需要将数据拆解后,并行推送。

任务并行库-数据并行

如果在一个循环内在每次迭代只执行少量工作或者它没有运行多次迭代,那么并行化的开销可能会导致代码运行的更慢。使用并行之前,应该对线程(锁,死锁,竞争条件)应该有基本的了解。

Parallel.For

        /// <summary>
        /// 正常循环
        /// </summary>
        public void FormalDirRun()
        {
            long totalSize = 0;
            var dir = @"E:\LearnWall\orleans";//args[1];
            String[] files = Directory.GetFiles(dir);
            stopwatch.Restart();
            for (var i = 0; i < files.Length; i++)
            {
                FileInfo fi = new FileInfo(files[i]);
                long size = fi.Length;
                Interlocked.Add(ref totalSize, size);
            }
            stopwatch.Stop();
            Console.WriteLine($"FormalDirRun------{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
        }
        /// <summary>
        /// 并行循环
        /// </summary>
        public void ParallelForDirRun()
        {
            long totalSize = 0;
            var dir = @"E:\LearnWall\orleans";//args[1];
            String[] files = Directory.GetFiles(dir);
            stopwatch.Restart();
            Parallel.For(0, files.Length,
                         index =>
                         {
                             FileInfo fi = new FileInfo(files[index]);
                             long size = fi.Length;
                             Interlocked.Add(ref totalSize, size);
                         });
            stopwatch.Stop();
            Console.WriteLine($"ParallelForDirRun-{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
        }
        

从下图对比接口可以看出当循环体内方法执行时间很短时,并行时间反而更长。这块会有更细致的补充。

FormalDirRun------20 files, 255618 bytes,time:0,Dir:E:\LearnWall\orleans
ParallelForDirRun-20 files, 255618 bytes,time:6,Dir:E:\LearnWall\orleans

我们追加一些延时操作如Thread.Sleep,但这应该不是好好例子...但我只想演示效果就行了。

Thread.Sleep(1000);

查看结果得到,当方法内有阻塞延时一秒后,两者速度错了七倍。

FormalDirRun------20 files, 255618 bytes,time:20011,Dir:E:\LearnWall\orleans
ParallelForDirRun-20 files, 255618 bytes,time:3007,Dir:E:\LearnWall\orleans

矩阵和秒表示例

Parallel.ForEach

为了并行速度的最大化,我们应该尽量减少在并行内对共享资源的访问,如Console.Write,文件日志等...但这里为了显示效果,就用了。

 public void ParallelForEachDirRun()
        {
            long totalSize = 0;
            var dir = @"E:\LearnWall\orleans";//args[1];
            String[] files = Directory.GetFiles(dir);
            stopwatch.Restart();
            Parallel.ForEach(files, (current) =>
            {
                FileInfo fi = new FileInfo(current);
                long size = fi.Length;
                Interlocked.Add(ref totalSize, size);
                Console.WriteLine($"name:{fi.Name}");
            });
            stopwatch.Stop();
            Console.WriteLine($"ParallelForEachDirRun-{files.Length} files, {totalSize} bytes,Time:{stopwatch.ElapsedMilliseconds}");
        }        
name:.gitignore
name:build.sh
.
.
.
name:TestAll.cmd
ParallelForEachDirRun-20 files, 255618 bytes,Time:17

Parallel.For 线程局部变量

  public void ParallelForForThreadLocalVariables()
        {
            int[] nums = Enumerable.Range(0, 1000000).ToArray();
            long total = 0;

            // Use type parameter to make subtotal a long, not an int
            Parallel.For<long>(0, nums.Length, () => 0, (j,loop, subtotal) =>
            {
                subtotal += nums[j];
                return subtotal;
            },
                (x) => Interlocked.Add(ref total, x)
            );

            Console.WriteLine("The total is {0:N0}", total);
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        }

结果如下:

The total is 499,999,509,000

每个For方法的前两个参数指定开始和结束迭代值。在此方法的重载中,第三个参数是初始化本地状态的位置。在此上下文中,本地状态表示一个变量,其生命周期从当前线程上的循环的第一次迭代之前延伸到最后一次迭代之后。

第三个参数的类型是Func ,其中TResult是将存储线程本地状态的变量的类型。它的类型由调用泛型For (Int32,Int32,Func ,Func ,Action )方法时提供的泛型类型参数定义,在这种情况下是Int64。type参数告诉编译器将用于存储线程局部状态的临时变量的类型。在此示例中,表达式() => 0(或Function() 0在Visual Basic中)将线程局部变量初始化为零。如果泛型类型参数是引用类型或用户定义的值类型,则表达式如下所示:

() => new MyClass()  

这块内容比较繁琐,一句话来说:前两个参数是开始和结束值,第三个是根据For泛型而初始化的值。我其实也没看太懂这块。.net Framework源码如下,.netcore的不知道:

 public static ParallelLoopResult For<TLocal>(
            int fromInclusive, int toExclusive,
            Func<TLocal> localInit,
            Func<int, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal> localFinally)
        {
            if (body == null)
            {
                throw new ArgumentNullException("body");
            }
            if (localInit == null)
            {
                throw new ArgumentNullException("localInit");
            }
            if (localFinally == null)
            {
                throw new ArgumentNullException("localFinally");
            }
 
            return ForWorker(
                fromInclusive, toExclusive, s_defaultParallelOptions,
                null, null, body, localInit, localFinally);
        }
        
        /// </summary>
        /// <typeparam name="TLocal">本地数据的类型.</typeparam>
        /// <param name="fromInclusive">循环开始数</param>
        /// <param name="toExclusive">循环结束数</param>
        /// <param name="parallelOptions">选项</param>
        /// <param name="body">循环执行体</param>
        /// <param name="bodyWithState">ParallelState的循环体重载。</param>
        /// <param name="bodyWithLocal">线程局部状态的循环体重载。</param>
        /// <param name="localInit">一个返回新线程本地状态的选择器函数。</param>
        /// <param name="localFinally">清理线程本地状态的清理函数。</param>
        /// <remarks>只能提供一个身体参数(即它们是独占的)。</remarks>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
        private static ParallelLoopResult ForWorker<TLocal>(
            int fromInclusive, int toExclusive,
            ParallelOptions parallelOptions,
            Action<int> body,
            Action<int, ParallelLoopState> bodyWithState,
            Func<int, ParallelLoopState, TLocal, TLocal> bodyWithLocal,
            Func<TLocal> localInit, Action<TLocal> localFinally)
        {
        .
        .
        .
        }

Parallel.ForEach线程局部变量

       /// <summary>
        /// 
        /// </summary>
        public void ParallelForEachThreadLocalVariables()
        {
            int[] nums = Enumerable.Range(0, 1000000).ToArray();
            long total = 0;

            // First type parameter is the type of the source elements
            // Second type parameter is the type of the thread-local variable (partition subtotal)
            Parallel.ForEach<int, long>(nums, // source collection
                                        () => 0, // method to initialize the local variable
                                        (j, loop, subtotal) => // method invoked by the loop on each iteration
                                     {
                                         subtotal += j; //modify local variable
                                         return subtotal; // value to be passed to next iteration
                                     },
                                        // Method to be executed when each partition has completed.
                                        // finalResult is the final value of subtotal for a particular partition.
                                        (finalResult) => Interlocked.Add(ref total, finalResult)
                                        );

            Console.WriteLine("The total from Parallel.ForEach is {0:N0}", total);
        }

ForEach的源码如下

        /// <summary>
        /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> 
        /// in which iterations may run in parallel.
        /// </summary>
        /// <typeparam name="TSource">The type of the data in the source.</typeparam>
        /// <param name="source">An enumerable data source.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> 
        /// argument is null.</exception>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> 
        /// argument is null.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> 
        /// enumerable.  It is provided with the current element as a parameter.
        /// </remarks>
        public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }
            if (body == null)
            {
                throw new ArgumentNullException("body");
            }
 
            return ForEachWorker<TSource, object>(
                source, s_defaultParallelOptions, body, null, null, null, null, null, null);
        }

取消 Parallel.ForEach或Parallel.For

通过CancellationTokenSource来获取token

CancellationTokenSource cts = new CancellationTokenSource();

通过ParallelOptions.CancellationToken属性来控制取消状态。

ParallelOptions po = new ParallelOptions();

po.CancellationToken = cts.Token;

通过Parallel.For或Foreach的ParallelOptions值来控制并行内方法的取消。

代码如下:

 int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();

            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();

            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                var s = Console.ReadKey().KeyChar;
                if (s == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit111");
            });

            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }
            finally
            {
                cts.Dispose();
            }

            Console.ReadKey();

运行结果如下,键盘输入c时,并行取消。

1937.41838537782 on 7
2739.95711645274 on 8
2501.40660429287 on 9
2958.47798707376 on 10
.
.
.
press any key to exit111
The operation was canceled.

捕获并行体内的异常

示例方法采用ConcurrentQueue来接收异常集合,最后抛出一个聚合异常AggregateException。

var exceptions = new ConcurrentQueue();

exceptions.Enqueue(e);

外部调用AggregateException.Flatten方法获取异常信息。

这为我以后捕获异常提供了一个好思路。

        /// <summary>
        /// 捕获并行体内的异常
        /// </summary>
        public void HandleExceptionParallelLoop()
        {
            // Create some random data to process in parallel.
            // There is a good probability this data will cause some exceptions to be thrown.
            byte[] data = new byte[5000];
            Random r = new Random();
            r.NextBytes(data);

            try
            {
                ProcessDataInParallel(data);
            }
            catch (AggregateException ae)
            {
                var ignoredExceptions = new List<Exception>();
                // This is where you can choose which exceptions to handle.
                foreach (var ex in ae.Flatten().InnerExceptions)
                {
                    if (ex is ArgumentException)
                        Console.WriteLine(ex.Message);
                    else
                        ignoredExceptions.Add(ex);
                }
                if (ignoredExceptions.Count > 0) throw new AggregateException(ignoredExceptions);
            }

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
        private  void ProcessDataInParallel(byte[] data)
        {
            // Use ConcurrentQueue to enable safe enqueueing from multiple threads.
            var exceptions = new ConcurrentQueue<Exception>();

            // Execute the complete loop and capture all exceptions.
            Parallel.ForEach(data, d =>
            {
                try
                {
                    // Cause a few exceptions, but not too many.
                    if (d < 3)
                        throw new ArgumentException($"Value is {d}. Value must be greater than or equal to 3.");
                    else
                        Console.Write(d + " ");
                }
                // Store the exception and continue with the loop.                    
                catch (Exception e)
                {
                    exceptions.Enqueue(e);
                }
            });
            Console.WriteLine();

            // Throw the exceptions here after the loop completes.
            if (exceptions.Count > 0) throw new AggregateException(exceptions);
        }

对微小执行体提速

当Parallel.For循环有一个很快的执行体,它可能比同等顺序循环执行更慢。较慢的性能是由分区数据所涉及的开销和每次循环迭代调用委托的成本引起的。为了解决这种情况,Partitioner类提供了Partitioner.Create方法,该方法使您能够为委托主体提供顺序循环,以便每个分区仅调用一次委托,而不是每次迭代调用一次。

var rangePartitioner = Partitioner.Create(0, source.Length);

        /// <summary>
        /// 提速
        /// </summary>
        public void SpeedUpMicroParallelBody() {
            // Source must be array or IList.
            var source = Enumerable.Range(0, 100000).ToArray();

            // Partition the entire source array.
            var rangePartitioner = Partitioner.Create(0, source.Length);

            double[] results = new double[source.Length];

            // Loop over the partitions in parallel.
            Parallel.ForEach(rangePartitioner, (range, loopState) =>
            {
                // Loop over each range element without a delegate invocation.
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    results[i] = source[i] * Math.PI;
                }
            });

            Console.WriteLine("Operation complete. Print results? y/n");
            char input = Console.ReadKey().KeyChar;
            if (input == 'y' || input == 'Y')
            {
                foreach (double d in results)
                {
                    Console.Write("{0} ", d);
                }
            }
        }

源码地址

CsharpFanDemo

总结

本篇文章沿着微软官方文档步骤熟悉了第一部分数据并行的用法。

Parallel.For和Parallel.ForEach实现并行。

Parallel.For和Parallel.ForEach线程局部变量。

取消并行ParallelOptions.CancellationToken

捕捉异常ConcurrentQueue累加并行体内的异常,外部接收。

加速Partitioner.Create

感谢观看!

相关文章
|
4月前
|
存储 JSON 开发工具
Visual Studio编程效率提升技巧集(提高.NET编程效率)
Visual Studio编程效率提升技巧集(提高.NET编程效率)
Visual Studio编程效率提升技巧集(提高.NET编程效率)
|
16天前
|
传感器 数据采集 物联网
探索.NET nanoFramework:为嵌入式设备编程的新途径
探索.NET nanoFramework:为嵌入式设备编程的新途
32 7
|
18天前
|
SQL XML 关系型数据库
入门指南:利用NHibernate简化.NET应用程序的数据访问
【10月更文挑战第13天】NHibernate是一个面向.NET的开源对象关系映射(ORM)工具,它提供了从数据库表到应用程序中的对象之间的映射。通过使用NHibernate,开发者可以专注于业务逻辑和领域模型的设计,而无需直接编写复杂的SQL语句来处理数据持久化问题。NHibernate支持多种数据库,并且具有高度的灵活性和可扩展性。
36 2
|
3月前
|
大数据 开发工具 开发者
从零到英雄:.NET核心技术带你踏上编程之旅,构建首个应用,开启你的数字世界探险!
【8月更文挑战第28天】本文带领读者从零开始,使用强大的.NET平台搭建首个控制台应用。无论你是新手还是希望扩展技能的开发者,都能通过本文逐步掌握.NET的核心技术。从环境搭建到创建项目,再到编写和运行代码,详细步骤助你轻松上手。通过计算两数之和的小项目,你不仅能快速入门,还能为未来开发更复杂的应用奠定基础。希望本文为你的.NET学习之旅开启新篇章!
33 1
|
3月前
|
开发框架 .NET 数据库连接
闲话 Asp.Net Core 数据校验(三)EF Core 集成 FluentValidation 校验数据例子
闲话 Asp.Net Core 数据校验(三)EF Core 集成 FluentValidation 校验数据例子
|
3月前
|
存储 C#
揭秘C#.Net编程秘宝:结构体类型Struct,让你的数据结构秒变高效战斗机,编程界的新星就是你!
【8月更文挑战第4天】在C#编程中,结构体(`struct`)是一种整合多种数据类型的复合数据类型。与类不同,结构体是值类型,意味着数据被直接复制而非引用。这使其适合表示小型、固定的数据结构如点坐标。结构体默认私有成员且不可变,除非明确指定。通过`struct`关键字定义,可以包含字段、构造函数及方法。例如,定义一个表示二维点的结构体,并实现计算距离原点的方法。使用时如同普通类型,可通过实例化并调用其成员。设计时推荐保持结构体不可变以避免副作用,并注意装箱拆箱可能导致的性能影响。掌握结构体有助于构建高效的应用程序。
93 7
|
3月前
|
Java Spring 自然语言处理
Spring 框架里竟藏着神秘魔法?国际化与本地化的奇妙之旅等你来揭开谜底!
【8月更文挑战第31天】在软件开发中,国际化(I18N)与本地化(L10N)对于满足不同地区用户需求至关重要。Spring框架提供了强大支持,利用资源文件和`MessageSource`实现多语言文本管理。通过配置日期格式和货币符号,进一步完善本地化功能。合理应用这些特性,可显著提升应用的多地区适应性和用户体验。
39 0
|
3月前
|
传感器 数据采集 物联网
探索未来:.NET nanoFramework引领嵌入式设备编程革新之旅
【8月更文挑战第28天】.NET nanoFramework 是一款专为资源受限的嵌入式设备设计的轻量级、高性能框架,基于 .NET Core,采用 C# 进行开发,简化了传统底层硬件操作的复杂性,极大提升了开发效率。开发者可通过 Visual Studio 或 Visual Studio Code 快速搭建环境并创建项目,利用丰富的库和驱动程序轻松实现从基础 LED 控制到网络通信等多种功能,显著降低了嵌入式开发的门槛。
53 0
|
4月前
|
开发框架 JSON 前端开发
利用查询条件对象,在Asp.net Web API中实现对业务数据的分页查询处理
利用查询条件对象,在Asp.net Web API中实现对业务数据的分页查询处理
|
3月前
|
开发框架 前端开发 算法
分享 .NET EF6 查询并返回树形结构数据的 2 个思路和具体实现方法
分享 .NET EF6 查询并返回树形结构数据的 2 个思路和具体实现方法