dapr入門學習


一、安裝

我這里直接下載的二進制文件

https://github.com/dapr/dapr/releases/download/v1.3.0/daprd_linux_amd64.tar.gz
tar zxvf dapr_linux_amd64.tar.gz
cp dapr /usr/bin/
chmod +x /usr/bin/dapr
dapr init
默認安裝1.3.0版本

二、本地運行程序
編寫Demo程序,一個很簡單的Asp.net core 程序

using DaprTest.Controllers;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace DaprTest
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers().AddDapr();
            
            services.AddActors(options =>
            {
                options.Actors.RegisterActor<IncrActorOne>();
            });
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            app.UseRouting();
            app.UseCloudEvents();
            app.UseAuthorization();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapSubscribeHandler();
                endpoints.MapControllers();
                endpoints.MapActorsHandlers();
            });
        }
    }
}

  

  1 using Dapr;
  2 using Dapr.Actors;
  3 using Dapr.Actors.Client;
  4 using Dapr.Actors.Runtime;
  5 using Dapr.Client;
  6 using Microsoft.AspNetCore.Mvc;
  7 using Microsoft.Extensions.Logging;
  8 using System.Threading.Tasks;
  9 
 10 namespace DaprTest.Controllers
 11 {
 12     [ApiController]
 13     [Route("[controller]/[action]")]
 14     public class DaprController : ControllerBase
 15     {
 16         private readonly ILogger<DaprController> _logger;
 17         
 18         public DaprController(ILogger<DaprController> logger)
 19         {
 20             _logger = logger;
 21         }
 22         
 23         private static object _lockOjbect = new object();
 24         public static int incr = 0;
 25 
 26         [HttpGet]
 27         public async Task<int> Incr()
 28         {
 29             int value = 0;
 30             lock(_lockOjbect)
 31             {
 32                 value= ++incr;
 33             }
 34 
 35             return await Task.FromResult(value);
 36         }
 37 
 38         [HttpGet]
 39         public async Task<int> IncrActor()
 40         {
 41             var actorId = new ActorId("123");
 42             var actor =  ActorProxy.Create<IIncrActor>(actorId,nameof(IncrActorOne));
 43             var counterValue= await actor.IncrAcor();
 44 
 45             return counterValue;
 46         }
 47 
 48         [Topic("redis-pubsub", "daprpub")]
 49         [HttpGet]
 50         public async Task<int> Pub([FromServices] DaprClient daprClient)
 51         {
 52             var eventData = new EnventData { Id = "17", Amount = 10m, };
 53             await daprClient.PublishEventAsync("redis-pubsub", "daprpub", eventData);
 54             return await Task.FromResult(4);
 55         }
 56 
 57         
 58         [HttpPost]
 59         public string Sub([FromBody]EnventData enventData)
 60         {
 61             _logger.LogInformation($"來啦,小老弟!{enventData.Amount},{enventData.Id}");
 62             return "123";
 63         }
 64 
 65         [Route("/kafka-binding")]
 66         [HttpPost]
 67         public string KafkaBinding([FromBody] EnventData enventData)
 68         {
 69             _logger.LogInformation($"來啦,小老弟!{enventData.Amount},{enventData.Name}");
 70             return "123";
 71         }
 72     }
 73 
 74     public class EnventData
 75     {
 76         public string Name { get; set; }
 77 
 78         public decimal Amount { get; set; }
 79     }
 80     public interface IIncrActor: IActor
 81     {
 82         Task<int> IncrAcor();
 83     }
 84 
 85     public class IncrActorOne : Actor, IIncrActor
 86     {
 87         private static int COUNT = 0;
 88         public IncrActorOne(ActorHost host) : base(host) { }
 89 
 90         public async Task<int> IncrAcor()
 91         {
 92             return await Task.FromResult(++COUNT);
 93         }
 94 
 95         public async Task<int> IncrStateAcor()
 96         {
 97             var currntScore = await StateManager.AddOrUpdateStateAsync("score", 1,(key, currentScore) => currentScore + 1);
 98             return currntScore;
 99         }
100     }
101 }
View Code
dapr run --app-id DaprTest --app-port 5000 dotnet run

  這里要注意 --app-port參數,我之前沒指定這個參數一直報無法通信。

三、k8s運行demo

      要在k8s中運行dapr首先要在k8s中安裝dapr相關的組件

dapr init -k

  首先要確保執行命令的服務器上能夠連接上k8s服務器,等待安裝成功。

截圖中使用的k8s管理工具kuboard,在測試k8s集群中安裝了Dapr、Istio(后續一定要記一些istio、Envoy的知識點)。

安裝成功后就可以將Demo程序打鏡像發布到k8s中了。

FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
WORKDIR /app
EXPOSE 80

FROM mcr.microsoft.com/dotnet/sdk:5.0 AS build
WORKDIR /src
COPY ["DaprTest/DaprTest.csproj", "DaprTest/"]
RUN dotnet restore "DaprTest/DaprTest.csproj"
COPY . .
WORKDIR "/src/DaprTest"
RUN dotnet build "DaprTest.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "DaprTest.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
RUN ls
ENTRYPOINT ["dotnet", "DaprTest.dll"]
docker build -t 鏡像名
docker push //推送鏡像

  

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: dotnet-daprtest
  namespace: dapr-test
spec:
  hosts:
  - "*"
  gateways:
  - dapr-test-gateway
  http:
  - match:
    - uri:
        prefix: /betadaprtest/
    rewrite:
      uri: /
    route:
    - destination:
        host: dotnet-daprtest
        port:
          number: 80
---
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: dapr-test-gateway
  namespace: dapr-test
spec:
  selector:
    istio: ingressgateway
  servers:
    - hosts:
        - xxx.xxx.com
      port:
        name: http
        number: 80
        protocol: HTTP
apiVersion: v1
kind: Service
metadata:
  name: dotnet-daprtest
  namespace: dapr-test
  labels:
    app: dotnet-daprtest
    service: dotnet-daprtest
spec:
  ports:
  - port: 80
    name: http
  selector:
    app: dotnet-daprtest
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: dapr-test
  name: dotnet-daprtest
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dotnet-daprtest
  template: 
    metadata:
      labels:
        app: dotnet-daprtest
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "dotnet-daprtest"
        dapr.io/app-port: "80"
    spec:
      imagePullSecrets:
      - name: xxxx
      restartPolicy: Always
      containers:
      - name: dotnet-daprtest
        image: xxxxx
        ports:
        - containerPort: 80

 

virtualservice、gateway 定義如何訪問服務。

出於測試目的就沒有在命名空間級別啟用dapr 注入,直接在metadata -》annotaions 中啟用dapr。

pod啟動成功后就可以看到Pod中啟動了兩個容器,一個是Demo本身,還有一個就是daprd的邊車容器

可以看下daprd容器中的日志,看看這個容器在啟動時都做了些什么

可以看到都是一些初始化,加載component組件等,這里要注意,如果發現daprd容器啟動失敗,可以從這個容器日志看到一些信息,一般都是一些組件加載失敗,找到對應的組件修改即可。

也可以用命令查看daprd容器日志,kubectl logs -f --tail=100 -n 命名空間  podId  -c daprd

上面兩張圖就是因為kafka組件無法正確的連接到kafka服務端導致容器無法啟動,進而導致整個Pod無法正常被調度。

四、StatusDemo

dapr在初始化k8s組件時並不會初始化Status組件(單機初始化時會創建一個redis 作為status實現),所以需要我們自行安裝redis或者其他dapr支持的存儲組件,Demo里使用自建的redis

---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: dapr-test
spec:
  metadata:
    - name: redisHost
      value: 'xxx:6379'
    - name: actorStateStore
      value: 'true'
    - name: redisDB
      value: '17'
  type: state.redis
  version: v1

 

五、Actor編程模型

對於Actor並發編程模型的定義就不過多贅述了。

對於Actor只需要在代碼中預先定義好,並注冊即可

[httpGet]
public async Task<int> IncrActor()
{
  var actorId=new ActorId("123");
var actor= ActorProxy.Create<IIncrActor>(actorId,nameof(IncrActorOne));
var counterValue = await actor.incrAcor();
}
public
interface IIncrActor: IActor { Task<int> IncrAcor(); } public class IncrActorOne : Actor, IIncrActor { private static int COUNT = 0; public IncrActorOne(ActorHost host) : base(host) { } public async Task<int> IncrAcor() { return await Task.FromResult(++COUNT); } public async Task<int> IncrStateAcor() { var currntScore = await StateManager.AddOrUpdateStateAsync("score", 1,(key, currentScore) => currentScore + 1); return currntScore; } } public void ConfigureServices(IServiceCollection services) { services.AddControllers().AddDapr(); services.AddActors(options => { options.Actors.RegisterActor<IncrActorOne>(); }); }

Actor是動態,可以根據實際情況動態添加,例如根據用戶id等標識一個Actor,可以在Dapr自帶的Dashboard中查看Actor

它在redis中的存儲結構

六、發布訂閱

發布訂閱也同樣需要定義Component組件

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
  namespace: dapr-test
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: xxxx:6379
  - name: consumerID
    value: "myGroup"
  - name: enableTLS
    value: "false"
  - name: redisPassword
    value: 12345678
---
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: dotnet-subscription
  namespace: dapr-test
spec:
  pubsubname: redis-pubsub
  topic: daprpub
  route: /Dapr/Sub
scopes:
- dotnet-daprtest

這里定義了 發布組件、訂閱組件,使用Redis實現。使用方式如下

        [HttpGet]
        public async Task<int> Pub([FromServices] DaprClient daprClient)
        {
            var eventData = new EnventData { Id = "17", Amount = 10m, };
            await daprClient.PublishEventAsync("redis-pubsub", "daprpub", eventData);//redis-pubsub 要使用的組件名稱,daprpub topic
            return await Task.FromResult(4);
        }

如果想更改實現組件,例如從redis切換到kafka,只需要更改一下 發布組件即可

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
  namespace: dapr-test
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "xxxxx:9092"
  - name: consumerGroup
    value: "daprtest"
  - name: clientID
    value: "dotnet-daprtest"
  - name: authRequired
    value: "false"
  - name: maxMessageBytes
    value: 1024

組件名稱依舊是 redis-pubsub,但是實現已經更改為了 kafka。需要注意的是,這里雖然更改了組件但是因為daprd容器並沒有監聽這種變化,這就導致了要手動去重啟pod,這里dapr官方后期應該會被優化掉。

這里需要注意的是,發布訂閱的數據格式一定要正確,不能通過其他客戶端直接向發布組件發送不正確的消息格式,一旦發送錯誤格式的數據會導致daprd容器報錯,訂閱方無法收到消息。

正確的數據格式如下:

{"traceid":"00-86b349b9b127754792b69a6bba049c40-4489f813ea9fab64-00","id":"9f5fc119-60ee-4fef-b7a7-b2ecd31e7da7","datacontenttype":"application/json","type":"com.dapr.event.sent","pubsubname":"redis-pubsub","data":{"name":"17","amount":10},"specversion":"1.0","source":"dotnet-daprtest","topic":"daprpub"}

我在測試時使用了kafka 客戶端工具發送了錯誤的數據導致daprd容器一直報錯,訂閱端死活收不到消息。查看daprd容器日志發現是有錯誤的數據。查看daprd容器日志是排查錯誤的一個有效方法。

七、bindings

bindings同樣也是添加component即可

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-binding
  namespace: -test
spec:
  metadata:
    - name: topics
      value: daprtest
    - name: brokers
      value: 'ffffff:9092'
    - name: consumerGroup
      value: daprtest
    - name: publishTopic
      value: daprtest
    - name: authRequired
      value: false
    - name: maxMessageBytes
      value: 1024
  type: bindings.kafka
  version: v1

這里需要注意的是 bindings是通http通信的  這就需要http的url要與bindings組件名一致,當然也有可能是我理解的不到位。

測試時可以用kafka測試工具發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic daprtest  這個命令會卡住shell,並接收用戶輸入的數據

輸入對應的數據,就可以在程序中看到Demo的方法有沒執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM