前言
前篇文章對Dapr的狀態管理進行了解,本篇繼續對 訂閱/發布 構建塊進行了解。
一、定義:
發布訂閱的概念來自於事件驅動架構(EDA)的設計思想,這是一種讓程序(應用、服務)之間解耦的主要方式,通過發布訂閱的思想也可以實現服務之間的異步調用。而大部分分布式應用都會依賴這樣的發布訂閱解耦模式。
步驟:
- 發布服務器將消息發送到消息代理。
- 訂閱服務器將綁定到消息代理上的訂閱。
- 消息代理將消息的副本轉發給感興趣的訂閱。
- 訂閱服務器從其訂閱使用消息。
但是不同的消息中間件之間存在細微的差異,項目使用不同的產品需要實現不同的實現類,雖然是明智的決策,但必須編寫和維護抽象及其基礎實現。此方法需要復雜、重復且容易出錯的自定義代碼。
Dapr為了解決這種問題,提供開箱即用的消息傳送抽象和實現,封裝在 Dapr 構建基塊中。業務系統只需調用跟據Dapr的要求實現訂閱發布即可。
二、工作原理:
Dapr 發布&訂閱構建基塊提供了平台無關的 API 框架來發送和接收消息。你的服務將消息發布到一個命名主題(topic)。服務訂閱主題(topic)來使用消息。
服務在 Dapr Sidecar上調用 pub/sub API。 然后,Sidecar將調用一個預定義的 Dapr pub/sub 組件來封裝特定的消息代理產品。 下圖 顯示了 Dapr 發布/訂閱 消息傳遞堆棧。
三、功能:
-
發布/訂閱API
Dapr 發布&訂閱構建基塊提供了一個與平台無關的 API 框架來發送和接收消息。
服務將消息發布到指定主題, 業務服務訂閱主題以使用消息。
服務在 Dapr sidecar 上調用 pub/sub API。然后,sidecar 調用預定義 Dapr pub/sub 組件。
任何編程平台都可以使用 Dapr 本機 API 通過 HTTP 或 gRPC 調用構建基塊。若要發布消息,請進行以下 API 調用:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
上述調用中有幾個特定於 Dapr 的 URL 段:
-
-
<dapr-port>
提供 Dapr sidecar 偵聽的端口號。 -
<pub-sub-name>
提供所選 Dapr pub/sub 組件的名稱。 -
<topic>
提供消息發布到的主題的名稱。
-
-
消息格式
要啟用消息路由並為每個消息提供附加上下文,Dapr 使用 CloudEvents 1.0 規范 作為其消息格式。 使用 Dapr 應用程序發送的任何信息都將自動包入 Cloud Events 信封中,datacontenttype
屬性使用 Content-Type
頭部值。
Dapr 實現以下 Cloud Events 字段:
-
id
source
specversion
type
datacontenttype
(可選)
下面的示例顯示了 CloudEvent v1.0 中序列化為 JSON 的 XML 內容:
{ "specversion" : "1.0", "type" : "xml.message", "source" : "https://example.com/message", "subject" : "Test XML Message", "id" : "id-1234-5678-9101", "time" : "2020-09-23T06:23:21Z", "datacontenttype" : "text/xml", "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>" }
-
訂閱消息
Dapr 應用程序可以訂閱已發布的 topics。 Dapr 允許您的應用程序有兩種方法來訂閱 topics:
聲明式:其中定義在外部文件中:
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: myevent-subscription spec: topic: test_topic //主題 route: /TestPubSub //路由 pubsubname: pubsub //名稱 scopes: - frontend //為該應用啟用訂閱
上面的示例顯示了 test_topic
主題的事件訂閱,使用組件 pubsub
。
-
-
route
告訴 Dapr 將所有主題消息發送到應用程序中的/TestPubSub
端點。 -
scopes
為 frontend 應用啟用訂閱
-
編程方式:訂閱在用戶代碼中定義
-
消息傳遞
Dapr 保證消息傳遞 at-least-once 語義。 這意味着,當應用程序使用發布/訂閱 API 將消息發布到主題時,Dapr 可確保此消息至少傳遞給每個訂閱者一次(at least once)
-
消費者群體和競爭行消費者模式
多個消費組、多個應用程序實例使用一個消費組,這些都將由 Dapr 自動處理。 當同一個應用程序的多個實例(相同的 ID) 訂閱主題時,Dapr 只將每個消息傳遞給該應用程序的一個實例。
同樣,如果兩個不同的應用程序 (不同的 ID) 訂閱同一主題,那么 Dapr 將每個消息僅傳遞到每個應用程序的一個實例。
-
Topic作用域:
默認情況下,支持Dapr發布/訂閱組件的所有主題 (例如,Kafka、Redis、RabbitMQ) 都可用於配置該組件的每個應用程序。 為了限制哪個應用程序可以發布或訂閱 topic,Dapr 提供了 topic 作用域限定。 這使您能夠讓應用程序允許發布哪些主題以及應用程序允許訂閱哪些主題。
pub/sub 主題作用域限定
為每個 pub/sub 組件定義發布/訂閱范圍。 您可能有一個名為 pubsub
的 pub/sub 組件,它有一組范圍設置,另一個 pubsub2
另有一組范圍設置。
要使用這個主題范圍,可以設置一個 pub/sub 組件的三個元數據屬性:
-
spec.metadata.publishingScopes
- 分號分隔應用程序列表& 逗號分隔的主題列表允許該 app 發布信息到主題列表
- 如果在
publishingScopes
(缺省行為) 中未指定任何內容,那么所有應用程序可以發布到所有主題 - 要拒絕應用程序發布信息到任何主題,請將主題列表留空 (
app1=;app2=topic2
) - 例如,
app1=topic1;app2=topic2,topic3;app3=
允許 app1 發布信息至 topic1 ,app2 允許發布信息到 topic2 和 topic3 ,app3 不允許發布信息到任何主題。
spec.metadata.subscriptionScopes
- 分號分隔應用程序列表& 逗號分隔的主題列表允許該 app 訂閱主題列表
- 如果在
subscriptionScopes
(缺省行為) 中未指定任何內容,那么所有應用程序都可以訂閱所有主題 - 例如,
app1=topic1;app2=topic2,topic3
允許 app1 訂閱 topic1 ,app2 可以訂閱 topic2 和 topic3
spec.metadata.allowedTopics
- 一個逗號分隔的允許主題列表,對所有應用程序。
- 如果未設置
allowedTopics
(缺省行為) ,那么所有主題都有效。subscriptionScopes
和publishingScopes
如果存在則仍然生效。 publishingScopes
或subscriptionScopes
可用於與allowedTopics
的 conjuction ,以添加限制粒度
-
消息生存時間:
Dapr 可以在每個消息的基礎上設置超時。 表示如果消息未從 Pub/Sub 組件讀取,則消息將被丟棄。 這是為了防止未讀消息的積累。 在隊列中超過配置的 TTL 的消息就可以說它掛了。
四、.NET Core 應用
1、設置Pub/Sub組件:
本機默認下安裝了Redis Staram,在 Windows 上打開%UserProfile%\.dapr\components\pubsub.yaml
組件文件以驗證:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
2、實現發布/訂閱功能 :
添加控制器(PubSubController)
[Route("api/[controller]")] [ApiController] public class PubSubController : ControllerBase { private readonly ILogger<PubSubController> _logger; private readonly DaprClient _daprClient; public PubSubController(ILogger<PubSubController> logger, DaprClient daprClient) { _logger = logger; _daprClient = daprClient; } /// <summary> /// 發布消息 /// </summary> /// <returns></returns> [HttpGet("pub")] public async Task<ActionResult> PubAsync() { var data = new WeatherForecast() { Summary = "city", Date = DateTime.Now }; await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data); return Ok(); } /// <summary> /// 消費消息 /// </summary> /// <returns></returns> [Topic("pubsub", "test_topic")] [HttpPost("sub")] public async Task<ActionResult> Sub() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; await stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); _logger.LogInformation("testsub" + content); return Ok(content); } }
Startup.cs中調整:
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { //注入Dapr services.AddControllers().AddDapr(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { // 使用CoudEvent app.UseCloudEvents(); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.Use((context, next) => { context.Request.EnableBuffering(); return next(); }); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers() //訂閱處理 endpoints.MapSubscribeHandler();; }); } }
3、dapr運行程序:
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd.dll
4、調用發布命令:
http://127.0.0.1:3501/v1.0/invoke/frontend/method/api/pubsub/pub
5、通過Dapr cli 發布消息:
dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'
總結
pub/sub 模式可幫助你分離分布式應用程序中的服務。 Dapr 發布&訂閱構建基塊簡化了在應用程序中實現此行為。
通過 Dapr pub/sub,可以將消息發布到特定 主題。構建基塊還將查詢服務,以確定 (訂閱) 主題。
可以通過 HTTP 或特定於語言的 SDK 之一(例如用於 Dapr 的 .NET SDK)本機使用 Dapr pub/sub。 .NET SDK 與 ASP.NET 平台緊密集成。
使用 Dapr,可以將受支持的消息代理產品插入應用程序。 然后,無需更改應用程序的代碼,即可交換消息代理。