回顧上文
作為單體程序,依賴的第三方服務雖不多,但是2C的程序還是有不少內容可講; 作為一個常規互聯網系統,無外乎就是接受請求、處理請求,輸出響應。
由於業務漸漸增長,單機多核的共享內存模式帶來的問題很多,編程也困難,隨着多核時代和分布式系統的到來,共享模型已經不太適合並發編程,因此Actor模型又重新受到了人們的重視。

-----調試多線程都懂------
* 傳統的編程模型通常使用回調和同步對象(如鎖)來協調任務和訪問共享數據, 從宏觀看傳統模型: 任務是一步步緊接着完成的,資源是需要搶占的。
* Actor模式是一種並發模型,與另一種模型共享內存完全相反,Actor模型share nothing。所有的線程(或進程)通過消息傳遞的方式進行合作,這些線程(或進程)稱為Actor, 預先定義了任務的流水線后,不關注數據什么時候流到這個任務 ,專注完成工序任務。
.Net TPL Dataflow組件幫助我們快速實現Actor模型。

TPL Dataflow是微軟前幾年給出的數據處理庫, 內置常見的處理塊,可將這些塊組裝成一個處理管道,"塊"對應處理管道中的"階段", 可類比AspNetCore 中Middleware 和pipeline.。
-
TPL Dataflow庫為消息傳遞和並行化CPU密集型和I / O密集型應用程序提供了編程基礎,這些應用程序具有高吞吐量和低延遲。它還可以讓您明確控制數據的緩沖方式並在系統中移動。
- 為了更好地理解數據流編程模型,請考慮從磁盤異步加載圖像並創建這些圖像的應用程序。
-
傳統的編程模型通常使用回調和同步對象(如鎖)來協調任務和訪問共享數據,
-
通過使用數據流編程模型,您可以創建在從磁盤讀取圖像時處理圖像的數據流對象。在數據流模型下,您可以聲明數據在可用時的處理方式以及數據之間的依賴關系。 由於運行時管理數據之間的依賴關系,因此通常可以避免同步訪問共享數據的要求。此外,由於運行時調度基於數據的異步到達而工作,因此數據流可以通過有效地管理底層線程來提高響應性和吞吐量。
-
- 需要注意的是:TPL Dataflow 非分布式數據流,消息在進程內傳遞, 使用nuget引用 System.Threading.Tasks.Dataflow 包。
TPL Dataflow 核心概念
Buffer & Block
TPL Dataflow 內置的Block覆蓋了常見的應用場景,當然如果內置塊不能滿足你的要求,你也可以自定“塊”。
Block可以划分為下面3類:
- Buffering Only 【Buffer不是緩存Cache的概念, 而是一個緩沖區的概念】
-
Execution
-
Grouping
使用以上塊混搭處理管道, 大多數的塊都會執行一個操作,有些時候需要將消息分發到不同Block,這時可使用特殊類型的緩沖塊給管道“”分叉”。
Execution Block
-
輸入、輸出消息的緩沖區(一般稱為Input,Output隊列)
-
在消息上執行動作的委托

消息在輸入和輸出時能夠被緩沖:當Func委托的運行速度比輸入的消息速度慢時,后續消息將在到達時進行緩沖;當下一個塊的輸入緩沖區中沒有容量時,將在輸出時緩沖。
每個塊我們可以配置:
-
緩沖區的總容量, 默認無上限
-
執行操作委托的並發度, 默認情況下塊按照順序處理消息,一次一個。
我們將塊鏈接在一起形成一個處理管道,生產者將消息推向管道。
TPL Dataflow有一個基於pull的機制(使用Receive和TryReceive方法),但我們將在管道中使用塊連接和推送機制。
-
TransformBlock(Execution category)-- 由輸入輸出緩沖區和一個Func<TInput, TOutput>委托組成,消費的每個消息,都會輸出另外一個,你可以使用這個Block去執行輸入消息的轉換,或者轉發輸出的消息到另外一個Block。
-
TransformManyBlock (Execution category) -- 由輸入輸出緩沖區和一個Func<TInput, IEnumerable<TOutput>>委托組成, 它為輸入的每個消息輸出一個 IEnumerable<TOutput>
-
BroadcastBlock (Buffering category)-- 由只容納1個消息的緩沖區和Func<T, T>委托組成。緩沖區被每個新傳入的消息所覆蓋,委托僅僅為了讓你控制怎樣克隆這個消息,不做消息轉換。
該塊可以鏈接到多個塊(管道的分叉),雖然它一次只緩沖一條消息,但它一定會在該消息被覆蓋之前將該消息轉發到鏈接塊(鏈接塊還有緩沖區)。
-
ActionBlock (Execution category)-- 由緩沖區和Action<T>委托組成,他們一般是管道的結尾,他們不再給其他塊轉發消息,他們只會處理輸入的消息。
-
BatchBlock (Grouping category)-- 告訴它你想要的每個批處理的大小,它將累積消息,直到它達到那個大小,然后將它作為一組消息轉發到下一個塊。
還有一下其他的Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我們暫時不會深入。
Pipeline Chain React
當輸入緩沖區達到上限容量,為其供貨的上游塊的輸出緩沖區將開始填充,當輸出緩沖區已滿時,該塊必須暫停處理,直到緩沖區有空間,這意味着一個Block的處理瓶頸可能導致所有前面的塊的緩沖區被填滿。
但是不是所有的塊變滿時,都會暫停,BroadcastBlock 有允許1個消息的緩沖區,每個消息都會被覆蓋, 因此如果這個廣播塊不能將消息轉發到下游,則在下個消息到達的時候消息將丟失,這在某種意義上是一種限流(比較生硬).
編程實踐
生產者投遞消息
可使用Post或者SendAsync 方法向首塊投遞消息
-
Post方法即時返回true/false, True意味着消息被block接收(緩沖區有空余), false意味着拒絕了消息(緩沖區已滿或者Block已經出錯了)。
-
SendAsync方法返回一個Task<bool>, 將會以異步的方式阻塞直到塊接收、拒絕、塊出錯。
定義流水線
按照上圖工作流定義 流水線
public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory) { _httpClient = httpClientFactory.CreateClient("bce-request"); _redisDB0 = redisCache[0]; _redisDB = redisCache; _logger = loggerFactory.CreateLogger(nameof(EqidPairHandler)); var option = new DataflowLinkOptions { PropagateCompletion = true }; publisher = _redisDB.RedisConnection.GetSubscriber(); _eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel> ( // redis piublih 沒有做在TransformBlock fun里面, 因為publih失敗可能影響后續的block傳遞 eqidPair => EqidResolverAsync(eqidPair), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism") } ); // https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory); _logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) ); _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個消息的緩存區和拷貝函數組成
_broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);
_broadcastBlock.LinkTo(_logPublishBlock, option);
_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);
}
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase { private readonly string _dirPath; private readonly Timer _triggerBatchTimer; private readonly Timer _openFileTimer; private DateTime? _nextCheckpoint; private TextWriter _currentWriter; private readonly LogHead _logHead; private readonly object _syncRoot = new object(); private readonly ILogger _logger; private readonly BatchBlock<T> _packer; private readonly ActionBlock<T[]> batchWriterBlock; private readonly TimeSpan _logFileIntervalTimeSpan; /// <summary> /// Generate request log file. /// </summary> public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>(); _dirPath = logConfig.DirPath; if (!Directory.Exists(_dirPath)) { Directory.CreateDirectory(_dirPath); } _logHead = logConfig.LogHead; _packer = new BatchBlock<T>(logConfig.BatchSize); batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models)); // 形成pipeline必須放在LinkTo前面 _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true }); // 防止BatchPacker一直不滿足10條數據,無法打包,故設定間隔15s強制寫入 _triggerBatchTimer = new Timer(state => { _packer.TriggerBatch(); }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period)); // 實時寫文件流能確保隨時生成文件,但存在極端情況:某小時沒有需要寫入的數據,導致該小時不會創建文件,以下定時任務確保創建文件 _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval); _openFileTimer = new Timer(state => { AlignCurrentFileTo(DateTime.Now); }, null, TimeSpan.Zero, _logFileIntervalTimeSpan); } public ITargetBlock<T> InputBlock => _packer; private void AlignCurrentFileTo(DateTime dt) { if (!_nextCheckpoint.HasValue) { OpenFile(dt); } if (dt >= _nextCheckpoint.Value) { CloseFile(); OpenFile(dt); } } private void OpenFile(DateTime now, string fileSuffix = null) { string filePath = null; try { var currentHour = now.Date.AddHours(now.Hour); _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan); int hourConfiguration = _logFileIntervalTimeSpan.Hours; int minuteConfiguration = _logFileIntervalTimeSpan.Minutes; filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log"; var appendHead = !File.Exists(filePath); if (filePath != null) { var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write); var sw = new StreamWriter(stream, Encoding.Default); if (appendHead) { sw.Write(GenerateHead()); } _currentWriter = sw; _logger.LogDebug($"{DateTime.Now} TextWriter has been created."); } } catch (Exception e) { if (fileSuffix == null) { _logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}." ); OpenFile(now, $"-{Guid.NewGuid()}"); } else { _logger.LogError($"OpenFile failed after retry: {filePath}", e); } } } private void CloseFile() { if (_currentWriter != null) { _currentWriter.Flush(); _currentWriter.Dispose(); _currentWriter = null; _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed."); } _nextCheckpoint = null; } private string GenerateHead() { StringBuilder head = new StringBuilder(); head.AppendLine("#Software: " + _logHead.Software) .AppendLine("#Version: " + _logHead.Version) .AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}") .AppendLine("#Fields: " + _logHead.Fields); return head.ToString(); } private void WriteToFile(T[] models) { try { lock (_syncRoot) { var flag = false; foreach (var model in models) { if (model == null) continue; flag = true; AlignCurrentFileTo(model.ServerLocalTime); _currentWriter.WriteLine(model.ToString()); } if (flag) _currentWriter.Flush(); } } catch (Exception ex) { _logger.LogError("WriteToFile Error : {0}", ex.Message); } } public bool AcceptLogModel(T model) { return _packer.Post(model); } public string GetDirPath() { return _dirPath; } public async Task CompleteAsync() { _triggerBatchTimer.Dispose(); _openFileTimer.Dispose(); _packer.TriggerBatch(); _packer.Complete(); await InputBlock.Completion; lock (_syncRoot) { CloseFile(); } } }
注意事項 :異常處理
上述程序在部署時就遇到相關的坑位,在測試環境_eqid2ModelTransformBlock 內Func委托穩定執行,程序並未出現異樣;
部署到生產之后, 該Pipeline運行一段時間就停止工作,一直很困惑, 后來通過監測_eqid2ModelTransformBlock.Completion 屬性,發現該塊在執行某次Func委托時報錯,提前進入完成態
官方資料表明: 某塊進入Fault、Cancel狀態,都會導致該塊提前進入“完成態”,但因Fault、Cancle進入的“完成態”會導致 輸入buffer和輸出buffer 被清空。
After Fault has been called on a dataflow block, that block will complete, and its Completion task will enter a final state. Faulting a block, as with canceling a block, causes buffered messages (unprocessed input messages as well as unoffered output messages) to be lost.
當TPL Dataflow不再處理消息並且能保證不再處理消息的時候,就被定義為 "完成態", IDataflow.Completion屬性(Task對象)標記了該狀態, Task對象的TaskStatus枚舉值描述了此Block進入完成態的真實原因
- TaskStatus.RanToCompletion "成功完成" 在Block中定義的任務
- TaskStatus.Fault 因未處理的異常 導致"過早的完成"
- TaskStatus.Cancled 因取消操作 導致 "過早的完成"
故需要小心處理異常, 一般情況下我們使用try、catch包含所有的執行代碼以確保所有的異常都被處理。
本文作為TPL Dataflow的入門指南,微軟技術棧的同事可持續關注這個基於Actor模型的流水線處理組件,處理單體程序中高並發,低延遲場景相當巴適。
碼甲拙見,如有問題請下方留言大膽斧正;碼字+Visio制圖,均為原創,看官請不吝好評+關注, ~。。~
本文歡迎轉載,請轉載頁面明顯位置注明原作者及原文鏈接。
