在上一篇博客5天玩转C#并行和多线程编程 —— 第一天 认识Parallel中,我们学习了Parallel的用法。并行编程,本质上是多线程的编程,那么当多个线程同时处理一个任务的时候,必然会出现资源访问问题,及所谓的线程安全。就像现实中,我们开发项目,就是一个并行的例子,把不同的模块分给不同的人,同时进行,才能在短的时间内做出大的项目。如果大家都只管自己写自己的代码,写完后发现合并不到一起,那么这种并行就没有了意义。 并行算法的出现,随之而产生的也就有了并行集合,及线程安全集合;微软向的也算周到,没有忘记linq,也推出了linq的并行版本,plinq – Parallel Linq. 一、并行集合 —— 线程安全集合 并行计算使用的多个线程同时进行计算,所以要控制每个线程对资源的访问,我们先来看一下平时常用的List<T>集合,在并行计算下的表现,新建一个控制台应用程序,添加一个PEnumerable类(当然你也直接写到main方法里面测试,建议分开写),写如下方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace ThreadPool { public class PEnumerable { public static void ListWithParallel() { List<int> list = new List<int>(); Parallel.For(0, 10000, item => { list.Add(item); }); Console.WriteLine("List's count is {0}",list.Count()); } } } |
点击F5运行,得到如下结果: 看到结果中显示的5851,但是我们循环的是10000次啊!怎么结果不对呢?这是因为List<T>是非线程安全集合,意思就是说所有的线程都可以修改他的值。 下面我们来看下并行集合 —— 线程安全集合,在System.Collections.Concurrent命名空间中,首先来看一下ConcurrentBag<T>泛型集合,其用法和List<T>类似,先来写个方法测试一下:
1 2 3 4 5 6 7 8 9 |
public static void ConcurrentBagWithPallel() { ConcurrentBag<int> list = new ConcurrentBag<int>(); Parallel.For(0, 10000, item => { list.Add(item); }); Console.WriteLine("ConcurrentBag's count is {0}", list.Count()); } |
同时执行两个方法,结果如下: 可以看到,ConcurrentBag集合的结果是正确的。下面我们修改代码看看ConcurrentBag里面的数据到底是怎么存放的,修改代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public static void ConcurrentBagWithPallel() { ConcurrentBag<int> list = new ConcurrentBag<int>(); Parallel.For(0, 10000, item => { list.Add(item); }); Console.WriteLine("ConcurrentBag's count is {0}", list.Count()); int n = 0; foreach(int i in list) { if (n > 10) break; n++; Console.WriteLine("Item[{0}] = {1}",n,i); } Console.WriteLine("ConcurrentBag's max item is {0}", list.Max()); } |
先来看一下运行结果: 可以看到,ConcurrentBag中的数据并不是按照顺序排列的,顺序是乱的,随机的。我们平时使用的Max、First、Last等linq方法都还有。其时分类似Enumerable的用法,大家可以参考微软的MSDN了解它的具体用法。 关于线程安全的集合还有很多,和我们平时用的集合都差不多,比如类似Dictionary的ConcurrentDictionary,还有ConcurrentStack,ConcurrentQueue等。 二、Parallel Linq的用法及性能 1、AsParallel 前面了解了并行的For和foreach,今天就来看一下Linq的并行版本是怎么样吧?为了测试,我们添加一个Custom类,代码如下:
1 2 3 4 5 6 |
public class Custom { public string Name { get; set; } public int Age { get; set; } public string Address { get; set; } } |
写如下测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public static void TestPLinq() { Stopwatch sw = new Stopwatch(); List<Custom> customs = new List<Custom>(); for (int i = 0; i < 2000000; i++) { customs.Add(new Custom() { Name = "Jack", Age = 21, Address = "NewYork" }); customs.Add(new Custom() { Name = "Jime", Age = 26, Address = "China" }); customs.Add(new Custom() { Name = "Tina", Age = 29, Address = "ShangHai" }); customs.Add(new Custom() { Name = "Luo", Age = 30, Address = "Beijing" }); customs.Add(new Custom() { Name = "Wang", Age = 60, Address = "Guangdong" }); customs.Add(new Custom() { Name = "Feng", Age = 25, Address = "YunNan" }); } sw.Start(); var result = customs.Where<Custom>(c => c.Age > 26).ToList(); sw.Stop(); Console.WriteLine("Linq time is {0}.",sw.ElapsedMilliseconds); sw.Restart(); sw.Start(); var result2 = customs.AsParallel().Where<Custom>(c => c.Age > 26).ToList(); sw.Stop(); Console.WriteLine("Parallel Linq time is {0}.", sw.ElapsedMilliseconds); } |
其实也就是加了一个AsParallel()方法,下面来看下运行结果: 时间相差了一倍,不过有时候不会相差这么多,要看系统当前的资源利用率。大家可以多测试一下。 其实,AsParallel()这个方法可以应用与任何集合,包括List<T>集合,从而提高查询速度和系统性能。 2、GroupBy方法 在项目中,我们经常要对数据做处理,比如分组统计,我们知道在linq中也可以实现,今天来学习一下新的ToLookup方法,写一个测试方法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public static void OrderByTest() { Stopwatch stopWatch = new Stopwatch(); List<Custom> customs = new List<Custom>(); for (int i = 0; i < 2000000; i++) { customs.Add(new Custom() { Name = "Jack", Age = 21, Address = "NewYork" }); customs.Add(new Custom() { Name = "Jime", Age = 26, Address = "China" }); customs.Add(new Custom() { Name = "Tina", Age = 29, Address = "ShangHai" }); customs.Add(new Custom() { Name = "Luo", Age = 30, Address = "Beijing" }); customs.Add(new Custom() { Name = "Wang", Age = 60, Address = "Guangdong" }); customs.Add(new Custom() { Name = "Feng", Age = 25, Address = "YunNan" }); } stopWatch.Restart(); var groupByAge = customs.GroupBy(item => item.Age).ToList(); foreach (var item in groupByAge) { Console.WriteLine("Age={0},count = {1}", item.Key, item.Count()); } stopWatch.Stop(); Console.WriteLine("Linq group by time is: " + stopWatch.ElapsedMilliseconds); stopWatch.Restart(); var lookupList = customs.ToLookup(i => i.Age); foreach (var item in lookupList) { Console.WriteLine("LookUP:Age={0},count = {1}", item.Key, item.Count()); } stopWatch.Stop(); Console.WriteLine("LookUp group by time is: " + stopWatch.ElapsedMilliseconds); } |
运行结果如下: ToLookup方法是将集合转换成一个只读集合,所以在大数据量分组时性能优于List.大家可以查阅相关资料,这里由于篇幅问题,不再细说。 from:http://www.cnblogs.com/yunfeifei/p/3998783.html
View Details随着多核时代的到来,并行开发越来越展示出它的强大威力!使用并行程序,充分的利用系统资源,提高程序的性能。在.net 4.0中,微软给我们提供了一个新的命名空间:System.Threading.Tasks。这里面有很多关于并行开发的东西,今天第一篇就介绍下最基础,最简单的——认识和使用Parallel。 一、 Parallel的使用 在Parallel下面有三个常用的方法invoke,For和ForEach。 1、 Parallel.Invoke 这是最简单,最简洁的将串行的代码并行化。 在这里先讲一个知识点,就是StopWatch的使用,最近有一些人说找不到StopWatch,StopWatch到底是什么东西,今天就来说明一下。 StopWatch在System.Diagnostics命名控件,要使用它就要先引用这个命名空间。 其使用方法如下: var stopWatch = new StopWatch(); //创建一个Stopwatch实例 stopWatch.Start(); //开始计时 stopWatch.Stop(); //停止计时 stopWatch.Reset(); //重置StopWatch stopWatch.Restart(); //重新启动被停止的StopWatch stopWatch.ElapsedMilliseconds //获取stopWatch从开始到现在的时间差,单位是毫秒 本次用到的就这么多知识点,想了解更多关于StopWatch的,去百度一下吧,网上有很多资料。 下面进入整体,开始介绍Parallel.Invoke方法,废话不多说了,首先新建一个控制台程序,添加一个类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public class ParallelDemo { private Stopwatch stopWatch = new Stopwatch(); public void Run1() { Thread.Sleep(2000); Console.WriteLine("Task 1 is cost 2 sec"); } public void Run2() { Thread.Sleep(3000); Console.WriteLine("Task 2 is cost 3 sec"); } public void ParallelInvokeMethod() { stopWatch.Start(); Parallel.Invoke(Run1, Run2); stopWatch.Stop(); Console.WriteLine("Parallel run " + stopWatch.ElapsedMilliseconds + " ms."); stopWatch.Restart(); Run1(); Run2(); stopWatch.Stop(); Console.WriteLine("Normal run " + stopWatch.ElapsedMilliseconds + " ms."); } } |
代码很简单,首先新加一个类,在类中写了两个方法,Run1和Run2,分别等待一定时间,输出一条信息,然后写了一个测试方法ParallelInvokeMethod,分别用两种方法调用Run1和Run2,然后在main方法中调用,下面来看一下运行时间如何: 大家应该能够猜到,正常调用的话应该是5秒多,而Parallel.Invoke方法调用用了只有3秒,也就是耗时最长的那个方法,可以看出方法是并行执行的,执行效率提高了很多。 2、Parallel.For 这个方法和For循环的功能相似,下面就在类中添加一个方法来测试一下吧。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public void ParallelForMethod() { stopWatch.Start(); for (int i = 0; i < 10000; i++) { for (int j = 0; j < 60000; j++) { int sum = 0; sum += i; } } stopWatch.Stop(); Console.WriteLine("NormalFor run " + stopWatch.ElapsedMilliseconds + " ms."); stopWatch.Reset(); stopWatch.Start(); Parallel.For(0, 10000, item => { for (int j = 0; j < 60000; j++) { int sum = 0; sum += item; } }); stopWatch.Stop(); Console.WriteLine("ParallelFor run " + stopWatch.ElapsedMilliseconds + " ms."); } |
写了两个循环,做了一些没有意义的事情,目的主要是为了消耗CPU时间,同理在main方法中调用,运行结果如下图: 可以看到,Parallel.For所用的时间比单纯的for快了1秒多,可见提升的性能是非常可观的。那么,是不是Parallel.For在任何时候都比for要快呢?答案当然是“不是”,要不然微软还留着for干嘛? 下面修改一下代码,添加一个全局变量num,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
public void ParallelForMethod() { var obj = new Object(); long num = 0; ConcurrentBag<long> bag = new ConcurrentBag<long>(); stopWatch.Start(); for (int i = 0; i < 10000; i++) { for (int j = 0; j < 60000; j++) { //int sum = 0; //sum += item; num++; } } stopWatch.Stop(); Console.WriteLine("NormalFor run " + stopWatch.ElapsedMilliseconds + " ms."); stopWatch.Reset(); stopWatch.Start(); Parallel.For(0, 10000, item => { for (int j = 0; j < 60000; j++) { //int sum = 0; //sum += item; lock (obj) { num++; } } }); stopWatch.Stop(); Console.WriteLine("ParallelFor run " + stopWatch.ElapsedMilliseconds + " ms."); } |
Parallel.For由于是并行运行的,所以会同时访问全局变量num,为了得到正确的结果,要使用lock,此时来看看运行结果: 是不是大吃一惊啊?Parallel.For竟然用了15秒多,而for跟之前的差不多。这主要是由于并行同时访问全局变量,会出现资源争夺,大多数时间消耗在了资源等待上面。 一直说并行,那么从哪里可以看出来Parallel.For是并行执行的呢?下面来写个测试代码:
1 2 3 4 |
Parallel.For(0, 100, i => { Console.Write(i + "\t"); }); |
从0输出到99,运行后会发现输出的顺序不对,用for顺序肯定是对的,并行同时执行,所以会出现输出顺序不同的情况。 2、Parallel.Foreach 这个方法跟Foreach方法很相似,想具体了解的,可以百度些资料看看,这里就不多说了,下面给出其使用方法:
1 2 3 4 5 6 |
List<int> list = new List<int>(); list.Add(0); Parallel.ForEach(list, item => { DoWork(item); }); |
二、 Parallel中途退出循环和异常处理 1、当我们使用到Parallel,必然是处理一些比较耗时的操作,当然也很耗CPU和内存,如果我们中途向停止,怎么办呢? 在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState, 该实例提供了Break和Stop方法来帮我们实现。 Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。 Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。 下面来写一段代码测试一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public void ParallelBreak() { ConcurrentBag<int> bag = new ConcurrentBag<int>(); stopWatch.Start(); Parallel.For(0, 1000, (i, state) => { if (bag.Count == 300) { state.Stop(); return; } bag.Add(i); }); stopWatch.Stop(); Console.WriteLine("Bag count is " + bag.Count + ", " + stopWatch.ElapsedMilliseconds); } |
这里使用的是Stop,当数量达到300个时,会立刻停止;可以看到结果"Bag count is 300",如果用break,可能结果是300多个或者300个,大家可以测试一下。 2、异常处理 首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。 这里我们修改Parallel.Invoke的代码,修改后代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
public class ParallelDemo { private Stopwatch stopWatch = new Stopwatch(); public void Run1() { Thread.Sleep(2000); Console.WriteLine("Task 1 is cost 2 sec"); throw new Exception("Exception in task 1"); } public void Run2() { Thread.Sleep(3000); Console.WriteLine("Task 2 is cost 3 sec"); throw new Exception("Exception in task 2"); } public void ParallelInvokeMethod() { stopWatch.Start(); try { Parallel.Invoke(Run1, Run2); } catch (AggregateException aex) { foreach (var ex in aex.InnerExceptions) { Console.WriteLine(ex.Message); } } stopWatch.Stop(); Console.WriteLine("Parallel run " + stopWatch.ElapsedMilliseconds + " ms."); stopWatch.Reset(); stopWatch.Start(); try { Run1(); Run2(); } catch(Exception ex) { Console.WriteLine(ex.Message); } stopWatch.Stop(); Console.WriteLine("Normal run " + stopWatch.ElapsedMilliseconds + " ms."); } } |
顺序调用方法我把异常处理写一起了,这样只能捕获Run1的异常信息,大家可以分开写。捕获AggregateException 异常后,用foreach循环遍历输出异常信息,可以看到两个异常信息都显示了。 点击这里,下载源码 from:http://www.cnblogs.com/yunfeifei/p/3993401.html
View Details