手把手教你學Dapr - 7. Actors


目錄

手把手教你學Dapr - 1. .Net開發者的大時代

手把手教你學Dapr - 2. 必須知道的概念

手把手教你學Dapr - 3. 使用Dapr運行第一個.Net程序

手把手教你學Dapr - 4. 服務調用

手把手教你學Dapr - 5. 狀態管理

手把手教你學Dapr - 6. 發布訂閱

手把手教你學Dapr - 7. Actors

手把手教你學Dapr - 8. 綁定

手把手教你學Dapr - 9. 可觀測性

介紹

Actor模式將Actor描述為最低級別的“計算單元”。換句話說,您在一個獨立的單元(稱為actor)中編寫代碼,該單元接收消息並一次處理一個消息,沒有任何並發或線程。

再換句話說,根據ActorId划分獨立計算單元后,相同的ActorId重入要排隊,可以理解為lock(ActorId)

:這里有個反例,就是重入性的引入,這個概念目前還是Preview,它允許同一個鏈內可以重復進入,判斷的標准不止是ActorId這么簡單,即自己調自己是被允許的。這個默認是關閉的,需要手動開啟,即默認不允許自己調自己

當您的代碼處理一條消息時,它可以向其他參與者發送一條或多條消息,或者創建新的參與者。底層運行時管理每個參與者運行的方式、時間和地點,並在參與者之間路由消息。

大量的Actor可以同時執行,Actor彼此獨立執行。

Dapr 包含一個運行時,它專門實現了 Virtual Actor 模式。 通過 Dapr 的實現,您可以根據 Actor 模型編寫 Dapr Actor,而 Dapr 利用底層平台提供的可擴展性和可靠性保證。

什么時候用Actors

Actor 設計模式非常適合許多分布式系統問題和場景,但您首先應該考慮的是該模式的約束。一般來說,如果出現以下情況,請考慮使用Actors模式來為您的問題或場景建模:

  • 您的問題空間涉及大量(數千個或更多)小的、獨立且孤立的狀態和邏輯單元
  • 您希望使用需要與外部組件進行大量交互的單線程對象,包括跨一組Actors查詢狀態。
  • 您的 Actor 實例會通過發出 I/O 操作來阻塞具有不可預測延遲的調用者。

Dapr Actor

每個Actor都被定義為Actor類型的實例,就像對象是類的實例一樣。 例如,可能有一個執行計算器功能的Actor類型,並且可能有許多該類型的Actor分布在集群的各個節點上。每個這樣的Actor都由一個Acotr ID唯一標識。

actor_background_game_example.png

生命周期

Dapr Actors是虛擬的,這意味着他們的生命周期與他們的內存表現無關。因此,它們不需要顯式創建或銷毀。Dapr Actors運行時在第一次收到對該Actor ID 的請求時會自動激活該Actor。如果一個Actor在一段時間內沒有被使用,Dapr Actors運行時就會對內存中的對象進行垃圾回收。如果稍后需要重新激活,它還將保持對參與者存在的了解。如果稍后需要重新激活,它還將保持對 Actor 的一切原有數據。

調用 Actor 方法和提醒會重置空閑時間,例如提醒觸發將使Actor保持活躍。無論Actor是活躍還是不活躍,Actor提醒都會觸發,如果為不活躍的Actor觸發,它將首先激活演員。Actor 計時器不會重置空閑時間,因此計時器觸發不會使 Actor 保持活動狀態。計時器僅在Actor處於活動狀態時觸發。

Reminders 和 Timers 最大的區別就是Reminders會保持Actor的活動狀態,而Timers不會

Dapr 運行時用來查看Actor是否可以被垃圾回收的空閑超時和掃描間隔是可配置的。當 Dapr 運行時調用 Actor 服務以獲取支持的 Actor 類型時,可以傳遞此信息。

由於Virtual Actor模型的存在,這種Virtual Actor生命周期抽象帶來了一些注意事項,事實上,Dapr Actors實現有時會偏離這個模型。

第一次將消息發送到Actor ID時,Actor被自動激活(導致構建Actor對象)。 經過一段時間后,Actor對象將被垃圾回收。被回收后再次使用Actor ID將導致構造一個新的Actor對象。 Actor 的狀態比對象的生命周期長,因為狀態存儲在 Dapr 運行時配置的狀態管理組件中。

:Actor被垃圾回收之前,Actor對象是會復用的。這里會導致一個問題,在.Net Actor類中,構造函數在Actor存活期間只會被調用一次。

分發和故障轉移

為了提供可擴展性和可靠性,Actor 實例分布在整個集群中,Dapr 根據需要自動將它們從故障節點遷移到健康節點。

Actors 分布在 Actor 服務的實例中,而這些實例分布在集群中的節點之間。 對於給定的Actor類型,每個服務實例都包含一組Actor。

Dapr安置服務(Placement Service)

Dapr Actor 運行時為您管理分發方案和密鑰范圍設置。這是由Actor Placement 服務完成的。創建服務的新實例時,相應的 Dapr 運行時會注冊它可以創建的Actor類型,並且安置服務會計算給定Actor類型的所有實例的分區。每個Actor類型的分區信息表被更新並存儲在環境中運行的每個Dapr實例中,並且可以隨着Actor服務的新實例的創建和銷毀而動態變化。這如下圖所示:

actors_background_placement_service_registration.png

當客戶端調用具有特定ID的Actor(例如,Actor ID 123)時,客戶端的 Dapr 實例會Hash Actor類型和 ID,並使用該信息調用可以為特定Actor ID的請求提供服務的相應Dapr實例。因此,始終為任何給定的Actor ID 調用相同的分區(或服務實例)。這如下圖所示:

actors_background_id_hashing_calling.png

這簡化了一些選擇,但也帶來了一些考慮:

  • 默認情況下,Actor 隨機放置到 pod 中,從而實現均勻分布。
  • 因為Actor是隨機放置的,應該可以預料到Actor操作總是需要網絡通信,包括方法調用數據的序列化和反序列化,產生延遲和開銷。

:Dapr Actor 放置服務僅用於 Actor 放置,因此如果您的服務不使用 Dapr Actors,則不需要。 放置服務可以在所有托管環境中運行,包括自托管和 Kubernetes。

Actor通訊

您可以通過HTTP/gRPC調用Actor,當然也可以用SDK。

POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>

並發

Dapr Actor 運行時為訪問 Actor 方法提供了一個簡單的回合制(turn-basesd)的訪問模型。這意味着在任何時候,Actor 對象的代碼中都不能有超過一個線程處於活動狀態。

單個Actor實例一次不能處理多個請求。如果預期要處理並發請求,Actor 實例可能會導致吞吐量瓶頸。

單個Actor實例指每個Actor ID對應的Actor對象。單個Actor不並發就沒有問題

如果兩個 Actor 之間存在循環請求,而同時向其中一個 Actor 發出外部請求,則 Actor 之間可能會陷入僵局。Dapr Actor運行時自動超時Actor調用並向調用者拋出異常以中斷可能的死鎖情況。

actors_background_communication.png

重入性(Preview)

作為對 dapr 中基礎 Actor 的增強。現在重入性為預覽功能,感興趣的小伙伴可以到看官方文檔。

回合制訪問(Turn-based access)

一個回合包括一個Actor方法的完整執行以響應來自其他Actor或客戶端的請求,或者一個計時器/提醒回調的完整執行。即使這些方法和回調是異步的,Dapr Actor運行時也不會將它們交叉。一個回合必須完全完成后,才允許進行新的回合。換句話說,當前正在執行的Actor方法或計時器/提醒回調必須完全完成,才能允許對方法或回調的新調用。

Dapr Actor運行時通過在回合開始時獲取每個Actor的鎖並在回合結束時釋放鎖來實現基於回合的並發性。 因此,基於回合的並發是在每個Actor的基礎上執行的,而不是跨Actor。Actor 方法和計時器/提醒回調可以代表不同的 Actor 同時執行。

以下示例說明了上述概念。考慮實現兩個異步方法(例如 Method1 和 Method2)、計時器和提醒的Actor 類型。下圖顯示了代表屬於此Actor類型的兩個Actors(ActorId1 和 ActorId2)執行這些方法和回調的時間線示例。

actors_background_concurrency.png

Actor狀態管理

Actor可以使用狀態管理功能可靠地保存狀態。您可以通過 HTTP/gRPC 端點與 Dapr 交互以進行狀態管理。

要使用 actor,您的狀態存儲必須支持事務。這意味着您的狀態存儲組件必須實現 TransactionalStore 接口。只有一個狀態存儲組件可以用作所有參與者的狀態存儲。

事務支持列表:https://docs.dapr.io/reference/components-reference/supported-state-stores/

:建議學習的時候都用Redis,官方所有的示例也都是基於Redis,比較容易上手,且Dapr init默認集成

Actor計時器和提醒

Actor可以通過注冊計時器或提醒來安排自己的定期工作。

計時器和提醒的功能非常相似。主要區別在於,Dapr Actor 運行時在停用后不保留有關計時器的任何信息,而使用 Dapr Actor 狀態提供程序保留有關提醒的信息。

定時器和提醒的調度配置是相同的,總結如下:


DueTime 是一個可選參數,用於設置第一次調用回調之前的時間或時間間隔。如果省略 DueTime,則在定時器/提醒注冊后立即調用回調。

支持的格式:

  • RFC3339 日期格式,例如2020-10-02T15:00:00Z
  • time.Duration 格式,例如2h30m
  • ISO 8601 持續時間格式,例如PT2H30M

period 是一個可選參數,用於設置兩次連續回調調用之間的時間間隔。當以 ISO 8601-1 持續時間格式指定時,您還可以配置重復次數以限制回調調用的總數。如果省略 period,則回調將僅被調用一次。

支持的格式:

  • time.Duration 格式,例如2h30m
  • ISO 8601 持續時間格式,例如PT2H30M, R5/PT1M30S

ttl 是一個可選參數,用於設置計時器/提醒到期和刪除的時間或時間間隔。如果省略 ttl,則不應用任何限制。

支持的格式:

  • RFC3339 日期格式,例如2020-10-02T15:00:00Z
  • time.Duration 格式,例如2h30m
  • ISO 8601 持續時間格式,例如PT2H30M

當您同時指定周期內的重復次數和 ttl 時,計時器/提醒將在滿足任一條件時停止。

Actor 運行時配置

  • actorIdleTimeout - 停用空閑 actor 之前的超時時間。每個 actorScanInterval 間隔都會檢查超時。默認值:60 分鍾

  • actorScanInterval - 指定掃描演員以停用空閑Actor的頻率的持續時間。閑置時間超過 actor_idle_timeout 的 Actor 將被停用。默認值:30 秒

  • drainOngoingCallTimeout - 在耗盡Rebalanced的Actor的過程中的持續時間。這指定了當前活動 Actor 方法完成的超時時間。如果當前沒有 Actor 方法調用,則忽略此項。默認值:60 秒

  • drainRebalancedActors - 如果為 true,Dapr 將等待 drainOngoingCallTimeout 持續時間以允許當前角色調用完成,然后再嘗試停用角色。默認值:true

    drainRebalancedActors與上面的drainOngoingCallTimeout需搭配使用

  • reentrancy - (ActorReentrancyConfig) - 配置角色的重入行為。如果未提供,則禁用可重入。默認值:disabled, 0

  • remindersStoragePartitions - 配置Actor提醒的分區數。如果未提供,則所有提醒都將保存為Actor狀態存儲中的單個記錄。默認值:0

// In Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    // Register actor runtime with DI
    services.AddActors(options =>
    {
        // Register actor types and configure actor settings
        options.Actors.RegisterActor<MyActor>();

        // Configure default settings
        options.ActorIdleTimeout = TimeSpan.FromMinutes(60);
        options.ActorScanInterval = TimeSpan.FromSeconds(30);
        options.DrainOngoingCallTimeout = TimeSpan.FromSeconds(60);
        options.DrainRebalancedActors = true;
        options.RemindersStoragePartitions = 7;
        // reentrancy not implemented in the .NET SDK at this time
    });

    // Register additional services for use with actors
    services.AddSingleton<BankService>();
}

分區提醒(Preview)

在 sidecar 重新啟動后,Actor 提醒會保留並繼續觸發。在 Dapr 運行時版本 1.3 之前,提醒被保存在 actor 狀態存儲中的單個記錄上。

此為Preview功能,感興趣可以看官方文檔

.Net調用Dapr的Actor

與以往不同,Actor示例會多創建一個共享類庫用於存放Server和Client共用的部分

創建Assignment.Shared

創建類庫項目,並添加Dapr.ActorsNuGet包引用,最后添加以下幾個類:

AccountBalance.cs

namespace Assignment.Shared;
public class AccountBalance
{
    public string AccountId { get; set; } = default!;

    public decimal Balance { get; set; }
}

IBankActor.cs

:這個是Actor接口,IActor是Dapr SDK提供的

using Dapr.Actors;

namespace Assignment.Shared;
public interface IBankActor : IActor
{
    Task<AccountBalance> GetAccountBalance();

    Task Withdraw(WithdrawRequest withdraw);
}

OverdraftException.cs

namespace Assignment.Shared;
public class OverdraftException : Exception
{
    public OverdraftException(decimal balance, decimal amount)
        : base($"Your current balance is {balance:c} - that's not enough to withdraw {amount:c}.")
    {
    }
}

WithdrawRequest.cs

namespace Assignment.Shared;
public class WithdrawRequest
{
    public decimal Amount { get; set; }
}

創建Assignment.Server

創建類庫項目,並添加Dapr.Actors.AspNetCoreNuGet包引用和Assignment.Shared項目引用,最后修改程序端口為5000。

:Server與Shared和Client的NuGet包不一樣,Server是集成了服務端的一些功能

修改program.cs

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<BankService>();
builder.Services.AddActors(options =>
{
    options.Actors.RegisterActor<DemoActor>();
});

var app = builder.Build();

app.UseRouting();

app.UseEndpoints(endpoints =>
{
    endpoints.MapActorsHandlers();
});

app.Run();

添加BankService.cs

using Assignment.Shared;

namespace Assignment.Server;
public class BankService
{
    // Allow overdraft of up to 50 (of whatever currency).
    private readonly decimal OverdraftThreshold = -50m;

    public decimal Withdraw(decimal balance, decimal amount)
    {
        // Imagine putting some complex auditing logic here in addition to the basics.

        var updated = balance - amount;
        if (updated < OverdraftThreshold)
        {
            throw new OverdraftException(balance, amount);
        }

        return updated;
    }
}

添加BankActor.cs

using Assignment.Shared;
using Dapr.Actors.Runtime;
using System;

namespace Assignment.Server;
public class BankActor : Actor, IBankActor, IRemindable // IRemindable is not required
{
    private readonly BankService bank;

    public BankActor(ActorHost host, BankService bank)
        : base(host)
    {
        // BankService is provided by dependency injection.
        // See Program.cs
        this.bank = bank;
    }

    public async Task<AccountBalance> GetAccountBalance()
    {
        var starting = new AccountBalance()
        {
            AccountId = this.Id.GetId(),
            Balance = 10m, // Start new accounts with 100, we're pretty generous.
        };

        var balance = await StateManager.GetOrAddStateAsync("balance", starting);
        return balance;
    }

    public async Task Withdraw(WithdrawRequest withdraw)
    {
        var starting = new AccountBalance()
        {
            AccountId = this.Id.GetId(),
            Balance = 10m, // Start new accounts with 100, we're pretty generous.
        };

        var balance = await StateManager.GetOrAddStateAsync("balance", starting)!;

        if (balance.Balance <= 0)
        {
            // Simulated reminder deposit
            if (Random.Shared.Next(100) > 90)
            {
                await RegisterReminderAsync("Deposit", null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
            }
        }

        // Throws Overdraft exception if the account doesn't have enough money.
        var updated = this.bank.Withdraw(balance.Balance, withdraw.Amount);

        balance.Balance = updated;
        await StateManager.SetStateAsync("balance", balance);
    }

    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    {
        if (reminderName == "Deposit")
        {
            var balance = await StateManager.GetStateAsync<AccountBalance>("balance")!;

            if (balance.Balance <= 0)
            {
                balance.Balance += 60; // 50(Overdraft Threshold) + 10 = 60
                Console.WriteLine("Deposit: 10");
            }
            else
            {
                Console.WriteLine("Deposit: ignore");
            }
        }
    }
}

運行Assignment.Server

使用Dapr CLI來啟動,先使用命令行工具跳轉到目錄 dapr-study-room\Assignment07\Assignment.Server,然后執行下面命令

dapr run --app-id testactor --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run

創建Assignment.Client

創建控制台項目,並添加Dapr.ActorsNuGet包引用和Assignment.Shared項目引用。

修改Program.cs

using Assignment.Shared;
using Dapr.Actors;
using Dapr.Actors.Client;

Console.WriteLine("Creating a Bank Actor");
var bank = ActorProxy.Create<IBankActor>(ActorId.CreateRandom(), "BankActor");
Parallel.ForEach(Enumerable.Range(1, 10), async i =>
{
    while (true)
    {
        var balance = await bank.GetAccountBalance();
        Console.WriteLine($"[Worker-{i}] Balance for account '{balance.AccountId}' is '{balance.Balance:c}'.");

        Console.WriteLine($"[Worker-{i}] Withdrawing '{1m:c}'...");
        try
        {
            await bank.Withdraw(new WithdrawRequest() { Amount = 1m });
        }
        catch (ActorMethodInvocationException ex)
        {
            Console.WriteLine("[Worker-{i}] Overdraft: " + ex.Message);
        }

        Task.Delay(1000).Wait();
    }
});

Console.ReadKey();

運行Assignment.Client

使用Dapr CLI來啟動,先使用命令行工具跳轉到目錄 dapr-study-room\Assignment07\Assignment.Client,然后執行下面命令

dotnet run

本章源碼

Assignment07

https://github.com/doddgu/dapr-study-room

我們正在行動,新的框架、新的生態

我們的目標是自由的易用的可塑性強的功能豐富的健壯的

所以我們借鑒Building blocks的設計理念,正在做一個新的框架MASA Framework,它有哪些特點呢?

  • 原生支持Dapr,且允許將Dapr替換成傳統通信方式
  • 架構不限,單體應用、SOA、微服務都支持
  • 支持.Net原生框架,降低學習負擔,除特定領域必須引入的概念,堅持不造新輪子
  • 豐富的生態支持,除了框架以外還有組件庫、權限中心、配置中心、故障排查中心、報警中心等一系列產品
  • 核心代碼庫的單元測試覆蓋率90%+
  • 開源、免費、社區驅動
  • 還有什么?我們在等你,一起來討論

經過幾個月的生產項目實踐,已完成POC,目前正在把之前的積累重構到新的開源項目中

目前源碼已開始同步到Github(文檔站點在規划中,會慢慢完善起來):

MASA.BuildingBlocks

MASA.Contrib

MASA.Utils

MASA.EShop

BlazorComponent

MASA.Blazor

QQ群:7424099

微信群:加技術運營微信(MasaStackTechOps),備注來意,邀請進群

masa_stack_tech_ops.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM