asp.net core microservices 架構之Task 事務一致性 事件源 詳解


一 aspnetcore之task的任務狀態-CancellationToken                   

     我有一篇文章講解了asp.net的線程方面的知識。我們知道.net的針對於多線程的一個亮點就是Task,net clr維護了一個線程池,自動的分派給task執行,執行完成,迅速返回線程池,並且維護異常和狀態,針對於基礎的thread和其他兩種異步編程,Task非常的靈巧,但是針對和應用生命周期關聯的異步任務,還是使用Workbackgroup比較合適,或者甚至是基礎的thread,因為Task比較高級的線程類,操作也比較簡化,人為控制比較弱。那這一節為什么要說線程尼?大家有沒有遇到過,部署或者人為的去重啟,往往會造成一些不必要的業務中斷,web api有這樣的情況,后台程序也有這樣的情況。異常和系統硬件的故障已經讓我們防不勝防了,那么就盡量的人為的情況少那么一點點,系統的健壯性也就高那么一點點。

   目前有兩個技巧可以處理這一類事情,第一是讓主機graceful方式關閉,並且超時時間設置長一點,這樣就有足夠的時間,讓運行的請求執行完畢,看代碼:

    

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .Build();

    await host.RunAsync();
}

這是官方上的一段話:IHostedService 是執行代碼的入口點。 每個 IHostedService 實現都按照 ConfigureServices 中服務注冊的順序執行。 主機啟動時,每個 IHostedService 上都會調用 StartAsync,主機正常關閉時,以反向注冊順序調用 StopAsync

//關閉超時值

ShutdownTimeout 設置 StopAsync 的超時值。 默認值為 5 秒。
Program.Main 中的以下選項配置將默認值為 5 秒的關閉超時值增加至 20 秒:
C#

//復制
var host = new HostBuilder()
    .ConfigureServices((hostContext, services) =>
    {
        services.Configure<HostOptions>(option =>
        {
            option.ShutdownTimeout = System.TimeSpan.FromSeconds(20);
        });
    })
    .Build();

而我們看看源碼中StopAsync方法:

/// <summary>
        /// Attempts to gracefully stop the host with the given timeout.
        /// </summary>
        /// <param name="host"></param>
        /// <param name="timeout">The timeout for stopping gracefully. Once expired the
        /// server may terminate any remaining active connections.</param>
        /// <returns></returns>
        public static Task StopAsync(this IHost host, TimeSpan timeout)
        {
            return host.StopAsync(new CancellationTokenSource(timeout).Token);
        }

系統接受到Ctrl+c和sign,就會調用這個方法,以比較禮貌的方式關閉。

那么看源碼,這兩個都是具有阻塞功能的異步方法,對應的非異步方法,都是同步調用的這兩個方法:

/// <summary>
        /// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered.
        /// </summary>
        /// <param name="host">The <see cref="IHost"/> to run.</param>
        /// <param name="token">The token to trigger shutdown.</param>
        public static async Task RunAsync(this IHost host, CancellationToken token = default)
        {
            using (host)
            {
                await host.StartAsync(token);

                await host.WaitForShutdownAsync(token);
            }
        }

        /// <summary>
        /// Returns a Task that completes when shutdown is triggered via the given token.
        /// </summary>
        /// <param name="host">The running <see cref="IHost"/>.</param>
        /// <param name="token">The token to trigger shutdown.</param>
        public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default)
        {
            var applicationLifetime = host.Services.GetService<IApplicationLifetime>();
        //當前token執行取消的時候,激發這個委托。
            token.Register(state =>
            {
                ((IApplicationLifetime)state).StopApplication(); //當進程取消的時候,通知注冊IApplicationLifetime的進程也取消。
            },
            applicationLifetime);

            var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
            //應用程序生命周期中的停止應用token激發的時候,執行這個委托,去釋放阻塞,執行host的停止方法。
            applicationLifetime.ApplicationStopping.Register(obj =>
            {
                var tcs = (TaskCompletionSource<object>)obj;
                tcs.TrySetResult(null);
            }, waitForStop);

            await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);執行完畢。
// Host will use its default ShutdownTimeout if none is specified.

await host.StopAsync(); //調用關閉 }

具體原理就是Host使用這個applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource這個類,使用這個類是可以控制task的取消執行的。

所以,兩個解決方案,如果是webapi,就將將超時時間設置大一點。

第二,如果在非webapi中,使用了超長執行的Task,就使用CancellationTokenSource吧,將它的Token傳進去,在外邊判斷是否執行中,如果不在執行中,就執行Cancel方法,當然在task內部,也可以

判斷token,是否自己主動取消掉。

這是官方的一個例子,了解CancellationTokenSource這個類,那么就會明白如何去處理Task

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Example
{
   public static void Main()
   {
      // Define the cancellation token.
      CancellationTokenSource source = new CancellationTokenSource();
      CancellationToken token = source.Token;

      Random rnd = new Random();
      Object lockObj = new Object();
      
      List<Task<int[]>> tasks = new List<Task<int[]>>();
      TaskFactory factory = new TaskFactory(token);
      for (int taskCtr = 0; taskCtr <= 10; taskCtr++) {
         int iteration = taskCtr + 1;
         tasks.Add(factory.StartNew( () => {
                                       int value;
                                       int[] values = new int[10];
                                       for (int ctr = 1; ctr <= 10; ctr++) {
                                          lock (lockObj) {
                                             value = rnd.Next(0,101);
                                          }
                                          if (value == 0) { 
                                             source.Cancel();
                                             Console.WriteLine("Cancelling at task {0}", iteration);
                                             break;
                                          }   
                                          values[ctr-1] = value; 
                                       }
                                       return values;
                                    }, token));   
         
      }
      try {
         Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(), 
                                                      (results) => {
                                                         Console.WriteLine("Calculating overall mean...");
                                                         long sum = 0;
                                                         int n = 0; 
                                                         foreach (var t in results) {
                                                            foreach (var r in t.Result) {
                                                                  sum += r;
                                                                  n++;
                                                               }
                                                         }
                                                         return sum/(double) n;
                                                      } , token);
         Console.WriteLine("The mean is {0}.", fTask.Result);
      }   
      catch (AggregateException ae) {
         foreach (Exception e in ae.InnerExceptions) {
            if (e is TaskCanceledException)
               Console.WriteLine("Unable to compute mean: {0}", 
                                 ((TaskCanceledException) e).Message);
            else
               Console.WriteLine("Exception: " + e.GetType().Name);
         }
      }
      finally {
         source.Dispose();
      }
   }
}
// Repeated execution of the example produces output like the following:
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 10
//       Unable to compute mean: A task was canceled.
//       
//       Calculating overall mean...
//       The mean is 5.29545454545455.
//       
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 6
//       Unable to compute mean: A task was canceled.
//       
//       Calculating overall mean...
//       The mean is 4.97363636363636.
//       
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//       
//       Calculating overall mean...
//       The mean is 4.86545454545455.

 

二   業務的事務一致性                                                                

       因為微服務的理念中是犧牲了系統業務的一致性,我們知道事務的一致性都是靠的數據庫的本地事務,或者分布式事務來實現的,但是微服務是嚴禁使用分布式事務。那么如何保證整個系統的事務完整性尼?舉個例子:比如訂單服務中,新接受一個訂單,這個訂單需要同步到庫房的訂單子系統,那么在訂單服務中的這個訂單在最后更新自己訂單狀態的時候,是需要同時發送異步消息給庫房消息服務器的,如果這時候網絡斷了,本地訂單更新成功了,但是異步消息沒有發送過去,這樣就會引起業務的缺失,目前有兩個方法可以實現:

      第一:為本地數據庫創建事件源表,記錄下消息和本地數據更新的全部狀態,比如訂單在更新前就可以添加事件,事件狀態可以有,准備更新訂單,訂單已更新,發送消息隊列,消息發送成功等。

這樣的好處就是最后跟蹤這個事務處理的時候,每個步驟都可以找到,而且完全不用事務。最后job去跟蹤失敗情況,然后根據情況處理。

      第二:只是用本地事務,就是在訂單更新的時候,同時給事件源表添加消息內容,然后讓后台job去發送消息,這樣是最簡單和最穩定的方式。

      當然,最合適的還是第一種方法,雖然代碼能復雜點,但是最后的效果是一樣的,而且效率是比第二種方法更高效,但是考慮打事件源表並不是並發頻繁操作的表,所以這個看自己的喜好了。

針對一個系統,業務的一致性,也並不是全部,針對於一些關鍵業務做好一致性,但是很多其實可以設計成為在用戶ui層面去補償操作,唯一的壞處就是一部分數據需要重新填寫。

三     事件源                                                                                                           

     這個事件源並不是為了解決業務的一致性,而是為了應對大數據量的請求,比如,客戶管理,一個分類下有上萬條記錄需要處理,那么往往我們需要對性能和實時反饋上有個折衷。

     系統設計如下:

                                   

  這樣看來,會增加1個api服務和一個后台服務,但是對於系統的問題,卻得到了一個緩沖,或許這個設計不是最好的,但是卻可以做一個拋磚引玉的案例,現實中案例非常多變,所以設計也會有很多方案。

  因為目前我們看到的大部分app,請求的時候,某些功能確實會有少許等待事件,這個都是為了折衷,當然這一篇內容並不是討論雲或者分布式計算,但是在后台這塊處理越快,反饋也越快。

  這套方案的設計理念其實就是異步處理,可以有自己的優化空間,而並不會消耗api這個輕量級服務,后台分布式計算越快,app反應也越快,到一定程度,就並不會感覺到有延遲,這就是大師比喻的耳朵與眼睛的關系。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM