ASP.NET Core 6框架揭秘實例演示[03]:Dapr初體驗


也許你們可能沒有接觸過Dapr,但是一定對它“有所耳聞”,感覺今年它一下子就火了,一時間很多人都在談論它。我們從其命名(Dapr的全稱是“分布式應用運行時Distributed Application Runtime”)可以看出Dapr的定位,它並不是分布式應用的開發框架,它提供的是更底層的“運行時”。我們可以使用不同的編程語言,采用不同的開發框架在這個由Dapr提供的運行時上面構建分布式應用。我們接下來就來感受一下Dapr在.NET上面的開發體驗,關於Dapr的基本信息以及環境的安裝,請參閱官方文檔。(本篇提供的實例已經匯總到《ASP.NET Core 6框架揭秘-實例演示版》)

[117]Dapr-服務調用 (源代碼
[118]Dapr-狀態管理(源代碼
[119]Dapr-發布訂閱(源代碼
[120]Dapr-Actor模型(源代碼

[117]Dapr-服務調用

Dapr是一個采用Service Mesh設計的分布式微服務運行時。每一個部署在Dapr上的應用實例(獨立進程或者容器)都具有這一個專屬的Sidecar,具體體現為一個獨立的進程(daprd)或者容器。應用實例只會與它專屬的Sidecar進行通信,跨應用通信是在兩個應用實例的Sidecar之間進行的,具體的傳輸協議可以采用HTTP或者gRPC。正是因為應用實例和Sidecar是在各自的進程內獨立運行的,所以Dapr才對應用開發采用的技術棧沒有任何限制。我們接下來就通過一個簡單的例子來演示Dapr下的服務調用。我們創建了如圖1所示的解決方案。App1和App2代表兩個具有依賴關系的應用,App1會調用App2提供的服務。Shared是一個類庫項目,提供被App1和App2共享的數據類型。

image
圖1 Dapr應用解決方案

我們依然沿用上面演示的數學運算應用場景,並在Shared項目中定義了如下兩個數據類型。表示輸入的Input類型提供了兩個操作數(X和Y),表示輸出的Output類型除了通過其Result屬性表示運算結果外,還利用Timestamp屬性返回運算時間戳。

  1 public class Input
  2 {
  3     public int X { get; set; }
  4     public int Y { get; set; }
  5 }
  6 
  7 public class Output
  8 {
  9     public int 		Result { get; set; }
 10     public DateTimeOffset 	Timestamp { get; set; } = DateTimeOffset.Now;
 11 }
 12 

App2就是一個簡單的ASP.NET CORE應用,我們采用路由的方式注冊了執行數學運算的終結點。如下面的代碼片段所示,注冊的終結點采用的路徑模板為“/{method}”,路由參數“{method}”既表示運算操作類型,同時也作為Dapr服務的方法名。在作為終結點處理器的Calculate方法中,請求的主體內容被提取出來,經過反序列化后綁定為input參數。在根據提供的輸入執行對應的運算並生成Output對象后,將其序列化成JSON,並以此作為響應的內容。

  1 using Microsoft.AspNetCore.Mvc;
  2 using Shared;
  3 
  4 var app = WebApplication.Create(args);
  5 app.MapPost("{method}", Calculate);
  6 app.Run("http://localhost:9999");
  7 
  8 static IResult Calculate(string method, [FromBody] Input input)
  9 {
 10     var result = method.ToLower() switch
 11     {
 12         "add" => input.X + input.Y,
 13         "sub" => input.X - input.Y,
 14         "mul" => input.X * input.Y,
 15         "div" => input.X / input.Y,
 16         _ => throw new InvalidOperationException($"Invalid method {method}")
 17     };
 18     return Results.Json(new Output { Result = result });
 19 }

在調用WebApplication對象的Run方法啟動應用的時候,我們顯式指定了監聽地址,其目的是為了將端口(9999)固定下來。App2目前實際上與Dapr一點關系都沒有,我們必須以Dapr的方式啟動它才能將它部署到本機的Dapr環境中,具體來說我們可以執行“dapr run --app-id app2 --app-port 9999 -- dotnet run”這個命令來啟動Sidecar的同時以子進程的方式啟動應用。提供的命令行參數除了提供應用的啟動方式(dotnet run)之外,還提供了針對應用的表示(--app-id app2)和監聽的端口(--app-port 9999)。考慮到每次在控制台輸入這些繁瑣的命令有點麻煩,我們選擇在launchSettings.json文件中定義如下這個Profile來以Dapr的方式啟動應用。由於這種啟動方式會將輸出目錄作為當前工作目錄,我們選擇指定程序集的方式來啟動應用(dotnet App2.dll)。

  1 {
  2   "profiles": {
  3     "Dapr": {
  4       "commandName": "Executable",
  5       "executablePath": "dapr",
  6       "commandLineArgs": "run --app-id app2 --app-port 9999 -- dotnet App2.dll"
  7     }
  8   }
  9 }
 10 

App1是一個簡單的控制台應用,為了能夠采用上述這種方式來啟動它,我們還是將SDK從“Microsoft.NET.Sdk”改成“Microsoft.NET.Sdk.Web”。我們在launchSettings.json文件中定義了如下這個類似的Profile,應用的標識被設置成“app1”。由於App1僅僅涉及到對其他應用的調用,自身並不提供服務,所以我們不需要設置端口號。

  1 {
  2   "profiles": {
  3     "Dapr": {
  4       "commandName": "Executable",
  5       "executablePath": "dapr",
  6       "commandLineArgs": "run --app-id app1 -- dotnet App1.dll"
  7     }
  8   }
  9 }
 10 

由於App1涉及到針對Dapr服務的調用,需要使用到Dapr客戶端SDK提供的API,所以我們為它添加了“Dapr.Client”這個NuGet包的引用。具體的服務調用體現在下面的程序中,如代碼片段所示,我們調用DaprClient的靜態方法CreateInvokeHttpClient針對目標服務或者應用的標識“app2”創建了一個HttpClient對象,並利用該它完成了四個服務方法的調用。具體的服務調用實現在InvokeAsync這個本地方法中,在將作為輸入的Input對象序列化成JSON文本之后,該方法會將其作為請求的主體內容。在一個分布式環境下,我們不需要知道目標服務所在的位置,因為這是不確定的,確定的唯有目標服務/應用的標識,所以我們直接將此標識作為請求的目標地址。在得到調用結果之后,我們對它進行了簡單的格式化后直接輸出到控制台上。

  1 using Dapr.Client;
  2 using Shared;
  3 
  4 HttpClient client = DaprClient.CreateInvokeHttpClient(appId: "app2");
  5 var input = new Input(2, 1);
  6 
  7 await InvokeAsync("add", "+");
  8 await InvokeAsync("sub", "-");
  9 await InvokeAsync("mul", "*");
 10 await InvokeAsync("div", "/");
 11 
 12 async Task InvokeAsync(string method, string @operator)
 13 {
 14     var response = await client.PostAsync(method, JsonContent.Create(input));
 15     var output = await response.Content.ReadFromJsonAsync<Output>();
 16     Console.WriteLine( $"{input.X} {@operator} {input.Y} = {output.Result} ({output.Timestamp})");
 17 }
 18 

我們先后啟動App2和App1后,兩個應用所在的控制台上會產生如圖2所示的輸出。應用輸出的文本會采用“== App ==”作為前綴,其余內容為Sidecar輸出的日志。從App2所在控制台(前面)上輸出可以看出,它成功地完成了基於四種運算的服務調用。當筆者以Debug模式啟動App1的時候有時會“閃退”的現象,如果你也出現這樣的情況,可以選擇非Debug模式(在解決方案窗口中右鍵選擇該項目,選擇Debug => Start Without Debuging)啟動它。

image

圖2 基於Dapr的服務調用

[118]Dapr-狀態管理

我們可以借助Dapr提供的狀態管理組件創建“有狀態”的服務。這里的狀態並不是存儲在應用實例的進程中供其獨享,而是存儲在獨立的存儲中(比如Redis)共所有應用實例共享,所以並不是影響水平伸縮的能力。對於上面演示的實例,假設計算服務提供的是四個耗時的操作,那么我們就可以將計算結果緩存起來避免重復計算,我們現在就來實現這樣的功能。為了能夠使用到針對狀態管理的API,我們為App2添加針對“Dapr.AspNetCore”這個NuGet包的引用。我們將緩存相關的三個操作定義在如下這個IResultCache接口中。如代碼片段所示,該接口定義了三個方法,GetAsync方法根據指定的操作/方法名稱和輸入提取緩存的計算結果,SaveAsync方法負責將計算結果根據對應的方法名成和輸入緩存起來,ClearAsync方法則將指定方法的所有緩存結果全部清除掉。

  1 public interface IResultCache
  2 {
  3     Task<Output> GetAsync(string method, Input input);
  4     Task SaveAsync(string method, Input input, Output output);
  5     Task ClearAsync(params string[] methods);
  6 }
  7 

如下所示的IResultCache接口的實現類型ResultCache的定義。我們在構造函數中注入了DaprClient對象,並利用它來完成狀態管理的相關操作。計算結果緩存項的Key由方法名稱和輸入參數以 “Result_{method}_{X}_{Y}”這樣的格式生成,具體的格式化體現在_keyGenerator字段返回的委托上。由於涉及到對緩存計算結果的清除,我們不得不將所有計算結果緩存項的Key也一並緩存起來,該緩存項采用的Key為“ResultKeys”。

  1 public class ResultCache : IResultCache
  2 {
  3     private readonly DaprClient 			_daprClient;
  4     private readonly string 			_keyOfKeys = "ResultKeys";
  5     private readonly string 			_storeName = "statestore";
  6     private readonly Func<string, Input, string> 	_keyGenerator;
  7 
  8     public ResultCache(DaprClient daprClient)
  9     {
 10         _daprClient = daprClient;
 11         _keyGenerator = (method, input) => $"Result_{method}_{input.X}_{input.Y}";
 12 }
 13 
 14     public Task<Output> GetAsync(string method, Input input)
 15     {
 16         var key = _keyGenerator(method, input);
 17         return _daprClient.GetStateAsync<Output>(storeName: _storeName, key: key);
 18     }
 19 
 20     public async Task SaveAsync(string method, Input input, Output output)
 21     {
 22         var key = _keyGenerator(method, input);
 23         var keys = await _daprClient.GetStateAsync<HashSet<string>>(storeName: _storeName, key: _keyOfKeys) ?? new HashSet<string>();
 24         keys.Add(key);
 25 
 26         var operations = new StateTransactionRequest[2];
 27         var value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(output));
 28         operations[0] = new StateTransactionRequest(key: key, value: value,  operationType: StateOperationType.Upsert);
 29 
 30         value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(keys));
 31         operations[1] = new StateTransactionRequest(key: _keyOfKeys, value: value, operationType: StateOperationType.Upsert);
 32         await _daprClient.ExecuteStateTransactionAsync(storeName: _storeName,  operations: operations);
 33     }
 34 
 35     public async Task ClearAsync(params string[] methods)
 36     {
 37         var keys = await _daprClient.GetStateAsync<HashSet<string>>(storeName: _storeName, key: _keyOfKeys);
 38         if (keys != null)
 39         {
 40             var selectedKeys = keys .Where(it => methods.Any(m => it.StartsWith($"Result_{m}"))).ToArray();
 41             if (selectedKeys.Length > 0)
 42             {
 43                 var operations = selectedKeys .Select(it => new StateTransactionRequest(key: it, value: null, operationType: StateOperationType.Delete)) .ToList();
 44                 operations.ForEach(it => keys.Remove(it.Key));
 45                 var value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(keys));
 46                 operations.Add(new StateTransactionRequest(key: _keyOfKeys, value: value, operationType: StateOperationType.Upsert));
 47                 await _daprClient.ExecuteStateTransactionAsync(storeName: _storeName, operations: operations);
 48             }
 49         }
 50     }
 51 }

在實現的GetAsync方法中,我們根據指定的方法名稱和輸入生成對應緩存項的Key,並調用DaprClient對象的GetStateAsync<TValue>提取對應緩存項的值。我們將Key作為該方法的第二個參數,第一個參數代表狀態存儲(State Store)組件的名稱。Dapr在初始化過程中會默認為我們設置一個針對Redis的狀態存儲組件,並將其命名為“statestore”,ResultCache使用正式這個狀態存儲組件。

對單一狀態值進行設置只需要調用DaprClient對象的SaveStateAsync<TValue>方法就可以了,但是我們實現的SaveAsync方法除了需要緩存計算結果外,還需要修正“ResultKeys”這個緩存的值。為了確保兩者的一致性,兩個緩存項的更新最好在同一個事務中進行,為此我們調用了DaprClient對象的ExecuteStateTransactionAsync方法。我們為此創建了兩個StateTransactionRequest對象來描述對這兩個緩存項的更新,具體來說需要設置緩存項的Key、Value和操作類型(Upsert)。這里設置的值必須是最原始的二進制數組,由於狀態管理組件默認采用JSON的序列化方式和UTF-8編碼,所以我們按照這樣的規則生成了作為緩存值的二進制數組。另一個實現的ClearAsync方法采用類似的方式刪除指定方法的計算結果緩存,並修正了“ResultKeys”緩存項的值。

接下來我們需要對計算服務的處理方法Calculate作必要的修改。如下面的代碼片段所示,我們直接在該方法中注入了IResultCache對象。如果能夠利用該對象提取出了緩存的計算結果,我們會直接將它返回給客戶端。只有在對應計算結果尚未緩存的情況下,我們才會真正實施計算。在返回計算結果之前,我們會對計算結果實施緩存。該方法中注入IResultCache和DaprClient對象對應的服務在WebApplication被構建之前進行了注冊。

  1 using App2;
  2 using Microsoft.AspNetCore.Mvc;
  3 using Shared;
  4 var builder = WebApplication.CreateBuilder(args);
  5 builder.Services
  6     .AddSingleton<IResultCache, ResultCache>()
  7     .AddDaprClient();
  8 var app = builder.Build();
  9 app.MapPost("/{method}", Calculate);
 10 app.Run("http://localhost:9999");
 11 
 12 static async Task<IResult> Calculate(string method, [FromBody] Input input, IResultCache resultCache)
 13 {
 14     var output = await resultCache.GetAsync(method, input);
 15     if (output == null)
 16     {
 17         var result = method.ToLower() switch
 18         {
 19             "add" => input.X + input.Y,
 20             "sub" => input.X - input.Y,
 21             "mul" => input.X * input.Y,
 22             "div" => input.X / input.Y,
 23             _ => throw new InvalidOperationException($"Invalid operation {method}")
 24         };
 25         output = new Output { Result = result };
 26         await resultCache.SaveAsync(method, input, output);
 27     }
 28     return Results.Json(output);
 29 }
 30 

由於兩輪服務調用使用相同的輸入。如果服務端對計算結果進行了緩存,那么針對同一個方法的調用就應該具有相同的時間戳,如圖3所示的輸出結果證實了這一點。

image
圖3 利用狀態管理緩存計算結果

[119]Dapr-發布訂閱

Dapr提供了“開箱即用”的發布訂閱(“Pub/Sub”)模塊,我們可以將其作為消息隊列來用。上面演示的實例利用狀態管理組件緩存了計算結果,現在我們采用發布訂閱的方法將指定方法的計算結果緩存清除掉。具體來說,我們在App2中訂閱“刪除緩存”的主題(Topic),當接收到發布的對應主題的消息時,我們從消息中提待刪除的方法列表,並將對應的計算結果緩存清除掉。至於“刪除緩存”的主題的發布,我們將它交給App1來完成。我們為此對App2再次做出修改。如下面的代碼片段所示,我們針對路徑“clear”注冊了一個作為“刪除緩存”主題的訂閱終結點,它對應的處理方法為ClearAsync。我們通過標注在該方法上的TopicAttribute來對訂閱的主題作相應設置。該特性構造函數的第一個參數為采用的發布訂閱組件名稱,我們采用的是初始化Dapr是設置的基於Redis的發布訂閱組件,該組件命名為“pubsub”。第二個參數表示訂閱主題的名稱,我們將其設置為“clearresult”。

  1 using App2;
  2 using Dapr;
  3 using Microsoft.AspNetCore.Mvc;
  4 using Shared;
  5 var builder = WebApplication.CreateBuilder(args);
  6 builder.Services
  7     .AddSingleton<IResultCache, ResultCache>()
  8     .AddDaprClient();
  9 var app = builder.Build();
 10 
 11 app.UseCloudEvents();
 12 app.MapPost("clear", ClearAsync);
 13 app.MapSubscribeHandler();
 14 
 15 app.MapPost("/{method}", Calculate);
 16 app.Run("http://localhost:9999");
 17 
 18 [Topic(pubsubName:"pubsub", name:"clearresult")]
 19 static Task ClearAsync(IResultCache cache, [FromBody] string[] methods) => cache.ClearAsync(methods);
 21 
 22 static async Task<IResult> Calculate(string method, [FromBody]Input input,    IResultCache resultCache) ...

ClearAsync方法定義了兩個參數,第一個參數會默認綁定為注冊的IResultCache服務,第二個參數表示待刪除的方法列表,上面標注的FromBodyAttribute特性將指導路由系統通過提取請求主體內容來綁定對應參數值。但是Dapr的發布訂閱組件默認采用Cloud Events消息格式,如果請求的主體為具有如此結構的消息,按照默認的綁定規則,針對input參數的綁定將會失敗。為此我們調用WebApplication對象的UseCloudEvents擴展方法額外注冊了一個CloudEventsMiddleware中間件,該中間件會提取出請求數據部分的內容,並使用它將整個請求主體部分的內容替換掉,那么針對methods參數的綁定就能成功了。我們還調用WebApplication對象的MapSubscribeHandler擴展方法注冊了一個額外的終結點。在應用啟動的時候,Sidecar會利用這個終結點收集當前應用提供的所有訂閱處理器的元數據信息,其中包括發布訂閱組件和主題名稱,以及調用的路由或路徑(對於本例就是“clear”)。當Sidecar接受到發布消息后,會根據這組元數據選擇匹配的訂閱處理器,並利用其提供的路徑完成對它的調用。

我們針對發布者的角色對App1做了相應的修改。如下面的代碼片段所示,我們利用創建的DaprClientBuilder構建了一個DaprClient對象。在兩輪針對計算服務的調用之間,我們調用了DaprClient的PublishEventAsync方法發布了一個名為“clearresult”的消息。從提供的第三個參數可以看出,我們僅僅清除“加法”和“減法”這兩個方法的計算結果緩存。圖4所示的是App1運行之后在控制台上的輸出。對於兩輪間隔為5秒的服務調用,加法和減法的結果由於緩存被清除,所以它們具有不同的時間戳,但乘法和除法的計算時間依舊是相同的。

image

圖4 利用發布訂閱組件刪除結果緩存

[120]Dapr-Actor模型

如果分布式系統待解決的功能可以分解成若干很小且獨立狀態邏輯單元,我們可以考慮使用Actor模型(Model)進行設計。具體來說,我們將上述這些狀態邏輯單元定義成單個的Actor,並在它們之間采用消息驅動的通信方法完成整個工作流程。每個Actor只需要考慮對接收的消息進行處理,並將后續的操作轉換成消息分發給另一個Actor就可以了。由於每個Actor以單線程模式執行,我們無需考慮多線程並發和同步的問題。由於Actor之間的交互是完全無阻塞的,一般能夠提高系統整體的吞吐量。

接下來我們依然通過對上面演示實例的修改來演示Dapr的Actor模型在.NET下的應用。這次我們將一個具有狀態的累加計數器設計成Actor。我們在Shared項目中為這個Actor定義了一個接口,如下面的代碼片段所示,這個名為IAccumulator的接口派生於IActor,由於后者來源於“Dapr.Actors”這個NuGet包,所以我們需要添加對應的包引用。IAccumulator接口定義了兩個方法,IncreaseAsync方法根據指定的數值進行累加並返回當前的值, ResetAsync方法將累加數值重置歸零。

public interface IAccumulator: IActor
{
    Task<int> IncreaseAsync(int count);
    Task ResetAsync();
}

我們將IAccumulator接口的實現類型Accumulator定義在App2中。如下面的代碼片段所示,除了實現對應的接口,Accumulator類型還繼承了Actor這個基類。由於每個Actor提供當前累加的值,所以它們是有狀態的。但是不能利用Accumulator實例的屬性來維持這個狀態,我們使用從基類繼承下來的StateManager屬性返回的IActorStateManager對象來管理當前Actor的狀態。具體來說,我們調用TryGetStateAsync方法提取當前Actor針對指定名稱(“__counter”)的狀態值,新的狀態值通過調用它的SetStateAsync方法進行設置。由於IActorStateManager對象的SetStateAsync方法對狀態所作的更新都是本地操作,我們最終還需要調用Actor對象自身的SaveStateAsync方法提交所有的狀態更新。Actor的狀態依舊是通過Dapr的狀態管理組件進行存儲的。

public class Accumulator : Actor, IAccumulator
{
    private readonly string _stateName = "__counter";
    public Accumulator(ActorHost host) : base(host)
    {
    }
    public async Task<int> IncreaseAsync(int count)
    {
        var counter = 0;
        var existing = await StateManager.TryGetStateAsync<int>(stateName: _stateName);
        if(existing.HasValue)
        {
            counter = existing.Value;
        }
        counter+= count;
        await StateManager.SetStateAsync(stateName: _stateName, value:counter);
        await SaveStateAsync();
        return counter;
    }
    public async Task ResetAsync()
    {
        await StateManager.TryRemoveStateAsync(stateName: _stateName);
        await SaveStateAsync();
    }
}

承載Actor相關的API由“Dapr.Actors.AspNetCore”這個NuGet包提供,所以我們需要添加該包的引用。Actor的承載方式與MVC框架類似,它們都是建立在路由系統上,MVC框架將所有Controller類型轉換成注冊的終結點,而Actor的終結點由WebApplication的MapActorsHandlers擴展方法進行注冊。在注冊中間件之前,我們還需要調用IServiceCollection接口的AddActors擴展方法將注冊的Actor類型添加到ActorRuntimeOptions配置選項上。

using App2;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddActors(options => options.Actors.RegisterActor<Accumulator>());
var app = builder.Build();
app.MapActorsHandlers();
app.Run("http://localhost:9999");

我們在App1中編寫了如下的程序來演示針對Actor的調用。如代碼片段所示,我們調用ActorProxy的靜態方法Create<TActor>創建了兩個IAccumulator對象。創建Actor對象(其實是調用Actor的代理)時需要指定唯一標識Actor的ID(“001”和“002”)和對應的類型(“Accumulator”)。

using Dapr.Actors;
using Dapr.Actors.Client;
using Shared;

var accumulator1 = ActorProxy.Create<IAccumulator>(new ActorId("001"), "Accumulator");
var accumulator2 = ActorProxy.Create<IAccumulator>(new ActorId("002"), "Accumulator");

while (true)
{
    var counter1 = await accumulator1.IncreaseAsync(1);
    var counter2 = await accumulator2.IncreaseAsync(2);
    await Task.Delay(5000);
    Console.WriteLine($"001: {counter1}");
    Console.WriteLine($"002: {counter2}\n");

    if (counter1 > 10)
    {
        await accumulator1.ResetAsync();
    }
    if (counter2 > 20)
    {
        await accumulator2.ResetAsync();
    }
}

Actor對象創建出來后,我們在一個循環中采用不同的步長(1和2)調用它們的IncreaseAsync實施累加操作。在計數器數值達到上限(10和20)時,我們調用它們的ResetAsync方法重置計數器。在先后啟動App2和App1之后,App1所在控制台上將會以如圖5所示的形式輸出兩個累加計數器提供的計數。

image

圖5 Actor模式實現的累加計數器


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM