Dapr微服務應用開發系列5:發布訂閱構建塊


題記:這篇介紹發布訂閱構建塊,這是對事件驅動架構設計的一種實現落地。

注:對於“Building Blocks”這個詞組的翻譯,我之前使用了“構件塊”,現在和官方文檔(Dapr中文社區的貢獻)保持一致,采用“構建塊”。

原理

發布訂閱的概念來自於事件驅動架構(EDA)的設計思想,這是一種讓程序(應用、服務)之間解耦的主要方式,通過發布訂閱的思想也可以實現服務之間的異步調用。而大部分分布式應用都會依賴這樣的發布訂閱解耦模式。

整個發布訂閱的思想其實是比較簡單的:

如上圖所示,把需要解耦的程序分別設定為事件發布者或者事件訂閱者(理論上,對於某個事件,一個程序僅能作為一種角色;對於不同事件,一個程序可以既作為發布者又可以作為訂閱者)。同時利用消息代理(Message Broker)中間件把兩者對接起來,消息代理即作為事件消息的傳輸通道。

在Dapr中對這種發布訂閱模式進行了高度抽象的實現,並提供了自由替換消息代理中間件的特性,如下圖所示:

Dapr的發布訂閱構建塊也可以被看作一種事件總線(Event Bus)的實現,只是你不需要使用特殊的協議,在發布端和訂閱端僅使用HTTP/gRPC即可。

在事件總線中,把發布訂閱兩者關聯在一起的是事件類型,那么在Dapr中也引入了一個類似的概念——主題(Topic)。如果對消息隊列中間件熟悉的人對於這個概念不會陌生。由此發布端和訂閱端的處理過程和針對Dapr的接口也就是圍繞主題來展開的。

能力

消息發送

既然Dapr的PubSub是一種事件總線,那么要發送消息,必然需要對代表主題(事件類型)的消息進行封裝。Dapr並沒有去創造一種獨有的格式,而是采用了目前業界比較流行的開放協議——雲事件(CloudEvents)規范。這種格式把事件消息封裝為如下JSON數據:

{
    "specversion" : "1.0",
    "type" : "xml.message",
    "source" : "https://example.com/message",
    "subject" : "Test XML Message",
    "id" : "id-1234-5678-9101",
    "time" : "2020-09-23T06:23:21Z",
    "datacontenttype" : "text/xml",
    "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}

當然對消息的封裝不需要應用程序本身去關心,你只需要給Dapr傳遞data的字符串即可,而這個字符串本身是以什么格式(不管xml還是json)去承載內容都是由應用程序確定。具體如何發送消息,下面規范部分會介紹。

消息傳遞

Dapr會自動根據主題把消息發送給所有訂閱者,傳遞過程保證“至少一次”送達。送達的判斷標准是基於訂閱者的響應是否成功(即HTTP狀態碼為20X)。

當然,訂閱者也可以在響應體中設置 status 屬性來給出更為精細的處理指令,比如 RETRY 告知Dapr之前處理失敗了,現在是重試成功了;或者 DROP 告知Dapr應用程序對這個消息處理出現問題,已經記錄了告警日志,但是不打算繼續處理它了。

消息傳遞還有一個重要的特性需要理解,就是消息的生存期(Time-to-Live,TTL)。TTL規定了消息在Dapr(實際上是在消息代理中間件)里面的存活時間,如果TTL過期,那么消息就不會再被傳遞(即變成死信)。所有目前支持的發布訂閱組件都支持TTL的特性,Dapr會幫助你處理這方面的邏輯。

消息消費

為了消費消息,需要對主題進行注冊,可以通過聲明式和編程式來進行注冊。聲明式通過外部的yaml文件定義一個K8S的CRD,來描述服務需要訂閱什么主題,接收事件的HTTP API路由地址。編程式通過暴露特定的HTTP API路由地址或者特定的gRPC方法來讓Dapr運行時進行訪問,從而注冊需要訂閱什么主題和接收事件的地址。

發布訂閱構建塊采用的是所謂競爭者消費模式,即同一個應用(AppId相同)的多個實例,只會有一個實例獲得消息,這些同個應用的多個實例稱之為一個消費組。如果你希望消息被多個應用得到,那么就需要使用多個消費組,也即多個AppId。

主題范圍限制

從上面所知,在發送消息和消費消息的時候,都需要針對某個主題。為了對消息的傳遞進行更加精細的控制,在發布訂閱構建塊中可以對主題范圍進行限制,即某些主題只能由某些應用來發布,某些主題只能由某些應用來訂閱。

要進行范圍限制,需要對發布訂閱組件的配置yaml進行配置,設置 spec.metadata 下面的 publishingScopessubscriptionScopesallowedTopics 配置。(詳細說明見未來的關於組件的文章)。

規范

Dapr給PubSub這一構建塊提供了兩方面的規范:消息生產端和消息消費端。

消息生產端

通過POST如下地址,來發送消息到特定主題:

POST http://localhost:<daprPort>/v1.0/publish/<pubsubname>/<topic>[?<metadata>]

其中pubsubname代表了PubSub組件的名稱;topic代表了目標主題名稱。

Content-Type 請求頭中設置請求體的格式,比如 application/json

請求體根據請求頭里面的設置格式,傳入JSON或者XML,Dapr會自動把請求體封裝為CloudEvent格式。

如果是直接調用gRPC的接口的話,是調用 PublishEvent 接口並傳遞 PublishEventRequest 實體。

消息消費端

如果你的消費端是通過聲明式來向Dapr注冊需要訂閱什么主題的消息,那么在如下格式的yaml文件中進行定義:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: pubsub
scopes:
- app1
- app2

其中 spec.topic 代表了要訂閱的主題名稱,spec.route 代表了接收訂閱消息的HTTP路由地址,spec.route 代表了針對的PubSub組件是那個。尤為關鍵是 scopes 里面設置了這樣的訂閱聲明是針對那個(或那幾個)應用起作用(填入appid)。

如果你的消費端是通過編程式來向Dapr注冊需要訂閱什么主題的消息,那么暴露一個如下特殊HTTP路由地址的接口:

GET http://localhost:<appPort>/dapr/subscribe

並返回如下格式的響應體,讓Dapr知道你的應用需要訂閱什么主題,接收消息的接口路由地址是什么:

[
  {
    "pubsubname": "pubsub",
    "topic": "newOrder",
    "route": "/orders"
  }
]

當然你的應用需要暴露注冊的接收路由接口,並支持POST謂詞,接口收到請求后返回2xx狀態碼告訴Dapr消息處理成功了,或者404告訴Dapr出現錯誤且消息已經丟棄,或者其他狀態碼讓Dapr進行重試。

兩種訂閱注冊方式各有優缺,聲明式方便一個主題注冊多個應用,編程式方便一個應用注冊多個主題。

注意:如果是使用gRPC來注冊和暴露消費接口,那么規范有所不同,做法見下面。

DOTNET SDK

Dapr的.NET SDK同樣針對消息生產端和消費端提供相關的函數庫。

在DaprClient這個客戶端類中提供了 PublishEventAsync 這個方法來用於發送事件消息到特定PubSub的特定主題上 (底層是請求了gRPC的接口)。比如:

using var client = new DaprClientBuilder().Build();

var eventData = new { Id = "17", Amount = 10m, };
await client.PublishEventAsync(pubsubName, "deposit", eventData, cancellationToken);

在消費端,目前針對ASP.NET Core提供了一個特殊的屬性標記 TopicAttribute 來簡化編程式訂閱注冊的過程。比如:

[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)

如果你是使用gRPC來實現消費端,那么目前並沒有一個簡化方式來注冊(未來我會補上這個坑),只能遵循如下規范:

首先用ListTopicSubscriptions注冊:

public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)
{
    var result = new ListTopicSubscriptionsResponse();
    result.Subscriptions.Add(new TopicSubscription
    {
        PubsubName = "pubsub",
        Topic = "deposit"
    });
    result.Subscriptions.Add(new TopicSubscription
    {
        PubsubName = "pubsub",
        Topic = "withdraw"
    });
    return Task.FromResult(result);
}

接着用OnTopicEvent接收:

public override async Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)
{
    if (request.PubsubName == "pubsub")
    {
        var input = JsonSerializer.Deserialize<Models.Transaction>(request.Data.ToStringUtf8(), this.jsonOptions);
        var transaction = new GrpcServiceSample.Generated.Transaction() { Id = input.Id, Amount = (int)input.Amount, };
        if (request.Topic == "deposit")
        {
            await Deposit(transaction, context);
        }
        else
        {
            await Withdraw(transaction, context);
        }
    }

    return await Task.FromResult(default(TopicEventResponse));
}

具體見SDK的examples:https://github.com/heavenwing/dapr-dotnet-sdk/blob/master/examples/AspNetCore/GrpcServiceSample/Services/BankingService.cs

用法與例子

使用SDK來進行事件消息的發布和訂閱,可以直接參考SDK的examples中的消費端例子 ControllerSample 和生產端例子 PublishSubscribe

如果是非SDK的用法,可以參考我的quickstarts,消費端 PubSubConsumer和生產端 PubSubProducer

我的quickstarts的消費端同時使用聲明式和編程式兩種注冊方式。大致代碼如下:

[Route("dapr/subscribe")]
[ApiController]
public class DaprSubscribeController : ControllerBase
{
    public ActionResult<DaprSubscribeOutput[]> Get()
    {
        return Ok(new DaprSubscribeOutput[]
        {
            new DaprSubscribeOutput
            {
                PubSubName="pubsub",
                Topic="quickstarts/wakeup",
                Route="/api/wakeup"
            }
        });
    }
}

public async Task<IActionResult> PostAsync(TinyCloudEvent<MessageInput> model)
{
    _logger.LogInformation(model.Data.Name);
    return Ok();
}

 using var httpClient = new HttpClient();
httpClient.BaseAddress = new Uri(pubsubUrl);

Console.WriteLine($"To {topicName1} ...");
var request1 = new HttpRequestMessage(HttpMethod.Post, topicName1);
request1.Content = new StringContent(JsonSerializer.Serialize(new { name = "Jack" }), Encoding.UTF8, "application/json");
await httpClient.SendAsync(request1);
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: quickstarts-subscription
spec:
  topic: quickstarts/sleep
  route: /api/sleep
  pubsubname: pubsub
scopes:
- quickstarts-pbc


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM