本文內容 均參考自 《C#並行高級編程》
TPL 支持 數據並行(有大量數據要處理,必須對每個數據執行同樣的操作, 任務並行(有好多可以並發運行的操作),流水線(任務並行和數據並行的結合體)
在.net 4.0 引入新的 Task Parallel Library 處理 並行開發 。
Parallel類
關鍵詞 :
Parallel.For and Parallel.Foreach - 負載均衡的多任務
Parallel.Invoke - 並行運行多任務
ParallelOptions - 指定最大並行度 (實例化一個類並修改MaxDegreeOfParallelism 屬性的值 )
Environment.ProcessorCount - 內核最大數
命令式任務並行
關鍵詞 : Task類 , 一個task 類表示一個異步操作 (需要考慮到任務的開銷)
啟動任務使用Task類的Start 方法 , 等待線程完成使用WaitAll 方法 , 通過CancellationTokenSource 的Cancel方法來 中斷 Task的運行
怎樣表達任務間的父子關系 ? TaskCreationOption的AttachToParent來完成
怎樣來表達串行任務 ? Task.ContinueWith
並發集合
BlockingCollection
ConcurrentDictionary/ConcurrentQueue/ConcurrentStack
下面來一個例子是實操 C# 多任務並發。
場景 : 主進程 打開 一個 生產者線程 和 一個消費線程 。 他們之間可以相互對話, 如([動詞,名詞]) say,hello task,a . 生產者說一句話 消費者聽, 消費者或應答或提交新的任務或結束自己。
代碼
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace TPLTest
{
class Program
{
public static readonly int MAX_DATA_LENGTH = 256;
private static BlockingCollection<Message> bcstr = new BlockingCollection<Message>(MAX_DATA_LENGTH) ;
public static readonly string SAY_THANKS = "thanks";
public static readonly string SAY_WELCOME = "welcome!";
public static readonly string BYE = "bye";
public static readonly string SAY = "say";
public static readonly string TASK = "task";
public static readonly string TIMEOUT = "timeout";
public static readonly string ONLINE = "ONLINE";
public static readonly string WHAT = "What?";
public static readonly int WAIT = 20000;
public static void Main(string[] args)
{
//消費者線程
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = -1;
Parallel.Invoke(po,() => {
int selfID = Environment.CurrentManagedThreadId;
Message customer = new Message();
customer.CustomerThreadID = selfID ;
customer.content = ONLINE;
Console.WriteLine(customer.ToString(false));
while(true){
if (bcstr.TryTake(out customer, WAIT))
{
customer.CustomerThreadID = selfID ;
customer.doAction();
Console.WriteLine(" ");
Console.WriteLine(customer.ToString(false));
if (customer.endThread()){
break;
}
} else {
if (customer == null)
{
customer = new Message();
}
customer.CustomerThreadID = selfID ;
customer.content = TIMEOUT;
Console.WriteLine(customer.ToString(false));
}
}
},
() => {
int prdID = Environment.CurrentManagedThreadId;
Message productor = new Message();
productor.ProductorThreadID = prdID;
productor.content = ONLINE;
Console.WriteLine(productor.ToString(true));
while(true){
Console.Write("Productor Behavior (i.e. say,hello) : ");
string msgContent = Console.ReadLine();
productor = new Message();
productor.ProductorThreadID = prdID;
productor.key = msgContent.Split(',')[0];
productor.content = msgContent.Split(',')[1];
bcstr.Add(productor);
if (productor.endThread()) {
break;
}
}
});
}
}
class Message
{
public int ProductorThreadID {get; set;}
public int CustomerThreadID {get; set;}
public string key {get; set;}
public string content{get; set;}
public bool endThread()
{
return string.Compare(key, Program.BYE) == 0;
}
public string ToString(bool isProductor){
return string.Format("{0} Thread ID {1} : {2}", isProductor ? "Productor" : "Customer",
isProductor ? ProductorThreadID.ToString() : CustomerThreadID.ToString(),
content);
}
public void doAction(){
if (string.Compare(key, Program.SAY) == 0) {
content = string.Compare(content, Program.SAY_THANKS) == 0 ? Program.SAY_WELCOME : Program.WHAT;
}
if (string.Compare(key, Program.TASK) == 0) {
Task taskA = Task.Factory.StartNew(() => {
Console.WriteLine("task A begin ");
Task ChildOfFatehrA = Task.Factory.StartNew(() => {
Console.WriteLine("Sub task A begin ");
Thread.Sleep(1000);
Console.WriteLine("Sub task A end ");
});
ChildOfFatehrA.Wait();
Console.WriteLine("task A end ");
});
taskA.ContinueWith(taskB => {
Console.WriteLine("task B begin ");
Thread.Sleep(5000);
Console.WriteLine("task B end ");
});
taskA.Wait();
}
}
}
}
