需求:現需要將多個數據源的數據導入到目標數據庫,這是一個經典的生產消費應用的例子。
直接上代碼,看下實現:
// 初始化列隊緩沖區 隊列大小為100 IDataCollection<List<T>> queue = new QueueCollection<List<T>>(100); //開啟X個后台任務,讀取RabbitMQ隊列信息, 把列隊信息插入緩沖區隊列 var count = 1; for (int i = 0; i < count; i++) { Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new RabbitSource<List<T>>().Get)); } //開啟X個后台任務,主動獲取數據庫數據,作為數據生產者,插入到緩沖區隊列, for (int i = 0; i < count; i++) { Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new DatabaseSource<List<T>>().Get)); } //開啟X個后台任務,主動獲取讀取緩沖區列隊,作為數據消息者,把數據插入到ES庫, for (int i = 0; i < count; i++) { Task.Factory.StartNew(() => new Customer<List<T>>(queue).Start(new Elastic().Insert)); }
隊列我們采用線程安全的ConcurrentQueue隊列:
/// <summary> /// 緩沖區隊列 /// ConcurrentQueue線程安全,不用考慮鎖的問題 /// </summary> public class QueueCollection<T> :IDataCollection<T> { //隊列最大值 private int _maxSize; /// <summary> /// 線程安全的隊列 /// </summary> private ConcurrentQueue<T> _queue; public QueueCollection(int maxSize) { this._maxSize = maxSize; _queue = new ConcurrentQueue<T>(); } public bool isPopWaiting() { return !_queue.Any(); } public bool isPushWaiting() { return this._maxSize == _queue.Count; } public T Pop() { T _obj = default(T); if (!_queue.IsEmpty) _queue.TryDequeue(out _obj); return _obj; } public void Push(T t) { if (this._maxSize > _queue.Count) { _queue.Enqueue(t); } } }
如果我們不使用這個隊列,只要滿足IDataCollection接口,也可以進行替換:
public interface IDataCollection<T> { /// <summary> /// 插入數據 /// </summary> /// <param name="t"></param> void Push(T t); /// <summary> /// 取出數據 /// </summary> /// <returns></returns> T Pop(); /// <summary> /// 是否插入數據等待 /// </summary> /// <returns></returns> bool isPushWaiting(); /// <summary> /// 是否取出數據等待 /// </summary> /// <returns></returns> bool isPopWaiting(); }
生產者:
public class Producer<T> : ITransientDependency { private int sleep; private IDataCollection<T> bufferQueue; public Producer(IDataCollection<T> queue) { sleep = 3000; bufferQueue = queue; } public void Start(Action<Action<T>> methodCall) { //入隊 methodCall((bills) => { this.Enqueue(bills); }); } private void Enqueue(T t) { var isWaiting = true; while (isWaiting) { if (!bufferQueue.isPushWaiting()) { this.bufferQueue.Push(t); isWaiting = false; } else { //生產者等待時間 Thread.Sleep(sleep); } } } }
消費者:
/// <summary> /// 消費者 /// </summary> public class Customer<T> { //產品緩存隊列 private IDataCollection<T> _queue; //消費者等待時間 private int Spead = 5000;//消費者等待時間 public Customer(IDataCollection<T> queue) { this._queue = queue; } public void Start(Action<T> method) { while (true) { if (!_queue.isPopWaiting()) { T box = this._queue.Pop(); method(box); } else { Thread.Sleep(Spead); } } } }
方法委托,也寫了個基類,其實意義並不大,只是為了規范, 防止方法命名隨意起。
public interface IDataSource<T> { void Get(Action<T> func); }
最后,在DataSource的get方法中,調用 func即可。