8天玩轉並行開發——第五天 同步機制(下)


   

     承接上一篇,我們繼續說下.net4.0中的同步機制,是的,當出現了並行計算的時候,輕量級別的同步機制應運而生,在信號量這一塊

出現了一系列的輕量級,今天繼續介紹下面的3個信號量 CountdownEvent,SemaphoreSlim,ManualResetEventSlim。

 

一:CountdownEvent

     這種采用信號狀態的同步基元非常適合在動態的fork,join的場景,它采用“信號計數”的方式,就比如這樣,一個麻將桌只能容納4個

人打麻將,如果后來的人也想搓一把碰碰運氣,那么他必須等待直到麻將桌上的人走掉一位。好,這就是簡單的信號計數機制,從技術角

度上來說它是定義了最多能夠進入關鍵代碼的線程數。

     但是CountdownEvent更牛X之處在於我們可以動態的改變“信號計數”的大小,比如一會兒能夠容納8個線程,一下又4個,一下又10個,

這樣做有什么好處呢?還是承接上一篇文章所說的,比如一個任務需要加載1w條數據,那么可能出現這種情況。

 

加載User表:         根據user表的數據量,我們需要開5個task。

加載Product表:    產品表數據相對比較多,計算之后需要開8個task。

加載order表:       由於我的網站訂單豐富,計算之后需要開12個task。

 

先前的文章也說了,我們需要協調task在多階段加載數據的同步問題,那么如何應對這里的5,8,12,幸好,CountdownEvent給我們提供了

可以動態修改的解決方案。

 

  1 using System.Collections.Concurrent;
  2 using System.Threading.Tasks;
  3 using System;
  4 using System.Diagnostics;
  5 using System.Collections.Generic;
  6 using System.Linq;
  7 using System.Threading;
  8 
  9 class Program
 10 {
 11     //默認的容納大小為“硬件線程“數
 12     static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);
 13 
 14     static void Main(string[] args)
 15     {
 16         //加載User表需要5個任務
 17         var userTaskCount = 5;
 18 
 19         //重置信號
 20         cde.Reset(userTaskCount);
 21 
 22         for (int i = 0; i < userTaskCount; i++)
 23         {
 24             Task.Factory.StartNew((obj) =>
 25             {
 26                 LoadUser(obj);
 27             }, i);
 28         }
 29 
 30         //等待所有任務執行完畢
 31         cde.Wait();
 32 
 33         Console.WriteLine("\nUser表數據全部加載完畢!\n");
 34 
 35         //加載product需要8個任務
 36         var productTaskCount = 8;
 37 
 38         //重置信號
 39         cde.Reset(productTaskCount);
 40 
 41         for (int i = 0; i < productTaskCount; i++)
 42         {
 43             Task.Factory.StartNew((obj) =>
 44             {
 45                 LoadProduct(obj);
 46             }, i);
 47         }
 48 
 49         cde.Wait();
 50 
 51         Console.WriteLine("\nProduct表數據全部加載完畢!\n");
 52 
 53         //加載order需要12個任務
 54         var orderTaskCount = 12;
 55 
 56         //重置信號
 57         cde.Reset(orderTaskCount);
 58 
 59         for (int i = 0; i < orderTaskCount; i++)
 60         {
 61             Task.Factory.StartNew((obj) =>
 62             {
 63                 LoadOrder(obj);
 64             }, i);
 65         }
 66 
 67         cde.Wait();
 68 
 69         Console.WriteLine("\nOrder表數據全部加載完畢!\n");
 70 
 71         Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,數據全部加載完畢\n");
 72 
 73         Console.Read();
 74     }
 75 
 76     static void LoadUser(object obj)
 77     {
 78         try
 79         {
 80             Console.WriteLine("當前任務:{0}正在加載User部分數據!", obj);
 81         }
 82         finally
 83         {
 84             cde.Signal();
 85         }
 86     }
 87 
 88     static void LoadProduct(object obj)
 89     {
 90         try
 91         {
 92             Console.WriteLine("當前任務:{0}正在加載Product部分數據!", obj);
 93         }
 94         finally
 95         {
 96             cde.Signal();
 97         }
 98     }
 99 
100     static void LoadOrder(object obj)
101     {
102         try
103         {
104             Console.WriteLine("當前任務:{0}正在加載Order部分數據!", obj);
105         }
106         finally
107         {
108             cde.Signal();
109         }
110     }
111 }

 


我們看到有兩個主要方法:Wait和Signal。每調用一次Signal相當於麻將桌上走了一個人,直到所有人都搓過麻將wait才給放行,這里同樣要

注意也就是“超時“問題的存在性,尤其是在並行計算中,輕量級別給我們提供了”取消標記“的機制,這是在重量級別中不存在的,比如下面的

重載public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken),具體使用可以看前一篇文章的介紹。

 

二:SemaphoreSlim

     在.net 4.0之前,framework中有一個重量級的Semaphore,人家可以跨進程同步,咋輕量級不行,msdn對它的解釋為:限制可同時訪問

某一資源或資源池的線程數。關於它的重量級demo,我的上一個系列有演示,你也可以理解為CountdownEvent是 SemaphoreSlim的功能加

強版,好了,舉一個輕量級使用的例子。

 1 using System.Collections.Concurrent;
 2 using System.Threading.Tasks;
 3 using System;
 4 using System.Diagnostics;
 5 using System.Collections.Generic;
 6 using System.Linq;
 7 using System.Threading;
 8 
 9 class Program
10 {
11     static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);
12 
13     static void Main(string[] args)
14     {
15         for (int i = 0; i < 12; i++)
16         {
17             Task.Factory.StartNew((obj) =>
18             {
19                 Run(obj);
20             }, i);
21         }
22 
23         Console.Read();
24     }
25 
26     static void Run(object obj)
27     {
28         slim.Wait();
29 
30         Console.WriteLine("當前時間:{0}任務 {1}已經進入。", DateTime.Now, obj);
31 
32         //這里busy3s中
33         Thread.Sleep(3000);
34 
35         slim.Release();
36     }
37 }

 


同樣,防止死鎖的情況,我們需要知道”超時和取消標記“的解決方案,像SemaphoreSlim這種定死的”線程請求范圍“,其實是降低了擴展性,

所以說,試水有風險使用需謹慎,在覺得有必要的時候使用它。

 

三: ManualResetEventSlim

     相信它的重量級別大家都知道是ManualReset,而這個輕量級別采用的是"自旋等待“+”內核等待“,也就是說先采用”自旋等待的方式“等待,

直到另一個任務調用set方法來釋放它。如果遲遲等不到釋放,那么任務就會進入基於內核的等待,所以說如果我們知道等待的時間比較短,采

用輕量級的版本會具有更好的性能,原理大概就這樣,下面舉個小例子。

 1 using System.Collections.Concurrent;
 2 using System.Threading.Tasks;
 3 using System;
 4 using System.Diagnostics;
 5 using System.Collections.Generic;
 6 using System.Linq;
 7 using System.Threading;
 8 
 9 class Program
10 {
11     //2047:自旋的次數
12     static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047);
13 
14     static void Main(string[] args)
15     {
16 
17         for (int i = 0; i < 12; i++)
18         {
19             Task.Factory.StartNew((obj) =>
20             {
21                 Run(obj);
22             }, i);
23         }
24 
25         Console.WriteLine("當前時間:{0}我是主線程{1},你們這些任務都等2s執行吧:\n",
26         DateTime.Now,
27         Thread.CurrentThread.ManagedThreadId);
28         Thread.Sleep(2000);
29 
30         mrs.Set();
31 
32         Console.Read();
33     }
34 
35     static void Run(object obj)
36     {
37         mrs.Wait();
38 
39         Console.WriteLine("當前時間:{0}任務 {1}已經進入。", DateTime.Now, obj);
40     }
41 }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM