反應式編程在客戶端編程當中的應用相當廣泛,而當前在服務端中的應用相對被提及較少。本篇將介紹如何在服務端編程中應用響應時編程來改進數據庫操作的性能。
開篇就是結論
利用 System.Reactive 配合 TaskCompelteSource ,可以將分散的單次數據庫插入請求合並會一個批量插入的請求。在確保正確性的前提下,實現數據庫插入性能的優化。
如果讀者已經了解了如何操作,那么剩下的內容就不需要再看了。
預設條件
現在,我們假設存在這樣一個 Repository 接口來表示一次數據庫的插入操作。
namespace Newbe.RxWorld.DatabaseRepository { public interface IDatabaseRepository { /// <summary> /// Insert one item and return total count of data in database /// </summary> /// <param name="item"></param> /// <returns></returns> Task<int> InsertData(int item); } }
接下來,我們在不改變該接口簽名的前提下,體驗一下不同的實現帶來的性能區別。
基礎版本
首先是基礎版本,采用的是最為常規的單次數據庫INSERT
操作來完成數據的插入。本示例采用的是SQLite
作為演示數據庫,方便讀者自行實驗。
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class NormalDatabaseRepository : IDatabaseRepository { private readonly IDatabase _database; public NormalDatabaseRepository( IDatabase database) { _database = database; } public Task<int> InsertData(int item) { return _database.InsertOne(item); } } }
常規操作。其中_database.InsertOne(item)
的具體實現就是調用了一次INSERT
。
基礎版本在同時插入小於20次時基本上可以較快的完成。但是如果數量級增加,例如需要同時插入一萬條數據庫,將會花費約20秒鍾,存在很大的優化空間。
TaskCompelteSource
TaskCompelteSource 是 TPL 庫中一個可以生成一個可操作 Task 的類型。對於 TaskCompelteSource 不太熟悉的讀者可以通過該實例代碼了解。
此處也簡單解釋一下該對象的作用,以便讀者可以繼續閱讀。
對於熟悉 javascript 的朋友,可以認為 TaskCompelteSource 相當於 Promise 對象。也可以相當於 jQuery 當中的 $.Deferred 。
如果都不了解的朋友,可以聽一下筆者吃麻辣燙時想到的生活化例子。
吃麻辣燙 | 技術解釋 |
---|---|
吃麻辣燙之前,需要先用盤子夾菜。 | 構造參數 |
夾好菜之后,拿到結賬處去結賬 | 調用方法 |
收銀員結賬完畢之后,會得到一個叫餐牌,會響鈴的那種 | 得到一個 Task 返回值 |
拿着菜牌找了一個位子坐下,玩手機等餐 | 正在 await 這個 Task ,CPU轉而處理其他事情 |
餐牌響了,去取餐,吃起來 | Task 完成,await 節數,繼續執行下一行代碼 |
那么 TaskCompelteSource 在哪兒呢?
首先,根據上面的例子,在餐牌響的時候,我們才會去取餐。那么餐牌什么時候才會響呢?當然是服務員手動按了一個在櫃台的手動開關才觸發了這個響鈴。
那么,櫃台的這個開關,可以被技術解釋為 TaskCompelteSource 。
餐台開關可以控制餐牌的響鈴。同樣, TaskCompelteSource 就是一種可以控制 Task 的狀態的對象。
解決思路
有了前面對 TaskCompelteSource 的了解,那么接下來就可以解決文章開頭的問題了。思路如下:
當調用 InsertData 時,可以創建一個 TaskCompelteSource 以及 item 的元組。為了方便說明,我們將這個元組命名為BatchItem
。
將 BatchItem 的 TaskCompelteSource 對應的 Task 返回出去。
調用 InsertData 的代碼會 await 返回的 Task,因此只要不操作 TaskCompelteSource ,調用者就一會一直等待。
然后,另外啟動一個線程,定時將 BatchItem 隊列消費掉。
這樣就完成了單次插入變為批量插入的操作。
筆者可能解釋的不太清楚,不過以下所有版本的代碼均基於以上思路。讀者可以結合文字和代碼進行理解。
ConcurrentQueue 版本
基於以上的思路,我們采用 ConcurrentQueue 作為 BatchItem 隊列進行實現,代碼如下(代碼很多,不必糾結,因為下面還有更簡單的):
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class ConcurrentQueueDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly ConcurrentQueue<BatchItem> _queue; // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable private readonly Task _batchInsertDataTask; public ConcurrentQueueDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _queue = new ConcurrentQueue<BatchItem>(); // 啟動一個 Task 消費隊列中的 BatchItem _batchInsertDataTask = Task.Factory.StartNew(RunBatchInsert, TaskCreationOptions.LongRunning); _batchInsertDataTask.ConfigureAwait(false); } public Task<int> InsertData(int item) { // 生成 BatchItem ,將對象放入隊列。返回 Task 出去 var taskCompletionSource = new TaskCompletionSource<int>(); _queue.Enqueue(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 從隊列中不斷獲取 BatchItem ,並且一批一批插入數據庫,更新 TaskCompletionSource 的狀態 private void RunBatchInsert() { foreach (var batchItems in GetBatches()) { try { BatchInsertData(batchItems).Wait(); } catch (Exception e) { _testOutputHelper.WriteLine($"there is an error : {e}"); } } IEnumerable<IList<BatchItem>> GetBatches() { var sleepTime = TimeSpan.FromMilliseconds(50); while (true) { const int maxCount = 100; var oneBatchItems = GetWaitingItems() .Take(maxCount) .ToList(); if (oneBatchItems.Any()) { yield return oneBatchItems; } else { Thread.Sleep(sleepTime); } } IEnumerable<BatchItem> GetWaitingItems() { while (_queue.TryDequeue(out var item)) { yield return item; } } } } private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { // 調用數據庫的批量插入操作 var totalCount = await _database