4小時Dapr+.NET 5+K8S 的實戰 https://ke.qq.com/course/4000292?tuin=1271860f
Dapr進階虛擬機集群實戰(非K8S) https://ke.qq.com/course/4002149?tuin=1271860f
什么是發布-訂閱
發布訂閱是一種眾所周知並被廣泛使用的消息傳送模式,常用在微服務架構的服務間通信,高並發削峰等情況。但是不同的消息中間件之間存在細微的差異,項目使用不同的產品需要實現不同的實現類,雖然是明智的決策,但必須編寫和維護抽象及其基礎實現。 此方法需要復雜、重復且容易出錯的自定義代碼。
Dapr為了解決這種問題,提供開箱即用的消息傳送抽象和實現,封裝在 Dapr 構建基塊中。業務系統只需調用跟據Dapr的要求實現訂閱發布即可。
工作原理
Dapr 發布&訂閱構建基塊提供了一個與平台無關的 API 框架來發送和接收消息。
服務將消息發布到指定主題, 業務服務訂閱主題以使用消息。
服務在 Dapr sidecar 上調用 pub/sub API。 然后,sidecar 調用預定義 Dapr pub/sub 組件。
任何編程平台都可以使用 Dapr 本機 API 通過 HTTP 或 gRPC 調用構建基塊。 若要發布消息,請進行以下 API 調用:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
上述調用中有幾個特定於 Dapr 的 URL 段:
<dapr-port>
提供 Dapr sidecar 偵聽的端口號。<pub-sub-name>
提供所選 Dapr pub/sub 組件的名稱。<topic>
提供消息發布到的主題的名稱。
設置發布訂閱組件
Dapr 為 Pub/Sub 提供很多支持的組件,例如 Redis 和 Kafka 等。支持組件詳見 鏈接
在win10上的自承載的Dapr中,默認在 %UserProfile%\.dapr\components\pubsub.yaml 中使用redis作為了pub/sub組件,dapr run一個app時,使用默認組件作為pub/sub組件
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub spec: type: pubsub.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: ""
訂閱主題
Dapr 允許兩種方法訂閱主題:
- 聲明式,其中定義在外部文件中。
- 編程方式,訂閱在用戶代碼中定義
1.聲明式訂閱
在默認組件目錄 %UserProfile%\.dapr\components\pubsub.yaml 中新建subscription.yaml文件,並寫入以下內容
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: myevent-subscription spec: topic: test_topic route: /TestPubSub pubsubname: pubsub scopes: - frontend
上面的示例顯示了 test_topic
主題的事件訂閱,使用組件 pubsub
。
route
告訴 Dapr 將所有主題消息發送到應用程序中的/TestPubSub
端點。scopes
為 FrontEnd啟用訂閱
現在需要在FrontEnd項目中定義接口TestSub,在FrontEnd中新建TestPubSubController
using Dapr.Client; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using System.IO; using System.Text; using System.Threading.Tasks; namespace FrontEnd.Controllers { [Route("[controller]")] [ApiController] public class TestPubSubController : ControllerBase { private readonly ILogger<TestPubSubController> _logger; private readonly DaprClient _daprClient; public TestPubSubController(ILogger<TestPubSubController> logger, DaprClient daprClient) { _logger = logger; _daprClient = daprClient; } [HttpPost] public ActionResult Post() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); return Ok(content); } [HttpGet("pub")] public async Task<ActionResult> PubAsync() { var data = new WeatherForecast(); await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data); return Ok(); } } }
需要在Startup的Configure中開啟重復讀取Body才能讀取到數據
app.Use((context, next) => { context.Request.EnableBuffering(); return next(); });
啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll
調用 pub API發布消息
查看訂閱情況,訂閱消息消費成功
2.編程式訂閱
為了防止聲明式訂閱的影響,需要先把目錄<%UserProfile%\.dapr\components\pubsub.yaml>中subscription.yaml文件刪除
TestPubSubController新增Api Sub
[Topic("pubsub", "test_topic")] [HttpPost("sub")] public async Task<ActionResult> Sub() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); _logger.LogInformation("testsub" + content); return Ok(content); }
在Startup的Configure方法中新增中間件
public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { // ... app.UseCloudEvents(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); // ... }); }
啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll
調用API發布消息
查看訂閱情況,訂閱消息消費成功
通過DapreCLI同樣可以發布消息
dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'
查看訂閱情況,訂閱消息消費成功