一 aspnetcore之task的任務狀態-CancellationToken
我有一篇文章講解了asp.net的線程方面的知識。我們知道.net的針對於多線程的一個亮點就是Task,net clr維護了一個線程池,自動的分派給task執行,執行完成,迅速返回線程池,並且維護異常和狀態,針對於基礎的thread和其他兩種異步編程,Task非常的靈巧,但是針對和應用生命周期關聯的異步任務,還是使用Workbackgroup比較合適,或者甚至是基礎的thread,因為Task比較高級的線程類,操作也比較簡化,人為控制比較弱。那這一節為什么要說線程尼?大家有沒有遇到過,部署或者人為的去重啟,往往會造成一些不必要的業務中斷,web api有這樣的情況,后台程序也有這樣的情況。異常和系統硬件的故障已經讓我們防不勝防了,那么就盡量的人為的情況少那么一點點,系統的健壯性也就高那么一點點。
目前有兩個技巧可以處理這一類事情,第一是讓主機graceful方式關閉,並且超時時間設置長一點,這樣就有足夠的時間,讓運行的請求執行完畢,看代碼:
public static async Task Main(string[] args) { var host = new HostBuilder() .Build(); await host.RunAsync(); }
這是官方上的一段話:IHostedService 是執行代碼的入口點。 每個 IHostedService
實現都按照 ConfigureServices 中服務注冊的順序執行。 主機啟動時,每個 IHostedService
上都會調用 StartAsync,主機正常關閉時,以反向注冊順序調用 StopAsync。
//關閉超時值 ShutdownTimeout 設置 StopAsync 的超時值。 默認值為 5 秒。 Program.Main 中的以下選項配置將默認值為 5 秒的關閉超時值增加至 20 秒: C# //復制 var host = new HostBuilder() .ConfigureServices((hostContext, services) => { services.Configure<HostOptions>(option => { option.ShutdownTimeout = System.TimeSpan.FromSeconds(20); }); }) .Build();
而我們看看源碼中StopAsync方法:
/// <summary> /// Attempts to gracefully stop the host with the given timeout. /// </summary> /// <param name="host"></param> /// <param name="timeout">The timeout for stopping gracefully. Once expired the /// server may terminate any remaining active connections.</param> /// <returns></returns> public static Task StopAsync(this IHost host, TimeSpan timeout) { return host.StopAsync(new CancellationTokenSource(timeout).Token); }
系統接受到Ctrl+c和sign,就會調用這個方法,以比較禮貌的方式關閉。
那么看源碼,這兩個都是具有阻塞功能的異步方法,對應的非異步方法,都是同步調用的這兩個方法:
/// <summary> /// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered. /// </summary> /// <param name="host">The <see cref="IHost"/> to run.</param> /// <param name="token">The token to trigger shutdown.</param> public static async Task RunAsync(this IHost host, CancellationToken token = default) { using (host) { await host.StartAsync(token); await host.WaitForShutdownAsync(token); } } /// <summary> /// Returns a Task that completes when shutdown is triggered via the given token. /// </summary> /// <param name="host">The running <see cref="IHost"/>.</param> /// <param name="token">The token to trigger shutdown.</param> public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default) { var applicationLifetime = host.Services.GetService<IApplicationLifetime>(); //當前token執行取消的時候,激發這個委托。 token.Register(state => { ((IApplicationLifetime)state).StopApplication(); //當進程取消的時候,通知注冊IApplicationLifetime的進程也取消。 }, applicationLifetime); var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); //應用程序生命周期中的停止應用token激發的時候,執行這個委托,去釋放阻塞,執行host的停止方法。 applicationLifetime.ApplicationStopping.Register(obj => { var tcs = (TaskCompletionSource<object>)obj; tcs.TrySetResult(null); }, waitForStop); await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);執行完畢。
// Host will use its default ShutdownTimeout if none is specified.
await host.StopAsync(); //調用關閉 }
具體原理就是Host使用這個applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource這個類,使用這個類是可以控制task的取消執行的。
所以,兩個解決方案,如果是webapi,就將將超時時間設置大一點。
第二,如果在非webapi中,使用了超長執行的Task,就使用CancellationTokenSource吧,將它的Token傳進去,在外邊判斷是否執行中,如果不在執行中,就執行Cancel方法,當然在task內部,也可以
判斷token,是否自己主動取消掉。
這是官方的一個例子,了解CancellationTokenSource這個類,那么就會明白如何去處理Task
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class Example { public static void Main() { // Define the cancellation token. CancellationTokenSource source = new CancellationTokenSource(); CancellationToken token = source.Token; Random rnd = new Random(); Object lockObj = new Object(); List<Task<int[]>> tasks = new List<Task<int[]>>(); TaskFactory factory = new TaskFactory(token); for (int taskCtr = 0; taskCtr <= 10; taskCtr++) { int iteration = taskCtr + 1; tasks.Add(factory.StartNew( () => { int value; int[] values = new int[10]; for (int ctr = 1; ctr <= 10; ctr++) { lock (lockObj) { value = rnd.Next(0,101); } if (value == 0) { source.Cancel(); Console.WriteLine("Cancelling at task {0}", iteration); break; } values[ctr-1] = value; } return values; }, token)); } try { Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(), (results) => { Console.WriteLine("Calculating overall mean..."); long sum = 0; int n = 0; foreach (var t in results) { foreach (var r in t.Result) { sum += r; n++; } } return sum/(double) n; } , token); Console.WriteLine("The mean is {0}.", fTask.Result); } catch (AggregateException ae) { foreach (Exception e in ae.InnerExceptions) { if (e is TaskCanceledException) Console.WriteLine("Unable to compute mean: {0}", ((TaskCanceledException) e).Message); else Console.WriteLine("Exception: " + e.GetType().Name); } } finally { source.Dispose(); } } } // Repeated execution of the example produces output like the following: // Cancelling at task 5 // Unable to compute mean: A task was canceled. // // Cancelling at task 10 // Unable to compute mean: A task was canceled. // // Calculating overall mean... // The mean is 5.29545454545455. // // Cancelling at task 4 // Unable to compute mean: A task was canceled. // // Cancelling at task 5 // Unable to compute mean: A task was canceled. // // Cancelling at task 6 // Unable to compute mean: A task was canceled. // // Calculating overall mean... // The mean is 4.97363636363636. // // Cancelling at task 4 // Unable to compute mean: A task was canceled. // // Cancelling at task 5 // Unable to compute mean: A task was canceled. // // Cancelling at task 4 // Unable to compute mean: A task was canceled. // // Calculating overall mean... // The mean is 4.86545454545455.
二 業務的事務一致性
因為微服務的理念中是犧牲了系統業務的一致性,我們知道事務的一致性都是靠的數據庫的本地事務,或者分布式事務來實現的,但是微服務是嚴禁使用分布式事務。那么如何保證整個系統的事務完整性尼?舉個例子:比如訂單服務中,新接受一個訂單,這個訂單需要同步到庫房的訂單子系統,那么在訂單服務中的這個訂單在最后更新自己訂單狀態的時候,是需要同時發送異步消息給庫房消息服務器的,如果這時候網絡斷了,本地訂單更新成功了,但是異步消息沒有發送過去,這樣就會引起業務的缺失,目前有兩個方法可以實現:
第一:為本地數據庫創建事件源表,記錄下消息和本地數據更新的全部狀態,比如訂單在更新前就可以添加事件,事件狀態可以有,准備更新訂單,訂單已更新,發送消息隊列,消息發送成功等。
這樣的好處就是最后跟蹤這個事務處理的時候,每個步驟都可以找到,而且完全不用事務。最后job去跟蹤失敗情況,然后根據情況處理。
第二:只是用本地事務,就是在訂單更新的時候,同時給事件源表添加消息內容,然后讓后台job去發送消息,這樣是最簡單和最穩定的方式。
當然,最合適的還是第一種方法,雖然代碼能復雜點,但是最后的效果是一樣的,而且效率是比第二種方法更高效,但是考慮打事件源表並不是並發頻繁操作的表,所以這個看自己的喜好了。
針對一個系統,業務的一致性,也並不是全部,針對於一些關鍵業務做好一致性,但是很多其實可以設計成為在用戶ui層面去補償操作,唯一的壞處就是一部分數據需要重新填寫。
三 事件源
這個事件源並不是為了解決業務的一致性,而是為了應對大數據量的請求,比如,客戶管理,一個分類下有上萬條記錄需要處理,那么往往我們需要對性能和實時反饋上有個折衷。
系統設計如下:
這樣看來,會增加1個api服務和一個后台服務,但是對於系統的問題,卻得到了一個緩沖,或許這個設計不是最好的,但是卻可以做一個拋磚引玉的案例,現實中案例非常多變,所以設計也會有很多方案。
因為目前我們看到的大部分app,請求的時候,某些功能確實會有少許等待事件,這個都是為了折衷,當然這一篇內容並不是討論雲或者分布式計算,但是在后台這塊處理越快,反饋也越快。
這套方案的設計理念其實就是異步處理,可以有自己的優化空間,而並不會消耗api這個輕量級服務,后台分布式計算越快,app反應也越快,到一定程度,就並不會感覺到有延遲,這就是大師比喻的耳朵與眼睛的關系。