線程的創建
Thread
1 var thread = new Thread(() => 2 { 3 Console.WriteLine("thread start:" + Thread.CurrentThread.ManagedThreadId); //ManagedThreadId為線程的id 4 Thread.Sleep(10000); 5 Console.WriteLine("thread end:" + Thread.CurrentThread.ManagedThreadId); 6 }); 7 //設置是否為后台線程: 8 // 前台線程:所有前台線程執行結束后,該進程才會關閉退出(主線程和通過Thread類創建的線程默認是前台線程) 9 // 后台線程:所有前台結束后,后台線程就會立即結束(不管是否執行完成都會結束) 10 thread.IsBackground = true; 11 thread.Start();//開啟線程,不傳遞參數 12 13 //傳遞參數的 14 var thread1 = new Thread(param => 15 { 16 Thread.Sleep(3000); 17 Console.WriteLine(param); 18 }); 19 thread1.Start("val"); 20 thread1.Join(); //等待線程執行完成(使當前調用Join的線程阻塞) 21 //暫停和恢復線程都標志為已過時了,不建議使用 22 //thread1.Suspend(); 23 //thread1.Resume(); 24 //設置線程的優先級,注意:在NT內核的Windows平台上建議不使用優先級來影響線程優先調度的行為,因為根本沒法預期一個高優先級的線程必然會先於一個低優先級的線程執行,所以也就失去了控制線程調度的價值 25 //thread1.Priority = ThreadPriority.Highest; 26 //thread1.Abort(); //暴力的終止線程,一般不建議使用
Sleep/ SpinWait
Sleep與SpinWait的區別:
使用Thread.Sleep()會導致等待過於進行切換,等待時間不准確,而且會由用戶模式切換到內核模式;使用SpinWait(一個輕量同步類型(結構體))來進行等待的處理,等待過程中會使用自旋等待,從而避免線程頻繁的用戶模式和內核模式切換,一般用於短時的等待操作:
1 //參數一為Func<bool>,就是自旋時的循環體,直到返回true或者過時為止 2 SpinWait.SpinUntil(() => 3 { 4 Console.WriteLine("Spin Waiting"); 5 return false; 6 }, 1000); 7 SpinWait.SpinUntil(() => false, 1000); //返回false會進入等待狀態,類似於Thread.Sleep()等待,但是會盤旋CPU周期,在短期內等待事件准確度都高於Sleep 8 SpinWait.SpinUntil(() => true, 1000); //返回true會自動跳出等待狀態,不再休眠,繼續執行下面的代碼
使用SpinWait做一些多線程的流程控制
1 int i = 0; 2 Task.Run(() => 3 { 4 Thread.Sleep(1000); //模擬一些操作 5 Interlocked.Increment(ref i); 6 }); 7 Task.Run(() => 8 { 9 Thread.Sleep(1000); //模擬一些操作 10 SpinWait.SpinUntil(() => i == 1); //等待1完成 11 Thread.Sleep(1000); //模擬一些操作 12 Interlocked.Increment(ref i); 13 }); 14 SpinWait.SpinUntil(() => i == 2); //等待所有流程完成 15 Console.WriteLine("Completed!");
ThreadPool
通過線程池創建線程,池中的線程都是后台線程
使用線程更應該使用線程池來創建:比如一個服務器需要處理成千上萬個客戶端鏈接,並處理不同的請求時,這種情況下如果簡單通過Thread來創建線程處理,那么就是需要創建成千上萬個線程了,那么多線程會頻繁的調度切換,資源浪費嚴重、性能十分低下,因此需要線程池來維護多線程(會動態調整線程數量)
1 ThreadPool.QueueUserWorkItem(param => 2 { 3 Console.WriteLine(param); //val,param為傳遞過來的參數 4 }, "val");
Task
通過Task來創建線程(線程也是由線程池維護,也是后台線程),比ThreadPool更加靈活方便
1 var tasks = new List<Task>(); 2 tasks.Add(Task.Factory.StartNew(param => 3 { 4 Thread.Sleep(5000); 5 Console.WriteLine(param); 6 }, "val")); 7 tasks.Add(Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId))); 8 Task.WaitAny(tasks.ToArray()); //等待(阻塞)只要有一個Task執行完畢就不再等待了 9 Task.WaitAll(tasks.ToArray()); //等待(阻塞)所有Task執行結束 10 11 //帶返回值的 12 var task = Task.Run<string>(() => 13 { 14 Thread.Sleep(3000); 15 return "rtn Val"; 16 }); 17 //task.Wait(); //等待執行結束 18 Console.WriteLine(task.Result); //獲取返回的結果,調用Result就會等待Task執行結束返回結果,因此也會造成阻塞
ConfigureAwait
1 Task.Run(() => 2 { 3 Thread.Sleep(1000); 4 Console.WriteLine("Async"); 5 6 //ConfigureAwait為false發生異常的時候不會回取捕捉原始Context(上下文), 7 //這樣子就是在線程池中運行,而不是在ASP.NET/UI的Context的上下文線程中運 8 //行了,這樣子性能上提高了 9 }).ConfigureAwait(false);
1 // Thread.Sleep是同步延遲, Task.Delay異步延遲; 2 // Thread.Sleep不能取消,Task.Delay可以。 3 Task.Run(async () => 4 { 5 //將任務延遲1000毫秒后運行,如果無限等待那么指定為-1 6 await Task.Delay(1000); 7 Console.WriteLine("Task Start"); 8 //CancellationToken設置為true就是標志Task任務取消,為false和 await Task.Delay(1000)一樣將任務延遲1000毫秒后運行 9 await Task.Delay(1000, new CancellationToken(true)); 10 Console.WriteLine("這里不會被執行,因為任務取消了~"); 11 });
Task與async/await
1 public class TaskTest 2 { 3 public Task DoAsync(string param) 4 { 5 return Task.Run(() => 6 { 7 //調用Result會阻塞直到獲取到返回值 8 NextDo(LongTimeDoAsync(param).Result); 9 }); 10 } 11 12 public async Task Do1Async(string param) 13 { 14 //對比上面的DoAsync方法,執行結果一樣,但是使用async/await配合Task使用,節省了代碼量,而且也方便外部的調用和等待處理等等 15 NextDo(await LongTimeDoAsync(param)); 16 } 17 18 async Task<object> LongTimeDoAsync(string param) 19 { 20 return await Task.Run<object>(() => 21 { 22 //執行一些耗時的操作 23 Thread.Sleep(10000); 24 return param + " ok"; 25 }); 26 } 27 28 void NextDo(object result) 29 { 30 Console.WriteLine(result); 31 } 32 }
調用:
1 var test = new TaskTest(); 2 test.DoAsync("DoAsync"); 3 test.Do1Async("Do1Async");
並發集合
在System.Collections.Concurrent下的集合類,都是些多線程安全集合,而ConcurrentXXX為並發集合,有不少方法帶有Try前綴,這些方法在多線程下執行過程中可能會失敗返回false,因此不要相信這些操作會一定完成任務,需要判斷返回的結果;還有BlockingCollection<T>是阻塞集合,就是添加/獲取元素的時候會阻塞線程直到操作完成。
ConcurrentDictionary
1 ConcurrentDictionary<string, string> dict = new ConcurrentDictionary<string, string>(); 2 dict.TryAdd("key1", "val1"); 3 string val; 4 dict.TryGetValue("key1", out val); 5 dict.TryUpdate("key1", "val2", val);//最后參數為比較的值,值不同才會更新 6 dict.TryRemove("key1", out val); 7 Console.WriteLine(val); //val2 8 9 val = dict.GetOrAdd("key1", "val3"); 10 val = dict.GetOrAdd("key1", "val4"); 11 Console.WriteLine(val); //val3 12 13 dict["key1"] = null; 14 //對於AddOrUpdate方法,如果指定的key已經存在,那么調用第三個參數進行UpdateValue 15 //如果不存在,那么調用第二個參數進行AddValue 16 val = dict.AddOrUpdate("key1", "val5", (key, oldVal) => 17 { 18 Console.WriteLine(oldVal); //null 19 return "val6"; 20 }); 21 Console.WriteLine(val); //val6 22 23 val = dict.AddOrUpdate("key2", key => 24 { 25 return "val7"; 26 }, (key, oldVal) => 27 { 28 Console.WriteLine(oldVal); 29 return "val8"; 30 }); 31 Console.WriteLine(val); //val7
ConcurrentQueue
1 ConcurrentQueue<string> q = new ConcurrentQueue<string>(); 2 q.Enqueue("val1"); 3 q.Enqueue("val2"); 4 string val; 5 q.TryPeek(out val); 6 Console.WriteLine(val); //val1 7 q.TryDequeue(out val); 8 Console.WriteLine(val); //val1
ConcurrentStack
1 ConcurrentStack<string> s = new ConcurrentStack<string>(); 2 s.Push("val1"); 3 s.Push("val2"); 4 string val; 5 s.TryPeek(out val); 6 Console.WriteLine(val); //val2 7 s.TryPop(out val); 8 Console.WriteLine(val); //val2
ConcurrentBag
1 //ConcurrentBag:無序的並發集合(相同元素可重復添加) 2 ConcurrentBag<object> bag = new ConcurrentBag<object>(); 3 var obj = new object(); 4 bag.Add(obj); 5 bag.Add(obj); 6 Console.WriteLine(bag.Count); //2 7 while (!bag.IsEmpty) //判斷集合是否為空 8 { 9 bag.TryTake(out obj); //獲取 10 }
並行計算
Parallel
For
1 //並行計算,調用的線程會等待直到並行執行完畢 2 Parallel.For(2, 10, i => 3 { 4 //i的值為[2, 10)(不包括10),就是執行次數為8次 5 Console.WriteLine(i); 6 });
1 //MaxDegreeOfParallelism為指定並行計算的最大線程數 2 Parallel.For(1, 10, new ParallelOptions { MaxDegreeOfParallelism = 3 }, i => 3 { 4 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 5 });
1 int result = 0; 2 Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 }, 3 //初始化localState 4 () => 0, 5 //並行循環體(i為[0, 100),也就是會執行100次) 6 (i, loop, localState) => 7 { 8 //localState從0開始,不斷累加i的值 9 return localState + i; //循環體中返回的結果會在下面的回調中進行值的合並(結果的合並必須在下面進行) 10 }, 11 //合並計算的結果 12 localState => Interlocked.Add(ref result, localState) 13 ); 14 Console.WriteLine("真實結果: {0}. 預期結果:4950.", result);
ForEach
1 int aCount = 0; 2 //並行計算,會等待(阻塞)直到執行完成 3 Parallel.ForEach("aaaabbbbbcccc", 4 //設置並行計算的最大線程數 5 new ParallelOptions { MaxDegreeOfParallelism = 4 }, 6 c => 7 { 8 //計算'a'的個數 9 if (c == 'a') 10 { 11 Interlocked.Increment(ref aCount); 12 } 13 }); 14 Console.WriteLine(aCount); //4
1 //Partitioner為設置策略分區:例如值范圍為[0, 100],每個區域的大小為4 2 Parallel.ForEach(Partitioner.Create(0, 10, 4), 3 val => 4 { 5 Console.WriteLine(val); //val是一個Tuple<int, int>,分成的區間值有:(0, 4),(4, 8),(8, 10) 6 }); 7 8 int result = 0; 9 Parallel.ForEach(Partitioner.Create(1, 101, 10), 10 val => 11 { 12 for (int i = val.Item1; i < val.Item2; i++) 13 { 14 Interlocked.Add(ref result, i); 15 } 16 }); 17 Console.WriteLine(result); //輸出:5050
1 int[] vals = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 2 int sum = 0; 3 Parallel.ForEach( 4 vals, 5 //localSum的初始值 6 () => 0, 7 //並行執行的循環體 8 (val, loopState, localSum) => 9 { 10 //val為集合vals中的值 11 //這里的操作是並行計算集合中值的總和 12 localSum += val; 13 return localSum; //循環體中返回的結果會在下面的回調中進行值的合並(結果的合並必須在下面進行) 14 }, 15 //合並計算的結果 16 (localSum) => Interlocked.Add(ref sum, localSum) 17 ); 18 Console.WriteLine(sum); //55
Invoke
1 int i = 0; 2 Action action = () => Interlocked.Increment(ref i); 3 Action action1 = () => Interlocked.Add(ref i, 2); 4 Action action2 = () => Interlocked.Add(ref i, 3); 5 //並行調用Action,調用的線程會等待直到並行執行完畢 6 Parallel.Invoke(action, action1, action2); 7 //Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 }, action, action1, action2); 8 Console.WriteLine(i); //輸出:6
PLINQ
1 var list = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 2 //PLINQ,進行並行計算,但是PLINQ不能限定並行計算時的最大線程數 3 list.AsParallel().ForAll(l => 4 { 5 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 6 }); 7 8 Console.WriteLine(list.AsParallel().Where(l => l > 5).Sum()); //40 9 Console.WriteLine(list.AsParallel().Aggregate((sum, val) => 10 { 11 return val + sum + 1; 12 })); //64 13 14 var list1 = new int[] { 1, 1, 1, 2, 2, 2, 3 }; 15 Console.WriteLine(list1.AsParallel().GroupBy(l => l).Count()); //3
線程同步
lock(Monitor) / SpinLock
lock
lock使用起來很簡單,為Monitor封裝的語法糖:
1 lock (obj) 2 { 3 //同步操作. 4 }
鎖的對象不要為類型Type,因為性能上會損失大:lock(typeof(Class))
Monitor
1 Monitor.Enter(obj); 2 try 3 { 4 //同步操作 5 } 6 finally 7 { 8 Monitor.Exit(obj); 9 }
使用Monitor的主要優點就是可設置等待的超時值:
1 bool lockTaken = false; 2 Monitor.TryEnter(obj, 1000, ref lockTaken); 3 if (lockTaken) 4 { 5 try 6 { 7 //同步操作 8 } 9 finally 10 { 11 Monitor.Exit(obj); 12 } 13 } 14 else 15 { 16 Console.WriteLine("超時了"); 17 }
SpinLock自旋鎖/細粒度鎖
自旋鎖:就是在等待的過程中會做自旋等待,避免線程頻繁的用戶模式和內核模式切換
msdn中的說明:
自旋鎖可用於葉級鎖定,此時在大小方面或由於垃圾回收壓力,使用Monitor(lock)所隱含的對象分配消耗過多。自旋鎖非常有助於避免阻塞,但是如果預期有大量阻塞,由於旋轉過多,您可能不應該使用自旋鎖。當鎖是細粒度的並且數量巨大(例如鏈接的列表中每個節點一個鎖)時以及鎖保持時間總是非常短時,旋轉可能非常有幫助。通常,在保持一個旋鎖時,應避免任何這些操作:
- 阻塞,
- 調用本身可能阻塞的任何內容,
- 一次保持多個自旋鎖,
- 進行動態調度的調用(接口和虛方法)
- 在某一方不擁有的任何代碼中進行動態調度的調用,或
- 分配內存。
簡單封裝:
1 public class SpinLockEx 2 { 3 SpinLock _slock = new SpinLock(); 4 public void Lock(Action action) 5 { 6 bool lockTaken = false; 7 try 8 { 9 _slock.Enter(ref lockTaken); 10 action(); 11 } 12 finally 13 { 14 if(lockTaken) _slock.Exit(); 15 } 16 } 17 }
使用:
1 int ival1 = 0, ival2 = 0; 2 List<Task> list = new List<Task>(); 3 var slock = new SpinLockEx(); 4 for (int i = 0; i < 10000; i++) 5 { 6 list.Add(Task.Run(() => 7 { 8 slock.Lock(() => 9 { 10 ival1++; //注意:這里只是模擬多線程操作共享資源,對於數值操作應該使用Interlocked 11 }); 12 })); 13 list.Add(Task.Run(() => 14 { 15 ival2++; 16 })); 17 } 18 Task.WaitAll(list.ToArray()); 19 Console.WriteLine(ival1); //值計算准確:10000 20 Console.WriteLine(ival2); //值計算可能會不准確,因為沒有做多線程安全
Mutex
Mutex互斥鎖(互斥對象)的使用作用和Monitor(lock)差不多,但是Mutex是內核對象,可以跨進程共享的,不過性能方面Monitor比較高,因為Mutex控制需要從用戶模式到內核模式,而Monitor是用戶模式下控制的。
1 bool isNew; 2 //參數一:主調線程是否初始擁有互斥對象 3 //參數二:定義互斥對象的名稱(命名互斥對象跨進程共享) 4 //參數三:該命名的互斥對象是否為新創建的 5 var m = new Mutex(false, "Tom123", out isNew); 6 if (m.WaitOne()) //等待互斥對象擁有權(一個線程擁有了,另一個線程等待擁有權,直到擁有的線程調用ReleaseMutex釋放) 7 { 8 try 9 { 10 //同步操作 11 Thread.Sleep(3000); 12 Console.WriteLine("do something"); 13 } 14 finally 15 { 16 m.ReleaseMutex(); //釋放擁有權 17 } 18 } 19 else 20 { 21 //等待失敗,如果WaitOne的時候有指定超時值,否則會一直等待 22 } 23 24 bool isNew; 25 //因為命名的互斥對象是跨進程的,因此通過第三個參數判斷互斥對象是否已經存在, 26 //可做一些檢測程序是否已經運行的操作 27 m = new Mutex(false, "Tom123", out isNew); 28 if (!isNew) 29 { 30 Console.WriteLine("該程序已經運行!"); 31 } 32 m.Dispose();//記住需要釋放資源
Event
事件對象也是內核對象,事件對象分為 人工重置 和 自動重置:
AutoResetEvent(自動重置)
1 AutoResetEvent e = new AutoResetEvent(true); //參數為是否初始化為有信號狀態 2 if (e.WaitOne()) //等待事件對象,直到有信號狀態(如果沒有指定超時值) 3 { 4 //對象自動重置的事件對象來說:等待成功,那么就會自動設置為無信號狀態(因此並不需要調用e.Reset()),因此性質和互斥對象差不多 5 try 6 { 7 //同步操作 8 Thread.Sleep(3000); 9 Console.WriteLine("do something"); 10 } 11 finally 12 { 13 e.Set();//設置為有信號狀態 14 } 15 } 16 else 17 { 18 //等待失敗,如果WaitOne的時候指定超時值,否則會一直等待 19 } 20 e.Dispose(); //使用完成需要釋放對象資源,因為是內核對象
ManualResetEvent/Slim (人工重置)
1 //人工重置的事件對象需要手動設置為無信號狀態,因此人工重置的事件對象不適合做多線程同步鎖, 2 //可用於做一些程序啟動時的初始化操作,例如:加載某些大文件,加載完成后通知加載完成,而這些過程通過人工重置事件對象控制 3 //帶Slim的事件對象,性能上會更好(運用了細粒度鎖),就是參數二中可以指定等待時自旋的次數,目的防止等待的過程中頻繁的切換線程 4 var e = new ManualResetEventSlim(false, 100); //參數一為是否初始化為有信號狀態 5 Task.Run(() => 6 { 7 Console.WriteLine("加載大文件開始"); 8 Thread.Sleep(10000); 9 Console.WriteLine("加載完成"); 10 e.Set();//設置為有信號狀態 11 }); 12 Task.Run(() => 13 { 14 e.Wait(); //等待文件加載完成 15 Console.WriteLine("加載完成后,do something..."); 16 });
ReaderWriterLock/Slim
ReaderWriterLock就是用於允許多個讀取器(讀取的時候不能寫入),而只能有一個寫入器(寫入時鎖定)來管理,對於ReaderWriterLockSlim的性能上會更好(運用了細粒度鎖),可避免潛在的死鎖的很多情況。
1 public class TestRW : IDisposable 2 { 3 ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); 4 StringBuilder _sb = new StringBuilder(); 5 public void Append(string val) 6 { 7 try 8 { 9 _rwLock.EnterWriteLock(); //獲取讀取器 10 _sb.Append(val); 11 } 12 finally 13 { 14 _rwLock.ExitWriteLock();//釋放讀取器 15 } 16 } 17 18 public override string ToString() 19 { 20 try 21 { 22 _rwLock.EnterReadLock(); //獲取寫入器 23 return _sb.ToString(); 24 } 25 finally 26 { 27 _rwLock.ExitReadLock();//釋放寫入器 28 } 29 } 30 31 public void Dispose() 32 { 33 _rwLock.Dispose(); //需要釋放資源 34 } 35 }
使用
1 List<Task> tasks = new List<Task>(); 2 TestRW rw = new TestRW(); 3 StringBuilder sb = new StringBuilder(); 4 for (int i = 0; i < 1000; i++) 5 { 6 tasks.Add(Task.Run(() => 7 { 8 sb.Append("1"); 9 })); 10 tasks.Add(Task.Run(() => 11 { 12 rw.Append("1"); 13 })); 14 } 15 Task.WaitAll(tasks.ToArray()); 16 Console.WriteLine(sb.ToString().Length); //StringBuilder不是多線程安全的,因此結果會失真 17 Console.WriteLine(rw.ToString().Length); //使用了讀寫鎖,因此多線程安全 18 rw.Dispose(); //記住需要釋放資源
Semaphore/Slim
信號量是一種計數互斥鎖,計數為0時阻塞線程,可用於多線程的限流操作,SemaphoreSlim運用了細粒度鎖,性能更優:
1 var tasks = new List<Task>(); 2 int ival = 0; 3 //參數一:初始化信號的數量 4 //參數二:信號的最大數量 5 var smp = new SemaphoreSlim(10, 10); 6 for (int i = 0; i < 100; i++) 7 { 8 tasks.Add(Task.Run(() => 9 { 10 if (smp.Wait(100)) //等待成功,會減少一個信號量,直到信號量為0 11 { 12 Thread.Sleep(1000); //模擬一些操作 13 Console.WriteLine(Interlocked.Increment(ref ival)); 14 //smp.Release(); //釋放一個信號量 15 } 16 else 17 { 18 //等待失敗,如果Wait的時候有指定超時值,否則會一直等待 19 Console.WriteLine("超時了~"); 20 } 21 })); 22 } 23 Task.WaitAll(tasks.ToArray()); 24 Console.WriteLine(smp.CurrentCount); //當前的信號量數量 25 smp.Dispose(); //記住需要釋放資源
CountdownEvent
CountdownEvent(倒計器)的性質和Semaphore相反,等待(阻塞)直到信號為0為止(當倒計數為0時,觸發完成,停止等待),可以做fork-join(並行計算)的控制:
1 var datas = new int[]{ 1,2,3,4,5,6,7,8,9,10 }; 2 //倒計器:當倒計數為0時,觸發完成,停止等待 3 using (var c = new CountdownEvent(1)) //參數為初始化倒計數(如果指定為0,那么初始狀態為完成,需要調用Reset重置,不然使用會拋異常) 4 { 5 Console.WriteLine(c.CurrentCount); //當前的倒計數 6 // fork work: 7 for (int i = 0; i < datas.Length; i++) 8 { 9 c.AddCount(); //增加倒計數(可指定增加的數量) 10 ThreadPool.QueueUserWorkItem(idx => 11 { 12 Interlocked.Increment(ref datas[(int)idx]); 13 c.Signal(); //注冊一個(可指定數量)信號,並且CurrentCount倒計數減一(可指定數量) 14 }, i); 15 } 16 c.Signal(); 17 // Join with work. 18 c.Wait(); //等待(阻塞)直到計數為0為止,可以指定等待的時間 19 Console.WriteLine("Completed!"); 20 //c.Reset(1); //重置初始化的倒計數,如果等待完成之后想繼續使用倒計器,那么就需要調用這個方法重置了,不然會拋異常 21 } 22 Console.WriteLine(JsonConvert.SerializeObject(datas)); //[2,3,4,5,6,7,8,9,10,11]
Barrier
Barrier(屏障/關卡)適用於並行操作分階段執行,並且每一階段中各任務需要進行同步,使用Barrier可以在並行操作中的所有任務都達到相應的關卡之前,阻止各個任務繼續執行:
1 int i = 0; 2 //參數一:初始化參與者 3 //參數二:關卡完成觸發的事件 4 var b = new Barrier(2, (bar) => 5 { 6 Console.WriteLine(i); 7 }); 8 //增加參與者數量 9 b.AddParticipant(); 10 b.AddParticipants(2); 11 //移除參與者數量 12 b.RemoveParticipant(); 13 //參與者數量 14 Console.WriteLine(b.ParticipantCount); //4 15 16 Action action = () => 17 { 18 Interlocked.Increment(ref i); 19 //設置關卡1: 20 //用信號通知有參與者(線程)已達到關卡,而且到達的參與者(線程)會一直等待(可指定超時值), 21 //直到所有參與者都達到關卡為止(就是通知的信號達到ParticipantCount的時候),並且觸發Barrier構造函數指定的事件 22 b.SignalAndWait(); //等待完成之后會觸發關卡完成事件,輸出:4 23 24 Interlocked.Add(ref i, 2); 25 //設置關卡2: 26 b.SignalAndWait(); //輸出:12 27 28 Interlocked.Add(ref i, 4); 29 //設置關卡3: 30 b.SignalAndWait(); //輸出:28 31 }; 32 33 //參與者(線程)的數量不能大於ParticipantCount,否則會拋異常 34 Task.Run(action); //設置一個參與者(線程) 35 Parallel.Invoke(action, action, action); //設置三個參與者(線程)
Timer
定時器,可以做一些定時的控制操作:
1 //參數一:定時觸發的事件 2 //參數二:傳遞到事件的參數 3 //參數三:為第一次觸發事件的定時時間 4 //參數四:為周期時間(第一次觸發事件之后,再次觸發事件的周期,指定了這個參數,那么就會按周期時間循環執行了,如果設為0就只會執行一次事件) 5 var timer = new Timer(val => 6 { 7 Console.WriteLine(val); 8 }, "val", 1000, 1000); //這里的設置為每秒執行一次事件 9 //timer.Dispose(); //記住需要釋放資源
Interlocked
使用原子操作保證值類型操作的原子性,以保證值類型線程間同步,性能方面比使用lock更優,因此保證值類型多線程安全更應該優先考慮使用Interlocked:
1 public class IntEx 2 { 3 int _val; 4 public int Val 5 { 6 get { return _val; } 7 set 8 { 9 Interlocked.CompareExchange(ref _val, value, _val); 10 } 11 } 12 13 public IntEx() { } 14 public IntEx(int ival) 15 { 16 _val = ival; 17 } 18 19 public int Add(int ival) 20 { 21 return Interlocked.Add(ref _val, ival); 22 } 23 24 public int Incre() 25 { 26 return Interlocked.Increment(ref _val); 27 } 28 29 public int Decre() 30 { 31 return Interlocked.Decrement(ref _val); 32 } 33 }
使用:
1 var tasks = new List<Task>(); 2 var itl = new IntEx(); 3 int ival = 0; 4 for (int i = 0; i < 1000; i++) 5 { 6 tasks.Add(Task.Run(() => 7 { 8 ival++; 9 })); 10 tasks.Add(Task.Run(() => 11 { 12 itl.Incre(); 13 //itl.Val++; //也不要這樣進行自增,就算使用了Interlocked.CompareExchange 14 })); 15 } 16 Task.WaitAll(tasks.ToArray()); 17 Console.WriteLine(ival); //沒有使用Interlocked以保證操作的原子性,因此數據會失真 18 Console.WriteLine(itl.Val); //數據正確
使用Interlocked進行多線程的控制:
1 public class OneDo 2 { 3 int _ival = 0; 4 public event Action Action; 5 6 public OneDo(Action action) 7 { 8 if (action == null) throw new ArgumentNullException("action"); 9 Action = action; 10 } 11 12 public void Release() 13 { 14 Interlocked.CompareExchange(ref _ival, 0, _ival); 15 } 16 17 public bool IsDoing 18 { 19 get { return _ival > 0; } 20 } 21 22 bool CanDo() 23 { 24 if (_ival <= 0) 25 { 26 if (Interlocked.Increment(ref _ival) <= 1) 27 { 28 return true; 29 } 30 } 31 return false; 32 } 33 34 public bool Do() 35 { 36 if (CanDo()) 37 { 38 Action(); 39 return true; 40 } 41 return false; 42 } 43 44 public bool DoAndRelease() 45 { 46 if (CanDo()) 47 { 48 Action(); 49 Release(); 50 return true; 51 } 52 return false; 53 } 54 55 public bool DoAsync() 56 { 57 if (CanDo()) 58 { 59 Task.Run(() => 60 { 61 Action(); 62 }); 63 return true; 64 } 65 return false; 66 } 67 68 public bool DoAndReleaseAsync() 69 { 70 if (CanDo()) 71 { 72 Task.Run(() => 73 { 74 Action(); 75 Release(); 76 }); 77 return true; 78 } 79 return false; 80 } 81 }
使用:
1 var onedo = new OneDo(() => Console.WriteLine("One do!")); 2 for (int i = 0; i < 100; i++) 3 { 4 Task.Run(() => 5 { 6 onedo.Do(); //只會執行一次,除非調用Release,因此可用於做一些初始化操作 7 }); 8 } 9 10 11 int val = 0; 12 var onedo1 = new OneDo(() => Console.WriteLine("One do! " + Interlocked.Increment(ref val))); 13 for (int i = 0; i < 100; i++) 14 { 15 Task.Run(() => 16 { 17 SpinWait.SpinUntil(() => onedo1.DoAndRelease()); //每次只被一個線程執行,使用SpinWait等待執行成功為止 18 }); 19 }
TPL Dataflow
TPL Dataflow是非常強大的多線程高並發數據流控制類庫,
.NET中使用需要在NuGet中搜索Microsoft.Tpl.Dataflow加載,.net core可直接使用。
1 //數據流Buffer 2 var buf = new BufferBlock<string>(); 3 //數據流處理器(處理器是異步處理數據的) 4 var action = new ActionBlock<string>(val => 5 { 6 //以FIFO形式處理數據 7 Console.WriteLine(val + ",tid=" + Thread.CurrentThread.ManagedThreadId); 8 }, new ExecutionDataflowBlockOptions() 9 { 10 //設置並行處理的線程數,默認為1 11 MaxDegreeOfParallelism = 1, 12 }); 13 //連接處理器 14 buf.LinkTo(action); 15 //添加數據到流中,連接到了處理器,數據就會流向處理器中進行處理 16 for (int i = 0; i < 100; i++) 17 { 18 buf.Post(i.ToString()); 19 }
1 var buf1 = new BufferBlock<string>(); 2 var action1 = new ActionBlock<string>(val => 3 { 4 Console.WriteLine(val); 5 }); 6 //buf1.LinkTo(action1); 7 Task.Run(() => 8 { 9 Thread.Sleep(3000); 10 buf1.LinkTo(action1); 11 }); 12 for (int i = 0; i < 10; i++) 13 { 14 buf1.Post(i.ToString()); 15 } 16 17 buf1.Complete(); //發送完成信號(不會阻塞),並且停止接受數據,就是后續的添加是不會被處理的 18 buf1.Post("asdf"); //這里添加不會被處理 19 //Completion屬性只有在調用了Complete()后才會有效 20 buf1.Completion.Wait(); //這里會等待(阻塞),直到連接到處理器為止(就是:如果沒有調用LinkTo(action),那么會一直等待,可設置超時值) 21 22 //對於Action的Complete(),需要Buffer調用Complete並且Completion.Wait才有效 23 action1.Complete(); 24 //Completion屬性只有在設置了Complete方法后才會有效 25 action1.Completion.Wait(); //這里會等待(阻塞),等待Action把所有數據處理完成為止(可設置超時值)
1 var buf = new BufferBlock<string>(); 2 //負載均衡:通過設置多個處理器,並且設置處理器的BoundedCapacity(限容值),那么處理器就會按照BoundedCapacity均衡處理數據了 3 //注意:每個處理器都應該設置BoundedCapacity,如果存在沒有設置BoundedCapacity(或設為-1)的處理器,那么數據流可能都只到該處理器進行處理了 4 buf.LinkTo(new ActionBlock<string>(val => 5 { 6 Console.WriteLine(val + ",Tom"); 7 }, new ExecutionDataflowBlockOptions() 8 { 9 BoundedCapacity = 10, 10 })); 11 buf.LinkTo(new ActionBlock<string>(val => 12 { 13 Console.WriteLine(val + ",Jane"); 14 }, new ExecutionDataflowBlockOptions() 15 { 16 BoundedCapacity = 10, 17 })); 18 for (int i = 0; i < 100; i++) 19 { 20 buf.Post(i.ToString()); 21 }
多播
1 //多播,就是連接的ActionBlock都會收到數據 2 BroadcastBlock<int> broadcast = new BroadcastBlock<int>(val => val); 3 broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",A"))); 4 broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",B"))); 5 for (int i = 0; i < 10; i++) 6 { 7 broadcast.Post(i); 8 }
1 BroadcastBlock<int> broadcast = new BroadcastBlock<int>(val => val); 2 broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",A"))); 3 for (int i = 0; i < 10; i++) 4 { 5 broadcast.Post(i); 6 } 7 Thread.Sleep(1000); 8 //連接ActionBlock應該在Post數據之前,不然在之后連接的ActionBlock就只接收到最后一個數據了 9 broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",B")));
TPL Dataflow更詳細使用說明可參考這篇博文:
http://www.cnblogs.com/haoxinyue/archive/2013/03/01/2938959.htm