多線程數據寫入隊列,異步線程進行批量處理


設計目的:

在多線程環境中,多線程處理數據時,如果每線程都單獨寫數據庫,性能低下。因此,為提高性能,數據需批量寫到數據庫中。出於此目的,進行了數據隊列的設計:

 

實現代碼:

 

  1 /// <summary>
  2     /// 多線程異步推送數據,一線程異步批量處理數據
  3     /// </summary>
  4     /// <typeparam name="T"></typeparam>
  5     public abstract class SingleThreadQueue<T>
  6     {
  7         protected List<T> queue;
  8         private Action _beforAction;//在批量執行前執行的方法
  9         private Action _afterAction;//在批量執行完成后執行的方法  
 10         private bool _runing = false;
 11         private int _maxCount, _maxSize;
 12 
 13         /// <summary>
 14         /// 
 15         /// </summary> 
 16         /// <param name="maxCount">每次最大處理數</param>
 17         /// <param name="maxSize">緩沖最大數</param>
 18         /// <param name="beforAction">開始執行處理前的動作</param>
 19         /// <param name="afterAction">所有數據處理完成后的動作</param>
 20         public SingleThreadQueue(int maxCount, int maxSize, Action beforAction = null, Action afterAction = null)
 21         {
 22             this.queue = new List<T>();
 23             this._beforAction = beforAction;
 24             this._afterAction = afterAction;
 25             this._maxCount = maxCount;
 26             this._maxSize = maxSize;
 27         }
 28 
 29         /// <summary>
 30         /// 異步推送數據
 31         /// </summary>
 32         /// <param name="item"></param>
 33         /// <returns>數據成功加入待處理隊列,返回true,數據達到隊列的最大緩沖,數據不會進入隊列,返回false</returns>
 34         public bool AsyncEnqueue(T item)
 35         {
 36             lock (this)
 37             {
 38                 if (this.queue.Count >= this._maxSize)
 39                     return false;
 40 
 41                 this.queue.Add(item);
 42                 this.Activate();
 43                 return true;
 44             }
 45         }
 46 
 47         protected T[] DoDequeue(List<T> queue)
 48         {
 49             if (queue.Count > this._maxCount)
 50             {
 51                 var ds = queue.Take(this._maxCount).ToArray();
 52                 queue.RemoveRange(0, this._maxCount);
 53                 return ds;
 54             }
 55             else
 56             {
 57                 var ds = queue.ToArray();
 58                 queue.Clear();
 59                 return ds;
 60             }
 61         }
 62 
 63         /// <summary>
 64         /// 實現此方法,批量執行處理時的方法
 65         /// </summary>
 66         /// <param name="items"></param>
 67         protected abstract void OnExecute(T[] items);
 68 
 69         private void Activate()
 70         {
 71             if (this._runing)
 72             {
 73                 return;
 74             }
 75 
 76             this._runing = true;
 77             ThreadPool.QueueUserWorkItem((obj) =>
 78             {
 79                 try
 80                 {
 81                     this._beforAction?.Invoke();
 82 
 83                     T[] items;
 84                     //管理線程
 85                     while (true)
 86                     {
 87                         lock (this)
 88                         {
 89                             if (queue.Count < 1)
 90                             {
 91                                 this._afterAction?.Invoke();
 92                                 this._runing = false;
 93                                 break;
 94                             }
 95 
 96                             items = this.DoDequeue(queue);
 97                         }
 98 
 99                         try
100                         {
101                             this.OnExecute(items);
102                         }
103                         catch (Exception ex)
104                         {
105                             //TODO:異常預警
106                             // Loger.Exception(ex.Message, ex);
107                         }
108                     }
109                 }
110                 catch (Exception ex)
111                 {
112                     //TODO:異常預警
113                     // Loger.Exception(ex.Message, ex);
114                 }
115             });
116         }
117 
118     }

 經測試,多線程單獨寫入DB,在開發環境,100/秒,但批量處理可達5000/秒。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM