項目情形
最近做項目發現有個業務邏輯性能效率巨慢, 實際上是掃描cosmos上面16個文件夾下面的數據, 每個folder下面大概分為100來個對應user的fodler, 然后對應user folder下面存放的是user的數據. 原邏輯是一個folder一個folder去scan, 然后將統計的數據按照 user和size存放到一個dictionary中, 最后匯總統計並且發郵件. 其中影響效率的部分有當前運行環境與cosmos的交互上, 不同的環境快慢不同. 另外一個就是code邏輯是串行的. 這樣會導致效率很差. 整體運行完一遍, 在慢的環境上要48小時, 快的環境也得將近20小時. 於是我開始想優化code邏輯. 使用並行方式嘗試提升運行效率.
使用ConcurrentDictionary
我之前大概了解過一些關於ConcurrentDictionary的概念, 知道它是一個線程安全的字典緩存. 可以用於並發場景. 對於我目前來說作為解決方案應該是適配的. 我准備使用一個核心ConcurrentDictionary作為數據緩存, 然后啟用16個Task去掃描cosmos上面的16個root folder. 每個task將掃描得到的數據記錄至ConcurrentDictionary當中. 最后當所有的task運行完畢后, 將ConcurrentDictionary中的值完善一下發出郵件. 初步做了一下, 在一個快的環境上, 運行了2個小時, 程序完成業務邏輯. 也就是說效率從原來的20小時提升至2小時, 提升了10倍. 效果還是非常顯著的. 下面我來介紹一下具體的實現和深入探究一下ConcurrentDictionary的底層實現.
核心緩存只一行代碼:
public static ConcurrentDictionary<string, UserCosmosInfo> aggregate = new ConcurrentDictionary<string, UserCosmosInfo>();
其中string為user的名字, 不會重復, 作為key正好. 然后UserCosmosInfo
是我封裝的一個類, 類里面的屬性是user后面需要用的:
public class UserCosmosInfo
{
public string Name { get; set; }
public long TotalStorage { get; set; }
public long Last1WeekAddedStorage { get; set; }
public long Last2WeekAddedStorage { get; set; }
public long Last1MonthAddedStorage { get; set; }
public long Last6MonthsAddedStorage { get; set; }
public long OtherMoreThan6Months { get; set; }
public List<CosmosStreamInfo> TopBigFiles { get; set; }
}
啟動16個Task分別掃描數據:
public static void AggregateCosmosFolder(string baseFolder, string[] filter, bool recursive = true)
{
var folderCount = 16;
var tasks = new Task[folderCount];
for (int i = 0; i < folderCount; i++)
{
int param = i;
tasks[i] = Task.Run(() => AggregateCosmosFolderInTask(param, baseFolder, filter, true));
}
Task.WaitAll(tasks);
}
實際每個task運行的業務類(簡略版):
private static void AggregateCosmosFolderInTask(int currentFolder, string baseFolder, string[] filter, bool recursive = true)
{
var threadId = Thread.CurrentThread.ManagedThreadId;
for (int j = 0; j < filter.Length; j++)
{
string u = filter[j];
string prefix = $"{baseFolder}/_{currentFolder.ToString("x")}/{u}/";
if (CosmosDirExist(prefix))
{
IEnumerable<StreamInfo> streams = GetFiles(prefix, recursive);
if (streams != null)
{
foreach (StreamInfo s in streams)
{
//調用 tryAdd, 將key u 和 value 加入緩存, 此處 tryAdd的底層實現了並發控制, 稍后我們看看底層的代碼實現....
Program.aggregate.TryAdd(u, new UserCosmosInfo()
{
///省略代碼篇幅
});
// new 一個新的 value, 這個value是需要更新到緩存中的.
var newValue = new UserCosmosInfo()
{
///省略代碼篇幅
};
// AddOrUpdate 方法用於更新緩存的數據, 底層同樣是並發控制做的很到位了, 所以我們直接放心使用......
// 注意它的參數, key 傳 u, 將上面的 newValue 傳進去, 在寫一個 Fun<> 委托, 委托在執行時也是線程安全的, 至於實現方式也需要看底層源碼.
Program.aggregate.AddOrUpdate(u, newValue, (key, existingValue) =>
{
existingValue.TotalStorage += newValue.TotalStorage;
return existingValue;
});
}
}
}
}
}
新的業務類的實現大致是這樣得. 主要是使用了 ConcurrentDictionary 的一些API, 下一步我將探究底層的實現
今天沒時間了, 下次繼續寫.
參考鏈接
https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/how-to-add-and-remove-items
https://stackoverflow.com/questions/30225476/task-run-with-parameters