Dapr-Actor構建塊


前言:

 前篇-綁定 文章對Dapr的綁定構建塊進行了解,本篇繼續對 Actor 構建塊進行了解學習。

一、Actor簡介:

 Actors 為最低級別的“計算單元”。 換句話說,您將代碼寫入獨立單元 ( 稱為actor) ,該單元接收消息並一次處理消息,而不進行任何類型的並行或線程處理。

 當代碼處理一條消息時,它可以向其他參與者發送一條或多條消息,或者創建新的 Actors。 底層 運行時 將管理每個 actor 的運行方式,時機和位置,並在 Actors 之間傳遞消息。

 大量 Actors 可以同時執行,而 Actors 可以相互獨立執行。

 Dapr 包含專門實現 virtual actors 模式 的運行時。 通過 Dapr 的實現,您可以根據 Actors 模型編寫 Dapr Actor,而 Dapr 利用底層平台提供的可擴展性和可靠性保證。

 應用場景:

  • 您的問題空間涉及大量(數千或更多) 的獨立和孤立的小單位和邏輯。
  • 您想要處理單線程對象,這些對象不需要外部組件的大量交互,例如在一組 Actors 之間查詢狀態。
  • 您的 actor 實例不會通過發出I/O操作來阻塞調用方。

 生命周期:

  Dapr Actors 是虛擬的,意思是他們的生命周期與他們的 in - memory 表現不相關。 因此,它們不需要顯式創建或銷毀。 Dapr Actors 運行時在第一次接收到該 actor ID 的請求時自動激活 actor。 如果 actor 在一段時間內未被使用,那么 Dapr Actors 運行時將回收內存對象。 如果以后需要重新啟動,它還將保持對 actor 的一切原有數據。

  調用 actor 方法和 reminders 將重置空閑時間,例如,reminders 觸發將使 actor 保持活動狀態。 不論 actor 是否處於活動狀態或不活動狀態 Actor reminders 都會觸發,對不活動 actor ,那么會首先激活 actor。 Actor timers 不會重置空閑時間,因此 timer 觸發不會使參與者保持活動狀態。 Timer 僅在 actor 活躍時被觸發。

  空閑超時和掃描時間間隔 Dapr 運行時用於查看是否可以對 actor 進行垃圾收集。 當 Dapr 運行時調用 actor 服務以獲取受支持的 actor 類型時,可以傳遞此信息。

  Virtual actors 生命周期抽象會將一些警告作為 virtual actors 模型的結果,而事實上, Dapr Actors 實施有時會偏離此模型。

  在第一次將消息發送到其 actor 標識時,將自動激活 actor ( 導致構造 actor 對象) 。 在一段時間后,actor 對象將被垃圾回收。 以后,再次使用 actor ID 訪問,將構造新的 actor。 Actor 的狀態比對象的生命周期更久,因為狀態存儲在 Dapr 運行時的配置狀態提供程序中(也就是說Actor即使不在活躍狀態,仍然可以讀取它的狀態)。

二、工作原理:

 Dapr Sidecar 提供了用於調用執行組件的 HTTP/gRPC API。 這是 HTTP API 的URL格式: 

http://localhost:<daprPort>/v1.0/actors/<actorType>/<actorId>/
  • <daprPort>: Dapr 偵聽的 HTTP 端口。
  • <actorType>:執行組件類型。
  • <actorId>:要調用的特定參與者的 ID。

 a)Actor組件放置服務流程:

  1. 啟動時,Sidecar 調用執行組件服務以獲取注冊的執行組件類型和執行組件的配置設置。
  2. Sidecar 將注冊的執行組件類型的列表發送到放置服務。
  3. 放置服務會將更新的分區信息廣播到所有執行組件服務實例。 每個實例都將保留分區信息的緩存副本,並使用它來調用執行組件。

 b)調用Actor組件方法流程:

  

  1. 服務在Sidecar 上調用執行組件 API。 請求正文中的 JSON 有效負載包含要發送到執行組件的數據。
  2. Sidecar 使用位置服務中的本地緩存的分區信息來確定哪個執行組件服務實例 (分區) 負責托管 ID 為的執行組件 3 。 在此示例中,它是 pod 2 中的服務實例。 調用將轉發到相應的Sidecar 。
  3. Pod 2 中的Sidecar 實例調用服務實例以調用執行組件。 服務實例激活actor(如果它還沒有激活)並執行actor 方法。

三、Actor timers(定時器) 和 reminders(提醒)

 可以使用計時器和提醒來計划自身的調用。 這兩個概念都支持配置截止時間。 不同之處在於回調注冊的生存期:

  • 只要激活執行組件,計時器就會保持活動狀態。 計時器 不會 重置空閑計時器,因此它們不能使執行組件處於活動狀態。
  • 提醒長於執行組件激活。 如果停用了某個執行組件,則會重新激活該執行組件。 提醒  重置空閑計時器。

 1、Timers定時器:

  Dapr Actor 運行時確保回調方法被順序調用,而非並發調用。 這意味着,在此回調完成執行之前,不會有其他Actor方法或timer/remider回調被執行。

  Timer的下一個周期在回調完成執行后開始計算。 這意味着 timer 在回調執行時停止,並在回調完成時啟動。

  Dapr Actor 運行時在回調完成時保存對actor的狀態所作的更改。 如果在保存狀態時發生錯誤,那么將取消激活該actor對象,並且將激活新實例。

  當actor作為垃圾回收(GC)的一部分被停用時,所有 timer 都會停止。 在此之后,將不會再調用 timer 的回調。 此外, Dapr Actors 運行時不會保留有關在失活之前運行的 timer 的任何信息。 也就是說,重新啟動 actor 后將會激活的 timer 完全取決於注冊時登記的 timer。

  a) 創建定時器:

POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>

  Timer 的 duetime 和回調函數可以在請求主體中指定。 到期時間(due time)表示注冊后 timer 將首次觸發的時間。 period 表示timer在此之后觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。

  以下請求體配置了一個 timer, dueTime 9秒, period 3秒。 這意味着它將在9秒后首次觸發,然后每3秒觸發一次。

{
  "dueTime":"0h0m9s0ms",
  "period":"0h0m3s0ms"
}

  b) 刪除定時器:

DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>

 2、Reminders 提醒:

  Reminders 是一種在指定時間內觸發 persistent 回調的機制。 它們的功能類似於 timer。 但與 timer 不同,在所有情況下 reminders 都會觸發,直到 actor 顯式取消注冊 reminders 或刪除 actor 。 具體而言, reminders 會在所有 actor 失活和故障時也會觸發觸發,因為Dapr Actors 運行時會將 reminders 信息持久化到 Dapr Actors 狀態提供者中。

  a) 創建Reminders   

POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

  Reminders 的 duetime 和回調函數可以在請求主體中指定。 到期時間(due time)表示注冊后 reminders將首次觸發的時間。 period 表示在此之后 reminders 將觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。 若要注冊僅觸發一次的 reminders ,請將 period 設置為空字符串。

  以下請求體配置了一個 reminders, dueTime 9秒, period 3秒。 這意味着它將在9秒后首次觸發,然后每3秒觸發一次。

{
  "dueTime":"0h0m9s0ms",
  "period":"0h0m3s0ms"
}

  b) 獲取Reminders 

GET http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

  c) 刪除Reminders 

DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

四、數據持久化:

  使用 Dapr 狀態管理構建塊保存執行組件狀態。由於執行組件可以一輪執行多個狀態操作,因此狀態存儲組件必須支持多項事務

  當前狀態管理組件支持事務/Actors支持情況:

Name CRUD 事務 ETag Actors 狀態 組件版本 自從
Aerospike Alpha v1 1.0
Apache Cassandra Alpha v1 1.0
Cloudstate Alpha v1 1.0
Couchbase Alpha v1 1.0
Hashicorp Consul Alpha v1 1.0
Hazelcast Alpha v1 1.0
Memcached Alpha v1 1.0
MongoDB GA v1 1.0
MySQL Alpha v1 1.0
PostgreSQL Alpha v1 1.0
Redis GA v1 1.0
RethinkDB Alpha v1 1.0
Zookeeper Alpha v1 1.0

 需要支持Actor狀態存儲需添加以下內容:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
 - name: actorStateStore value: "true"  

五、.NET Core 實例:

  1、添加nuget包引用:Dapr.ActorsDapr.Actors.AspNetCore。

  2、定義IOrderStatusActor接口,需要繼承自IActor

public interface IOrderStatusActor : IActor
{
    Task<string> Paid(string orderId);
    Task<string> GetStatus(string orderId);
}

  執行組件方法的返回類型必須為 Task 或 Task<T> 。 此外,執行組件方法最多只能有一個參數。 返回類型和參數都必須可 System.Text.Json 序列化。

 3、定義OrderStatusActor實現IOrderStatusActor,並繼承自Actor

public class OrderStatusActor : Actor, IOrderStatusActor
{
    public OrderStatusActor(ActorHost host) : base(host)
    {
    }
    public async Task<string> Paid(string orderId)
    {
        // change order status to paid
        await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");
        return orderId;
    }
    public async Task<string> GetStatus(string orderId)
    {
        return await StateManager.GetStateAsync<string>(orderId);
    }
}

 4、修改Statup.cs文件

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    //注冊Actor
    services.AddActors(option =>
    {
        option.Actors.RegisterActor<OrderStatusActor>();
    });
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }
    app.UseRouting();
    app.UseAuthorization();
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
        //
        endpoints.MapActorsHandlers();
    });
}

 5、添加ActorController操作Actor

[Route("api/[controller]")]
[ApiController]
public class ActorController : ControllerBase
{
    private readonly IActorProxyFactory _actorProxyFactory;

    public ActorController(IActorProxyFactory actorProxyFactory)
    {
        _actorProxyFactory = actorProxyFactory;
    }

    /// <summary>
    /// 方式一:ActorProxy.Create方式
    /// </summary>
    /// <param name="orderId"></param>
    /// <returns></returns>
    [HttpGet("paid/{orderId}")]
    public async Task<ActionResult> PaidAsync(string orderId)
    {
        var actorId = new ActorId("myid-" + orderId);
        var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");
        var result = await proxy.Paid(orderId);
        return Ok(result);
    }

    /// <summary>
    /// 方式二:依賴注入方式
    /// </summary>
    /// <param name="orderId"></param>
    /// <returns></returns>
    [HttpGet("get/{orderId}")]
    public async Task<ActionResult> GetAsync(string orderId)
    {
        var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(
            new ActorId("myid-" + orderId), "OrderStatusActor");

        return Ok(await proxy.GetStatus(orderId));
    }
}

 6、Timer應用:使用Actor基類的 RegisterTimerAsync 方法注冊計時器:在OrderStatusActor類中新增方法

#region Timer操作

/// <summary>
/// 啟動Timer定時器
/// </summary>
/// <param name="name">定時器名稱</param>
/// <param name="text">定時器參數</param>
/// <returns></returns>
public Task StartTimerAsync(string name, string text)
{
    //注冊立即執行的間隔3s執行的定時器
    return RegisterTimerAsync(
        name,
        nameof(TimerCallbackAsync),
        Encoding.UTF8.GetBytes(text),
        TimeSpan.Zero,
        TimeSpan.FromSeconds(3));
}

/// <summary>
/// 定時器回調
/// </summary>
/// <param name="state"></param>
/// <returns></returns>
public Task TimerCallbackAsync(byte[] state)
{
    var text = Encoding.UTF8.GetString(state);

    Console.WriteLine($"Timer fired: {text}");

    return Task.CompletedTask;

}

/// <summary>
/// 停止定時器
/// </summary>
/// <param name="name"></param>
/// <returns></returns>
public Task StopTimerAsync(string name)
{
    //停止計時器 UnregisterTimerAsync
    return UnregisterTimerAsync(name);
}

#endregion

 7、Reminder操作:使用Actor基類的 RegisterReminderAsync 方法計划計時器。在OrderStatusActor類中新增方法

#region Reminder 操作

public Task SetReminderAsync(string text)
{
    return RegisterReminderAsync(
        "test-reminder",
        Encoding.UTF8.GetBytes(text),
        TimeSpan.Zero,
        TimeSpan.FromSeconds(1));
}

/// <summary>
/// Reminder觸發處理(實現IRemindable接口處理觸發)
/// </summary>
/// <param name="reminderName"></param>
/// <param name="state"></param>
/// <param name="dueTime"></param>
/// <param name="period"></param>
/// <returns></returns>
public Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
{
    if (reminderName == "test-reminder")
    {
        var text = Encoding.UTF8.GetString(state);
        Console.WriteLine($"reminder fired: {text}");
    }
    return Task.CompletedTask;
}
#endregion

 8、啟動定時器:

public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable
{
    public OrderStatusActor(ActorHost host) : base(host)
    {
        //注冊Timer
        StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();

        //設置Reminder
        SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();
    }
    //其他處理邏輯    
}

 

總結:

 Dapr 執行組件構建基塊可以更輕松地編寫正確的並發系統。 執行組件是狀態和邏輯的小單元。 它們使用基於輪次的訪問模型,無需使用鎖定機制編寫線程安全代碼。 執行組件是隱式創建的,在未執行任何操作時以無提示方式從內存中卸載。 重新激活執行組件時,自動持久保存並加載執行組件中存儲的任何狀態。 執行組件模型實現通常是為特定語言或平台創建的。 但是,借助 Dapr 執行組件構建基塊,可以從任何語言或平台利用執行組件模型。

 Actor組件支持計時器和提醒來計划將來的工作。 計時器不會重置空閑計時器,並且允許執行組件在未執行其他操作時停用。 提醒會重置空閑計時器,並且也會自動保留。 計時器和提醒都遵守基於輪次的訪問模型,確保在處理計時器/提醒事件時無法執行任何其他操作。

 使用 Dapr 狀態管理構建基塊 持久保存執行組件狀態。 支持多項事務的任何狀態存儲都可用於存儲執行組件狀態。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM