ASP.NET Core 中使用 dapr:pub/sub 發送與訂閱消息


我們決定從這周開始在實際開發中使用 dapr,先在 pub/sub 場景使用。這篇博文記錄一下在 kubernetes 集群中基於 ASP.NET Core 使用 dapr 發送/訂閱消息的試驗過程。

Dapr 環境准備

在 k8s 集群上部署好 dapr

# dapr status -k                
NAME                   NAMESPACE    HEALTHY  STATUS   REPLICAS  VERSION  AGE  CREATED              
dapr-placement-server  dapr-system  True     Running  3         1.5.0    7d   2021-11-13 11:22.53  
dapr-sentry            dapr-system  True     Running  3         1.5.0    7d   2021-11-13 10:51.45  
dapr-dashboard         dapr-system  True     Running  1         0.9.0    7d   2021-11-13 10:50.39  
dapr-operator          dapr-system  True     Running  3         1.5.0    7d   2021-11-13 10:51.10  
dapr-sidecar-injector  dapr-system  True     Running  3         1.5.0    7d   2021-11-13 10:50.40 

部署 pub/sub component ,這里用 redis

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: production
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: redis-master.production.svc.cluster.local:6379
    - name: redisPassword
      secretKeyRef:
        name: redis
        key: redis-password

給發布與訂閱消息的應用添加 dapr 注解(k8s deployment),添加之后 dapr 會自動向 pod 中注入 sidecar。

spec:
  template:
    metadata:
      annotations:
        dapr.io/app-id: ing-web
        dapr.io/enabled: "true"
        dapr.io/app-port: "80"

注:dapr.io/app-port: "80" 一定不能少,默認端口不是80,開始沒有加這個,造成訂閱的消息總是收不到。

應用A發送消息

安裝 dapr .net sdk 的 nuget 包 Dapr.AspNetCore

dotnet add package

在 Startup 的 ConfigureServices 中添加 AddDaprClient

services.AddDaprClient();

注:這個項目只發消息,不訂閱消息,所以不需要 AddDapr 與 Configure 的 MapSubscribeHandler

在構造函數中注入 DaprClient

public IngService(DaprClient daprClient)
{
}

用 PublishEventAsync 方法發消息到消息隊列

await _daprClient.PublishEventAsync("pubsub", "newIng", ing);

在 redis 中檢查消息是否發送成功

kubectl exec -it StatefulSet/redis-master -- redis-cli
127.0.0.1:6379> KEYS *
1) "newIng"
127.0.0.1:6379> TYPE newIng
stream

確認發送成功,消息是以 stream 類型保存在 redis 中的,key 名稱就是 topic 名稱。

應用B訂閱消息

nuget 安裝 Dapr.AspNetCore 並在 Startup 的 ConfigureServices 中添加 AddDapr

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc().AddDapr();
}

在 Configure 中添加 endpoints.MapSubscribeHandler()

app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
    // ...
});

實現訂閱消息的 Controller,並在 Action 處添加 Topic 屬性

[ApiController]
public class SubscriptionController : ControllerBase
{
    private readonly ILogger _logger;

    public SubscriptionController(ILogger<SubscriptionController> logger)
    {
        _logger = logger;
    }

    [Topic("pubsub", "newIng")]
    [HttpPost("/sub/newing")]
    public IActionResult NewIng(Ing ing)
    {
        _logger.LogInformation(
            "Received message: {Content} {DateAdded}",
            ing.Content,
            ing.DateAdded.ToString("yyyy-MM-dd HH:mm:ss"));

        return Ok();
    }
}

endpoints.MapSubscribeHandler 的作用是讓 dapr 發現消息訂閱者,dapr 收到消息后會向應用的 /dapr/subscribe 路徑發請求,如果發現有對應消息的訂閱者,會向訂閱者的請求路徑 POST 消息。

對於上面的示例代碼,我們可以進入應用的容器用 curl 命令驗證一下

# curl localhost/dapr/subscribe   
[{"topic":"newIng","pubsubName":"pubsub","route":"sub/newing"}]

點火試驗

應用A對應的是園子的閃存,我發了一條閃存“[dapr]此閃會通過 dapr 向消息隊列發一條消息3”,隨即對應的消息被發出,看看應用B的日志中是否記錄了這條訂閱消息。

Received message: [dapr]此閃會通過 dapr 向消息隊列發一條消息3 2021-11-21 15:19:41

收到了,試驗初步成功。

待解決問題

A default subscription to topic newIng on pubsub pubsub already exists.

  • 調用訂閱者 Action 的鑒權問題,比如調用上面加了 Topic 屬性 NewIng 方法,如果應用是暴露在公網上的,沒有鑒權,就誰都可以調用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM