第二十節: 深入理解並發機制以及解決方案(鎖機制、EF自有機制、隊列模式等)


一. 理解並發機制

1. 什么是並發,並發與多線程有什么關系?

①. 先從廣義上來說,或者從實際場景上來說.

  高並發通常是海量用戶同時訪問(比如:12306買票、淘寶的雙十一搶購),如果把一個用戶看做一個線程的話那么並發可以理解成多線程同時訪問,高並發即海量線程同時訪問。

      (ps:我們在這里模擬高並發可以for循環多個線程即可)

②.從代碼或數據的層次上來說.

  多個線程同時在一條相同的數據上執行多個數據庫操作。

2. 從代碼層次上來說,給並發分類。

①.積極並發(樂觀並發、樂觀鎖):無論何時從數據庫請求數據,數據都會被讀取並保存到應用內存中。數據庫級別沒有放置任何顯式鎖。數據操作會按照數據層接收到的先后順序來執行。

 積極並發本質就是允許沖突發生,然后在代碼本身采取一種合理的方式去解決這個並發沖突,常見的方式有:

a.忽略沖突強制更新:數據庫會保存最后一次更新操作(以更新為例),會損失很多用戶的更新操作。

b.部分更新:允許所有的更改,但是不允許更新完整的行,只有特定用戶擁有的列更新了。這就意味着,如果兩個用戶更新相同的記錄但卻不同的列,那么這兩個更新都會成功,而且來自這兩個用戶的更改都是可見的。(EF默認實現不了這種情況)

c.詢問用戶:當一個用戶嘗試更新一個記錄時,但是該記錄自從他讀取之后已經被別人修改了,這時應用程序就會警告該用戶該數據已經被某人更改了,然后詢問他是否仍然要重寫該數據還是首先檢查已經更新的數據。(EF可以實現這種情況,在后面詳細介紹)

d.拒絕修改:當一個用戶嘗試更新一個記錄時,但是該記錄自從他讀取之后已經被別人修改了,此時告訴該用戶不允許更新該數據,因為數據已經被某人更新了。

(EF可以實現這種情況,在后面詳細介紹)

②.消極並發(悲觀並發、悲觀鎖):無論何時從數據庫請求數據,數據都會被讀取,然后該數據上就會加鎖,因此沒有人能訪問該數據。這會降低並發出現問題的機會,缺點是加鎖是一個昂貴的操作,會降低整個應用程序的性能。

 消極並發的本質就是永遠不讓沖突發生,通常的處理凡是是只讀鎖和更新鎖。

a. 當把只讀鎖放到記錄上時,應用程序只能讀取該記錄。如果應用程序要更新該記錄,它必須獲取到該記錄上的更新鎖。如果記錄上加了只讀鎖,那么該記錄仍然能夠被想要只讀鎖的請求使用。然而,如果需要更新鎖,該請求必須等到所有的只讀鎖釋放。同樣,如果記錄上加了更新鎖,那么其他的請求不能再在這個記錄上加鎖,該請求必須等到已存在的更新鎖釋放才能加鎖。

總結,這里我們可以簡單理解把並發業務部分用一個鎖(如:lock,實質是數據庫鎖,后面章節單獨介紹)鎖住,使其同時只允許一個線程訪問即可。

b. 加鎖會帶來很多弊端:

 (1):應用程序必須管理每個操作正在獲取的所有鎖;

 (2):加鎖機制的內存需求會降低應用性能

 (3):多個請求互相等待需要的鎖,會增加死鎖的可能性。

總結:盡量不要使用消極並發,EF默認是不支持消極並發的!!!

注意:EF默認就是積極並發,當然EF也可以配置成消極並發。

二. 並發機制的解決方案

1. 從架構的角度去解決(大層次 如:12306買票)

  nginx負載均衡、數據庫讀寫分離、多個業務服務器、多個數據庫服務器、NoSQL, 使用隊列來處理業務,將高並發的業務依次放到隊列中,然后按照先進先出的原則, 逐個處理(隊列的處理可以采用 Redis、RabbitMq等等)

  (PS:在后面的框架篇章里詳細介紹該方案)

2. 從代碼的角度去解決(在服務器能承載壓力的情況下,並發訪問同一條數據)

  實際的業務場景:如進銷存類的項目,涉及到同一個物品的出庫、入庫、庫存,我們都知道庫存在數據庫里對應了一條記錄,入庫要查出現在庫存的數量,然后加上入庫的數量,假設兩個線程同時入庫,假設查詢出來的庫存數量相同,但是更新庫存數量在數據庫層次上是有先后,最終就保留了后更新的數據,顯然是不正確的,應該保留的是兩次入庫的數量和。

(該案例的實質:多個線程同時在一條相同的數據上執行多個數據庫操作)

事先准備一張數據庫表:

解決方案一:(最常用的方式)

  給入庫和出庫操作加一個鎖,使其同時只允許一個線程訪問,這樣即使兩個線程同時訪問,但在代碼層次上,由於鎖的原因,還是有先有后的,這樣就保證了入庫操作的線程唯一性,當然庫存量就不會出錯了.

總結:該方案可以說是適合處理小范圍的並發且鎖內的業務執行不是很復雜。假設一萬線程同時入庫,每次入庫要等2s,那么這一萬個線程執行完成需要的總時間非常多,顯然不適合。

    (這種方式的實質就是給核心業務加了個lock鎖,這里就不做測試了)

 

解決方案二:EF處理積極並發帶來的沖突

1. 配置准備

  (1). 針對DBFirst模式,可以給相應的表額外加一列RowVersion,數據庫中為timestamp類型,對應的類中為byte[]類型,並且在Edmx模型上給該字段的並發模式設置為fixed(默認為None),這樣該表中所有字段都監控並發。

如果不想監視所有列(在不添加RowVersion的情況下),只需在Edmx模型是給特定的字段的並發模式設置為fixed,這樣只有被設置的字段被監測並發。

  測試結果: (DBFirst模式下的並發測試)

  事先在UserInfor1表中插入一條id、userName、userSex、userAge均為1的數據(清空數據)。

測試情況1:

  在不設置RowVersion並發模式為Fixed的情況下,兩個線程修改不同字段(修改同一個字段一個道理),后執行的線程的結果覆蓋前面的線程結果.

  發現測試結果為:1,1,男,1 ; 顯然db1線程修改的結果被db2線程給覆蓋了. (修改同一個字段一個道理)

 1             {
 2                 //1.創建兩個EF上下文,模擬代表兩個線程
 3                 var db1 = new ConcurrentTestDBEntities();
 4                 var db2 = new ConcurrentTestDBEntities();
 5 
 6                 UserInfor1 user1 = db1.UserInfor1.Find("1");
 7                 UserInfor1 user2 = db2.UserInfor1.Find("1");
 8 
 9                 //2. 執行修改操作
10                 //(db1的線程先執行完修改操作,並保存)
11                 user1.userName = "ypf";
12                 db1.Entry(user1).State = EntityState.Modified;
13                 db1.SaveChanges();
14 
15                 //(db2的線程在db1線程修改完成后,執行修改操作)
16                 try
17                 {
18                     user2.userSex = "";
19                     db2.Entry(user2).State = EntityState.Modified;
20                     db2.SaveChanges();
21 
22                     Console.WriteLine("測試成功");
23                 }
24                 catch (Exception)
25                 {
26                     Console.WriteLine("測試失敗");
27                 }
28             }
View Code

測試情況2:

  設置RowVersion並發模式為Fixed的情況下,兩個線程修改不同字段(修改同一個字段一個道理),如果該條數據已經被修改,利用DbUpdateConcurrencyException可以捕獲異常,進行積極並發的沖突處理。測試結果如下:

  a.RefreshMode.ClientWins: 1,1,男,1

  b.RefreshMode.StoreWins: 1,ypf,1,1

  c.ex.Entries.Single().Reload(); 1,ypf,1,1

 1             {
 2                 //1.創建兩個EF上下文,模擬代表兩個線程
 3                 var db1 = new ConcurrentTestDBEntities();
 4                 var db2 = new ConcurrentTestDBEntities();
 5 
 6                 UserInfor1 user1 = db1.UserInfor1.Find("1");
 7                 UserInfor1 user2 = db2.UserInfor1.Find("1");
 8 
 9                 //2. 執行修改操作
10                 //(db1的線程先執行完修改操作,並保存)
11                 user1.userName = "ypf";
12                 db1.Entry(user1).State = EntityState.Modified;
13                 db1.SaveChanges();
14 
15                 //(db2的線程在db1線程修改完成后,執行修改操作)
16                 try
17                 {
18                     user2.userSex = "";
19                     db2.Entry(user2).State = EntityState.Modified;
20                     db2.SaveChanges();
21 
22                     Console.WriteLine("測試成功");
23                 }
24                 catch (DbUpdateConcurrencyException ex)
25                 {
26                     Console.WriteLine("測試失敗:" + ex.Message);
27 
28                     //1. 保留上下文中的現有數據(即最新,最后一次輸入)
29                     //var oc = ((IObjectContextAdapter)db2).ObjectContext;
30                     //oc.Refresh(RefreshMode.ClientWins, user2);
31                     //oc.SaveChanges();
32 
33                     //2. 保留原始數據(即數據源中的數據代替當前上下文中的數據)
34                     //var oc = ((IObjectContextAdapter)db2).ObjectContext;
35                     //oc.Refresh(RefreshMode.StoreWins, user2);
36                     //oc.SaveChanges();
37 
38                     //3. 保留原始數據(而Reload處理也就是StoreWins,意味着放棄當前內存中的實體,重新到數據庫中加載當前實體)
39                     ex.Entries.Single().Reload();
40                     db2.SaveChanges();
41                 }
42             }

測試情況3:

  在不設置RowVersion並發模式為Fixed的情況下(也不需要RowVersion這個字段),單獨設置userName字段的並發模式為Fixed,兩個線程同時修改該字段,利用DbUpdateConcurrencyException可以捕獲異常,進行積極並發的沖突處理,但如果是兩個線程同時修改userName以外的字段,將不能捕獲異常,將走EF默認的處理方式,后執行的覆蓋先執行的。

  a.RefreshMode.ClientWins: 1,ypf2,1,1

  b.RefreshMode.StoreWins: 1,ypf,1,1

  c.ex.Entries.Single().Reload(); 1,ypf,1,1

 1             {
 2                 //1.創建兩個EF上下文,模擬代表兩個線程
 3                 var db1 = new ConcurrentTestDBEntities();
 4                 var db2 = new ConcurrentTestDBEntities();
 5 
 6                 UserInfor1 user1 = db1.UserInfor1.Find("1");
 7                 UserInfor1 user2 = db2.UserInfor1.Find("1");
 8 
 9                 //2. 執行修改操作
10                 //(db1的線程先執行完修改操作,並保存)
11                 user1.userName = "ypf";
12                 db1.Entry(user1).State = EntityState.Modified;
13                 db1.SaveChanges();
14 
15                 //(db2的線程在db1線程修改完成后,執行修改操作)
16                 try
17                 {
18                     user2.userName = "ypf2";
19                     db2.Entry(user2).State = EntityState.Modified;
20                     db2.SaveChanges();
21 
22                     Console.WriteLine("測試成功");
23                 }
24                 catch (DbUpdateConcurrencyException ex)
25                 {
26                     Console.WriteLine("測試失敗:" + ex.Message);
27 
28                     //1. 保留上下文中的現有數據(即最新,最后一次輸入)
29                     var oc = ((IObjectContextAdapter)db2).ObjectContext;
30                     oc.Refresh(RefreshMode.ClientWins, user2);
31                     oc.SaveChanges();
32 
33                     //2. 保留原始數據(即數據源中的數據代替當前上下文中的數據)
34                     //var oc = ((IObjectContextAdapter)db2).ObjectContext;
35                     //oc.Refresh(RefreshMode.StoreWins, user2);
36                     //oc.SaveChanges();
37 
38                     //3. 保留原始數據(而Reload處理也就是StoreWins,意味着放棄當前內存中的實體,重新到數據庫中加載當前實體)
39                     //ex.Entries.Single().Reload();
40                     //db2.SaveChanges();
41                 }
42             }
View Code

  (2). 針對CodeFirst模式,需要有這樣的一個屬性 public byte[] RowVersion { get; set; },並且給屬性加上特性[Timestamp],這樣該表中所有字段都監控並發。如果不想監視所有列(在不添加RowVersion的情況下),只需給特定的字段加上特性 [ConcurrencyCheck],這樣只有被設置的字段被監測並發。

  除了再配置上不同於DBFirst模式以為,是通過加特性的方式來標記並發,其它捕獲並發和積極並發的幾類處理方式均同DBFirst模式相同。(這里不做測試了)

2. 積極並發處理的三種形式總結:

  利用DbUpdateConcurrencyException可以捕獲異常,然后:

    a. RefreshMode.ClientWins:保留上下文中的現有數據(即最新,最后一次輸入)

    b. RefreshMode.StoreWins:保留原始數據(即數據源中的數據代替當前上下文中的數據)

    c.ex.Entries.Single().Reload(); 保留原始數據(而Reload處理也就是StoreWins,意味着放棄當前內存中的實體,重新到數據庫中加載當前實體)

3. 該方案總結:

  這種模式實質上就是獲取異常告訴程序,讓開發人員結合需求自己選擇怎么處理,但這種模式是解決代碼層次上的並發沖突,並不是解決大數量同時訪問崩潰問題的。

解決方案三:利用隊列來解決業務上的並發(架構層次上其實也是這種思路解決的)

1.先分析:

  前面說過所謂的高並發,就是海量的用戶同時向服務器發送請求,進行某個業務處理(比如定時秒殺的搶單),而這個業務處理是需要 一定時間的。

2.處理思路:

  將海量用戶的請求放到一個隊列里(如:Queue),先不進行業務處理,然后另外一個服務器從線程中讀取這個請求(MVC框架可以放到Global全局里),依次進行業務處理,至於處理完成后,是否需要告訴客戶端,可以根據實際需求來定,如果需要的話(可以借助Socket、Signalr、推送等技術來進行).

  特別注意:讀取隊列的線程是一直在運行,只要隊列中有數據,就給他拿出來.

  這里使用Queue隊列,可以參考:http://www.cnblogs.com/yaopengfei/p/8322016.html

  (PS:架構層次上的處理方案無非隊列是單獨一台服務器,執行從隊列讀取的是另外一台業務服務器,處理思想是相同的)

隊列單例類的代碼:

 1  /// <summary>
 2     /// 單例類
 3     /// </summary>
 4     public class QueueUtils
 5     {
 6         /// <summary>
 7         /// 靜態變量:由CLR保證,在程序第一次使用該類之前被調用,而且只調用一次
 8         /// </summary>
 9         private static readonly QueueUtils _QueueUtils = new QueueUtils();
10 
11         /// <summary>
12         /// 聲明為private類型的構造函數,禁止外部實例化
13         /// </summary>
14         private QueueUtils()
15         {
16 
17         }
18         /// <summary>
19         /// 聲明屬性,供外部調用,此處也可以聲明成方法
20         /// </summary>
21         public static QueueUtils instanse
22         {
23             get
24             {
25                 return _QueueUtils;
26             }
27         }
28 
29 
30         //下面是隊列相關的
31          System.Collections.Queue queue = new System.Collections.Queue();
32 
33         private static object o = new object();
34 
35         public int getCount()
36         {
37             return queue.Count;
38         }
39 
40         /// <summary>
41         /// 入隊方法
42         /// </summary>
43         /// <param name="myObject"></param>
44         public void Enqueue(object myObject)
45         {
46             lock (o)
47             {
48                 queue.Enqueue(myObject);
49             }
50         }
51         /// <summary>
52         /// 出隊操作
53         /// </summary>
54         /// <returns></returns>
55         public object Dequeue()
56         {
57             lock (o)
58             {
59                 if (queue.Count > 0)
60                 {
61                     return queue.Dequeue();
62                 }
63             }
64             return null;
65         }
66 
67     }
View Code

PS:這里的入隊和出隊都要加鎖,因為Queue默認不是線程安全的,不加鎖會存在資源競用問題從而業務出錯,或者直接使用ConcurrentQueue線程安全的隊列,就不需要加鎖了,關於隊列線程安全問題詳見:http://www.cnblogs.com/yaopengfei/p/8322016.html

臨時存儲數據類的代碼:

 1     /// <summary>
 2     /// 該類用來存儲請求信息
 3     /// </summary>
 4     public class TempInfor
 5     {
 6         /// <summary>
 7         /// 用戶編號
 8         /// </summary>
 9         public string userId { get; set; }
10     }

模擬高並發入隊,單獨線程出隊的代碼:

 1  {
 2                 //3.1 模擬高並發請求 寫入隊列
 3                 {
 4                     for (int i = 0; i < 100; i++)
 5                     {
 6                         Task.Run(() =>
 7                         {
 8                             TempInfor tempInfor = new TempInfor();
 9                             tempInfor.userId = Guid.NewGuid().ToString("N");
10                             //下面進行入隊操作
11                             QueueUtils.instanse.Enqueue(tempInfor);
12 
13                         });
14                     }
15                 }        
16                 //3.2 模擬另外一個線程隊列中讀取數據請求標記,進行相應的業務處理(該線程一直運行,不停止)
17                 Task.Run(() =>
18                 {
19                     while (true)
20                     {
21                         if (QueueUtils.instanse.getCount() > 0)
22                         {
23                             //下面進行出隊操作
24                             TempInfor tempInfor2 = (TempInfor)QueueUtils.instanse.Dequeue();
25 
26                             //拿到請求標記,進行相應的業務處理
27                             Console.WriteLine("id={0}的業務執行成功", tempInfor2.userId);
28                         }
29                     }           
30                 });
31                 //3.3 模擬過了一段時間(6s后),又有新的請求寫入
32                 Thread.Sleep(6000);
33                 Console.WriteLine("6s的時間已經過去了");
34                 {
35                     for (int j = 0; j < 100; j++)
36                     {
37                         Task.Run(() =>
38                         {
39                             TempInfor tempInfor = new TempInfor();
40                             tempInfor.userId = Guid.NewGuid().ToString("N");
41                             //下面進行入隊操作
42                             QueueUtils.instanse.Enqueue(tempInfor);
43 
44                         });
45                     }
46                 }
47             }

3.下面案例的測試結果:

  一次輸出100條數據,6s過后,再一次輸出100條數據。

4. 總結:

  該方案是一種迂回的方式處理高並發,在業內這種思想也是非常常見,但該方案也有一個弊端,客戶端請求的實時性很難保證,或者即使要保證(比如引入實時通訊技術),

 也要付出不少代價.

 

解決方案四: 利用數據庫自有的鎖機制進行處理

   (在后面數據鎖機制章節進行介紹)

 

 

!

  • 作       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 本人才疏學淺,用郭德綱的話說“我是一個小學生”,如有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM