[译]何时使用 Parallel.ForEach,何时使用 PLINQ

简介:

原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporation

原文pdf:http://download.csdn.NET/detail/sqlchen/7509513

 

====================================================================

简介

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。

Parallel.ForEach

Parallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的特殊之处在于它使用多线程来执行循环体内的代码段。

Parallel.ForEach 最常用的形式如下:

1
public  static  ParallelLoopResult ForEach<TSource>(  IEnumerable<TSource> source,        Action<TSource> body)   

PLINQ

PLINQ 也是一种对数据进行并行处理的编程模型,它通过 LINQ 的语法来实现类似 Parallel.ForEach 的多线程并行处理。


场景一:简单数据 之 独立操作的并行处理(使用 Parallel.ForEach)

示例代码:

1
2
3
4
public  static  void  IndependentAction(IEnumerable<T> source, Action<T> action) 
     Parallel.ForEach(source, element => action(element)); 

 理由:

1. 虽然 PLINQ 也提供了一个类似的 ForAll 接口,但它对于简单的独立操作太重量化了。
2. 使用 Parallel.ForEach 你还能够设定 ParallelOptions.MaxDegreeOfParalelism 参数(指定最多需要多少个线程),这样当 ThreadPool 资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach 依然能够顺利运行,并且当后续有更多可用线程出现时,Parallel.ForEach 也能及时地利用这些线程。PLINQ 只能通过WithDegreeOfParallelism 方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。

场景二:顺序数据 之 并行处理(使用 PLINQ 来维持数据顺序)

当输出的数据序列需要保持原始的顺序时采用 PLINQ 的 AsOrdered 方法非常简单高效。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  void  GrayscaleTransformation(IEnumerable<Frame> Movie) 
     var  ProcessedMovie = 
         Movie 
         .AsParallel() 
         .AsOrdered() 
         .Select(frame => ConvertToGrayscale(frame)); 
   
     foreach  ( var  grayscaleFrame  in  ProcessedMovie) 
    
         // Movie frames will be evaluated lazily 
    

 理由:

1. Parallel.ForEach 实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:

1
2
3
public  static  ParallelLoopResult ForEach<TSource >( 
     IEnumerable<TSource> source, 
     Action<TSource, ParallelLoopState, Int64> body) 

 这个重载的 Action 多包含了 index  参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子:

1
2
3
4
5
6
7
8
public  static  double  [] PairwiseMultiply( double [] v1,  double [] v2) 
     var  length = Math.Min(v1.Length, v2.Lenth); 
     double [] result =  new  double [length]; 
     Parallel.ForEach(v1, (element, loopstate, elementIndex) => 
         result[elementIndex] = element * v2[elementIndex]); 
     return  result; 

 
你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是 IEnumerable 那么你有4个解决方案:

(1) 调用 IEnumerable.Count() 来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。

(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(没看懂贴原文)

(3) 第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。

(4) 自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)

2. 相比之下 PLINQ 的 AsOrdered 方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazy materialized)

场景三:流数据 之 并行处理(使用 PLINQ)

PLINQ 能输出流数据,这个特性在一下场合非常有用:

1. 结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息

2. 你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  static  void  AnalyzeStocks(IEnumerable<Stock> Stocks) 
     var  StockRiskPortfolio = 
         Stocks 
         .AsParallel() 
         .AsOrdered() 
         .Select(stock =>  new  { Stock = stock, Risk = ComputeRisk(stock)}) 
         .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk)); 
   
     foreach  ( var  stockRisk  in  StockRiskPortfolio) 
    
         SomeStockComputation(stockRisk.Risk); 
         // StockRiskPortfolio will be a stream of results 
    

 

这里使用一个单线程的 foreach 来对 PLINQ 的输出进行后续处理,通常情况下 foreach 不需要等待 PLINQ 处理完所有数据就能开始运作。

PLINQ 也允许指定输出缓存的方式,具体可参照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚举

场景四:处理两个集合(使用 PLINQ)

PLINQ 的 Zip 方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public  static  IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) 
     return 
        
         .AsParallel() 
         .AsOrdered() 
         .Select(element => ExpensiveComputation(element)) 
         .Zip( 
            
             .AsParallel() 
             .AsOrdered() 
             .Select(element => DifferentExpensiveComputation(element)), 
             (a_element, b_element) => Combine(a_element,b_element)); 

 示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给 Zip 进行后续处理(Combine)。

Parallel.ForEach 也能实现类似的 Zip 处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) 
     var  numElements = Math.Min(a.Count(), b.Count()); 
     var  result =  new  T[numElements]; 
     Parallel.ForEach(a, 
         (element, loopstate, index) => 
        
             var  a_element = ExpensiveComputation(element); 
             var  b_element = DifferentExpensiveComputation(b.ElementAt(index)); 
             result[index] = Combine(a_element, b_element); 
         }); 
     return  result; 

 当然使用 Parallel.ForEach 后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。

场景五:线程局部变量

Parallel.ForEach 提供了一个线程局部变量的重载,定义如下:

1
2
3
4
5
public  static  ParallelLoopResult ForEach<TSource, TLocal>( 
     IEnumerable<TSource> source, 
     Func<TLocal> localInit, 
     Func<TSource, ParallelLoopState, TLocal,TLocal> body, 
     Action<TLocal> localFinally) 

 使用的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public  static  List<R> Filtering<T,R>(IEnumerable<T> source) 
     var  results =  new  List<R>(); 
     using  (SemaphoreSlim sem =  new  SemaphoreSlim(1)) 
    
         Parallel.ForEach(source, 
             () =>  new  List<R>(), 
             (element, loopstate, localStorage) => 
            
                 bool  filter = filterFunction(element); 
                 if  (filter) 
                     localStorage.Add(element); 
                 return  localStorage; 
             }, 
             (finalStorage) => 
            
                 lock (myLock) 
                
                     results.AddRange(finalStorage) 
                 }; 
             }); 
    
     return  results; 

 线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序):

1
2
3
4
5
6
7
8
9
10
public  static  void  UnsafeDownloadUrls () 
     WebClient webclient =  new  WebClient(); 
     Parallel.ForEach(urls, 
         (url,loopstate,index) => 
        
             webclient.DownloadFile(url, filenames[index] +  ".dat" ); 
             Console.WriteLine( "{0}:{1}" , Thread.CurrentThread.ManagedThreadId, url); 
         }); 

 通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient 对象。所以我们会把 WebClient 对象定义到线程中来:

1
2
3
4
5
6
7
8
9
10
public  static  void  BAD_DownloadUrls () 
     Parallel.ForEach(urls, 
         (url,loopstate,index) => 
        
             WebClient webclient =  new  WebClient(); 
             webclient.DownloadFile(url, filenames[index] +  ".dat" ); 
             Console.WriteLine( "{0}:{1}" , Thread.CurrentThread.ManagedThreadId, url); 
         }); 

 修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
public  static  void  downloadUrlsSafe() 
     Parallel.ForEach(urls, 
         () =>  new  WebClient(), 
         (url, loopstate, index, webclient) => 
        
             webclient.DownloadFile(url, filenames[index]+ ".dat" ); 
             Console.WriteLine( "{0}:{1}" , Thread.CurrentThread.ManagedThreadId, url); 
             return  webclient; 
         }, 
             (webclient) => { }); 

 这样的写法保证了我们能获得足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。

虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现类似的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  void  downloadUrl() 
     var  webclient =  new  ThreadLocal<WebClient>(()=>  new  WebClient ()); 
     var  res = 
         urls 
         .AsParallel() 
         .ForAll( 
             url => 
            
                 webclient.Value.DownloadFile(url, host[url] + ".dat" )); 
                 Console.WriteLine( "{0}:{1}" , Thread.CurrentThread.ManagedThreadId, url); 
             }); 

 但是请注意:ThreadLocal<T> 相对而言开销更大!

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载声明如下,其中包含一个 ParallelLoopState 对象:

 

1
2
3
public  static  ParallelLoopResult ForEach<TSource >( 
     IEnumerable<TSource> source, 
     Action<TSource, ParallelLoopState> body) 

ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。

ParallelLoopState.IsStopped 属性可用来判定其他迭代是否调用了 Stop 方法。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public  static  boolean FindAny<T,T>(IEnumerable<T> TSpace, T match)  where  T: IEqualityComparer<T> 
     var  matchFound =  false
     Parallel.ForEach(TSpace, 
         (curValue, loopstate) => 
            
                 if  (curValue.Equals(match) ) 
                
                     matchFound =  true
                     loopstate.Stop(); 
                
             }); 
     return  matchFound; 

 ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可以使用以下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  int  FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match)  where  T: IEqualityComparer<T> 
     var  loopResult = Parallel.ForEach(source, 
         (curValue, loopState, curIndex) => 
        
             if  (curValue.Equals(match)) 
            
                 loopState.Break(); 
            
          }); 
     var  matchedIndex = loopResult.LowestBreakIteration; 
     return  matchedIndex.HasValue ? matchedIndex : -1; 

 虽然 PLINQ 也提供了退出的机制(cancellation token),但相对来说退出的时机并没有 Parallel.ForEach 那么及时。

 


本文转自yonghu86博客园博客,原文链接:http://www.cnblogs.com/huyong/p/6084456.html,如需转载请自行联系原作者

相关文章
|
数据可视化 数据处理
解决 FAILED: UDFArgumentException explode() takes an array or a map as a parameter 并理解炸裂函数和侧视图
解决 FAILED: UDFArgumentException explode() takes an array or a map as a parameter 并理解炸裂函数和侧视图
106 0
|
SQL 分布式计算 Spark
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
754 0
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
|
分布式计算 Spark
Spark - ReturnStatementInClosureException: Return statements aren‘t allowed in Spark closures
Spark 使用 RDD 调用 Filter 函数时,dirver 端卡住,报错 ReturnStatementInClosureException: Return statements aren't allowed in Spark closures,即闭包内无法使用 return 函数。
352 0
Spark - ReturnStatementInClosureException: Return statements aren‘t allowed in Spark closures
|
JSON Cloud Native 数据建模
Parallel 解析
从 Knative Eventing 0.8 开始,支持根据不同的过滤条件对事件进行选择处理。通过 Parallel 提供了这样的能力。本文就给大家介绍一下这个特性。
1207 0
Parallel 解析
|
安全 C#
C#并发实战Parallel.ForEach使用
C#并发实战Parallel.ForEach使用 前言:最近给客户开发一个伙食费计算系统,大概需要计算2000个人的伙食。需求是按照员工的预定报餐计划对消费记录进行检查,如有未报餐有刷卡或者有报餐没刷卡的要进行一定的金额扣减等一系列规则。
1119 0
|
分布式计算
SparkStreaming中foreachRDD、foreachPartition和foreach 及序列化问题
最近项目遇到报错序列化相关问题,于是把这三个拿出来分析一下,先来看下foreachRDD、foreachPartition和foreach的不同之处。不同主要在于它们的作用范围不同,foreachRDD作用于DStream中每一个时间间隔的RDD,foreachPartition作用于每一个时间间隔的RDD中的每一个partition,foreach作用于每一个时间间隔的RDD中的每一个元素。
3055 0

热门文章

最新文章