設計目的:
在多線程環境中,多線程處理數據時,如果每線程都單獨寫數據庫,性能低下。因此,為提高性能,數據需批量寫到數據庫中。出於此目的,進行了數據隊列的設計:
實現代碼:
1 /// <summary> 2 /// 多線程異步推送數據,一線程異步批量處理數據 3 /// </summary> 4 /// <typeparam name="T"></typeparam> 5 public abstract class SingleThreadQueue<T> 6 { 7 protected List<T> queue; 8 private Action _beforAction;//在批量執行前執行的方法 9 private Action _afterAction;//在批量執行完成后執行的方法 10 private bool _runing = false; 11 private int _maxCount, _maxSize; 12 13 /// <summary> 14 /// 15 /// </summary> 16 /// <param name="maxCount">每次最大處理數</param> 17 /// <param name="maxSize">緩沖最大數</param> 18 /// <param name="beforAction">開始執行處理前的動作</param> 19 /// <param name="afterAction">所有數據處理完成后的動作</param> 20 public SingleThreadQueue(int maxCount, int maxSize, Action beforAction = null, Action afterAction = null) 21 { 22 this.queue = new List<T>(); 23 this._beforAction = beforAction; 24 this._afterAction = afterAction; 25 this._maxCount = maxCount; 26 this._maxSize = maxSize; 27 } 28 29 /// <summary> 30 /// 異步推送數據 31 /// </summary> 32 /// <param name="item"></param> 33 /// <returns>數據成功加入待處理隊列,返回true,數據達到隊列的最大緩沖,數據不會進入隊列,返回false</returns> 34 public bool AsyncEnqueue(T item) 35 { 36 lock (this) 37 { 38 if (this.queue.Count >= this._maxSize) 39 return false; 40 41 this.queue.Add(item); 42 this.Activate(); 43 return true; 44 } 45 } 46 47 protected T[] DoDequeue(List<T> queue) 48 { 49 if (queue.Count > this._maxCount) 50 { 51 var ds = queue.Take(this._maxCount).ToArray(); 52 queue.RemoveRange(0, this._maxCount); 53 return ds; 54 } 55 else 56 { 57 var ds = queue.ToArray(); 58 queue.Clear(); 59 return ds; 60 } 61 } 62 63 /// <summary> 64 /// 實現此方法,批量執行處理時的方法 65 /// </summary> 66 /// <param name="items"></param> 67 protected abstract void OnExecute(T[] items); 68 69 private void Activate() 70 { 71 if (this._runing) 72 { 73 return; 74 } 75 76 this._runing = true; 77 ThreadPool.QueueUserWorkItem((obj) => 78 { 79 try 80 { 81 this._beforAction?.Invoke(); 82 83 T[] items; 84 //管理線程 85 while (true) 86 { 87 lock (this) 88 { 89 if (queue.Count < 1) 90 { 91 this._afterAction?.Invoke(); 92 this._runing = false; 93 break; 94 } 95 96 items = this.DoDequeue(queue); 97 } 98 99 try 100 { 101 this.OnExecute(items); 102 } 103 catch (Exception ex) 104 { 105 //TODO:異常預警 106 // Loger.Exception(ex.Message, ex); 107 } 108 } 109 } 110 catch (Exception ex) 111 { 112 //TODO:異常預警 113 // Loger.Exception(ex.Message, ex); 114 } 115 }); 116 } 117 118 }
經測試,多線程單獨寫入DB,在開發環境,100/秒,但批量處理可達5000/秒。