1、Parallel.Invoke 主要用於任務的並行
這個函數的功能和Task有些相似,就是並發執行一系列任務,然后等待所有完成。和Task比起來,省略了Task.WaitAll這一步,自然也缺少了Task的相關管理功能。它有兩種形式:
Parallel.Invoke( params Action[] actions);
Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
var actions = new Action[]
{
() => ActionTest("test 1"),
() => ActionTest("test 2"),
() => ActionTest("test 3"),
() => ActionTest("test 4")
};
Console.WriteLine("Parallel.Invoke 1 Test");
Parallel.Invoke(actions);
Console.WriteLine("結束!");
}
static void ActionTest(object value)
{
Console.WriteLine(">>> thread:{0}, value:{1}",
Thread.CurrentThread.ManagedThreadId, value);
}
}
}
2、For方法,主要用於處理針對數組元素的並行操作(數據的並行)
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
Parallel.For(0, nums.Length, (i) =>
{
Console.WriteLine("針對數組索引{0}對應的那個元素{1}的一些工作代碼……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
});
Console.ReadKey();
}
}
}
3、Foreach方法,主要用於處理泛型集合元素的並行操作(數據的並行)
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
Parallel.ForEach(nums, (item) =>
{
Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId);
});
Console.ReadKey();
}
}
}
數據的並行的方式二(AsParallel()):
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
var evenNumbers = nums.AsParallel().Select(item => Calculate(item));
//注意這里是個延遲加載,也就是不用集合的時候 這個Calculate里面的算法 是不會去運行 可以屏蔽下面的代碼看效果;
Console.WriteLine(evenNumbers.Count());
//foreach (int item in evenNumbers)
// Console.WriteLine(item);
Console.ReadKey();
}
static int Calculate(int number)
{
Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
return number * 2;
}
}
}
.AsOrdered() 對結果進行排序:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main(string[] args)
{
List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item));
//注意這里是個延遲加載,也就是不用集合的時候 這個Calculate里面的算法 是不會去運行 可以屏蔽下面的代碼看效果;
//Console.WriteLine(evenNumbers.Count());
foreach (int item in evenNumbers)
Console.WriteLine(item);
Console.ReadKey();
}
static int Calculate(int number)
{
Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
return number * 2;
}
}
}
ForEach的獨到之處就是可以將數據進行分區,每一個小區內實現串行計算,分區采用Partitioner.Create實現。
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
for (int j = 1; j < 4; j++)
{
ConcurrentBag<int> bag = new ConcurrentBag<int>();
var watch = Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
{
for (int m = i.Item1; m < i.Item2; m++)
{
bag.Add(m);
}
});
Console.WriteLine("並行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
}
}
}
}
ParallelOptions類
ParallelOptions options = new ParallelOptions();
//指定使用的硬件線程數為4
options.MaxDegreeOfParallelism = 4;
有時候我們的線程可能會跑遍所有的內核,為了提高其他應用程序的穩定性,就要限制參與的內核,正好ParallelOptions提供了MaxDegreeOfParallelism屬性。
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
namespace ConsoleApp1
{
public class Student
{
public int ID { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime CreateTime { get; set; }
}
class Program
{
static void Main(string[] args)
{
var dic = LoadData();
Stopwatch watch = new Stopwatch();
watch.Start();
var query2 = (from n in dic.Values.AsParallel()
where n.Age > 20 && n.Age < 25
select n).ToList();
watch.Stop();
Console.WriteLine("並行計算耗費時間:{0}", watch.ElapsedMilliseconds);
Console.Read();
}
public static ConcurrentDictionary<int, Student> LoadData()
{
ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
ParallelOptions options = new ParallelOptions();
//指定使用的硬件線程數為4
options.MaxDegreeOfParallelism = 4;
//預加載1500w條記錄
Parallel.For(0, 15000000, options, (i) =>
{
var single = new Student()
{
ID = i,
Name = "hxc" + i,
Age = i % 151,
CreateTime = DateTime.Now.AddSeconds(i)
};
dic.TryAdd(i, single);
});
return dic;
}
}
}
常見問題的處理
<1> 如何中途退出並行循環?
是的,在串行代碼中我們break一下就搞定了,但是並行就不是這么簡單了,不過沒關系,在並行循環的委托參數中提供了一個ParallelLoopState,該實例提供了Break和Stop方法來幫我們實現。
Break: 當然這個是通知並行計算盡快的退出循環,比如並行計算正在迭代100,那么break后程序還會迭代所有小於100的。
Stop:這個就不一樣了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Parallel.For(0, 20000000, (i, state) =>
{
if (bag.Count == 1000)
{
//state.Break();
state.Stop();
return;
}
bag.Add(i);
});
Console.WriteLine("當前集合有{0}個元素。", bag.Count);
}
}
}
取消(cancel)
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
public static void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
Task.Factory.StartNew(() => fun(ct));
Console.ReadKey();
//Thread.Sleep(3000);
cts.Cancel();
Console.WriteLine("任務取消了!");
}
static void fun(CancellationToken token)
{
Parallel.For(0, 100000,
new ParallelOptions { CancellationToken = token },
(i) =>
{
Console.WriteLine("針對數組索引{0}的一些工作代碼……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId);
});
}
}
}
<2> 並行計算中拋出異常怎么處理?
首先任務是並行計算的,處理過程中可能會產生n多的異常,那么如何來獲取到這些異常呢?普通的Exception並不能獲取到異常,然而為並行誕生的AggregateExcepation就可以獲取到一組異常。
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
try
{
Parallel.Invoke(Run1, Run2);
}
catch (AggregateException ex)
{
foreach (var single in ex.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}
Console.WriteLine("結束了!");
//Console.Read();
}
static void Run1()
{
Thread.Sleep(3000);
throw new Exception("我是任務1拋出的異常");
}
static void Run2()
{
Thread.Sleep(5000);
throw new Exception("我是任務2拋出的異常");
}
}
}
注意Parallel里面 不建議拋出異常 因為在極端的情況下比如進去的第一批線程先都拋異常了 此時AggregateExcepation就只能捕獲到這一批的錯誤,然后程序就結束了
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ConsoleApp1
{
public class TestClass
{
public static List<int> NumberList = null;
private static readonly object locker = new object();
public void Test(int Number)
{
throw new Exception("1111");
//lock (locker)
//{
// if (NumberList == null)
// {
// Console.WriteLine("執行添加");
// NumberList = new List<int>();
// NumberList.Add(1);
// //Thread.Sleep(1000);
// }
//}
//if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
//Console.WriteLine(Number);
}
}
class Program
{
private static readonly object locker = new object();
static void Main(string[] args)
{
List<string> errList = new List<string>();
try
{
Parallel.For(0, 10, (i) =>
{
try
{
TestClass a = new TestClass();
a.Test(i);
}
catch (Exception ex)
{
lock (locker)
{
errList.Add(ex.Message);
throw ex;
}
}
});
}
catch (AggregateException ex)
{
foreach (var single in ex.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}
int Index = 1;
foreach (string err in errList)
{
Console.WriteLine("{0}、的錯誤:{1}", Index++, err);
}
}
}
}
可以向下面這樣來處理一下
不在AggregateExcepation中來處理 而是在Parallel里面的try catch來記錄錯誤,或處理錯誤
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ConsoleApp1
{
public class TestClass
{
public static List<int> NumberList = null;
private static readonly object locker = new object();
public void Test(int Number)
{
throw new Exception("1111");
//lock (locker)
//{
// if (NumberList == null)
// {
// Console.WriteLine("執行添加");
// NumberList = new List<int>();
// NumberList.Add(1);
// //Thread.Sleep(1000);
// }
//}
//if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
//Console.WriteLine(Number);
}
}
class Program
{
private static readonly object locker = new object();
static void Main(string[] args)
{
List<string> errList = new List<string>();
Parallel.For(0, 10, (i) =>
{
try
{
TestClass a = new TestClass();
a.Test(i);
}
catch (Exception ex)
{
lock (locker)
{
errList.Add(ex.Message);
}
//Console.WriteLine(ex.Message);
//注:這里不再將錯誤拋出.....
//throw ex;
}
});
int Index = 1;
foreach (string err in errList)
{
Console.WriteLine("{0}、的錯誤:{1}", Index++, err);
}
}
}
}
轉載 https://www.cnblogs.com/scmail81/archive/2018/08/22/9521096.html

