一、安裝
我這里直接下載的二進制文件
https://github.com/dapr/dapr/releases/download/v1.3.0/daprd_linux_amd64.tar.gz
tar zxvf dapr_linux_amd64.tar.gz
cp dapr /usr/bin/
chmod +x /usr/bin/dapr
dapr init
默認安裝1.3.0版本
二、本地運行程序
編寫Demo程序,一個很簡單的Asp.net core 程序
using DaprTest.Controllers;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace DaprTest
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers().AddDapr();
services.AddActors(options =>
{
options.Actors.RegisterActor<IncrActorOne>();
});
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseRouting();
app.UseCloudEvents();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
endpoints.MapControllers();
endpoints.MapActorsHandlers();
});
}
}
}
1 using Dapr; 2 using Dapr.Actors; 3 using Dapr.Actors.Client; 4 using Dapr.Actors.Runtime; 5 using Dapr.Client; 6 using Microsoft.AspNetCore.Mvc; 7 using Microsoft.Extensions.Logging; 8 using System.Threading.Tasks; 9 10 namespace DaprTest.Controllers 11 { 12 [ApiController] 13 [Route("[controller]/[action]")] 14 public class DaprController : ControllerBase 15 { 16 private readonly ILogger<DaprController> _logger; 17 18 public DaprController(ILogger<DaprController> logger) 19 { 20 _logger = logger; 21 } 22 23 private static object _lockOjbect = new object(); 24 public static int incr = 0; 25 26 [HttpGet] 27 public async Task<int> Incr() 28 { 29 int value = 0; 30 lock(_lockOjbect) 31 { 32 value= ++incr; 33 } 34 35 return await Task.FromResult(value); 36 } 37 38 [HttpGet] 39 public async Task<int> IncrActor() 40 { 41 var actorId = new ActorId("123"); 42 var actor = ActorProxy.Create<IIncrActor>(actorId,nameof(IncrActorOne)); 43 var counterValue= await actor.IncrAcor(); 44 45 return counterValue; 46 } 47 48 [Topic("redis-pubsub", "daprpub")] 49 [HttpGet] 50 public async Task<int> Pub([FromServices] DaprClient daprClient) 51 { 52 var eventData = new EnventData { Id = "17", Amount = 10m, }; 53 await daprClient.PublishEventAsync("redis-pubsub", "daprpub", eventData); 54 return await Task.FromResult(4); 55 } 56 57 58 [HttpPost] 59 public string Sub([FromBody]EnventData enventData) 60 { 61 _logger.LogInformation($"來啦,小老弟!{enventData.Amount},{enventData.Id}"); 62 return "123"; 63 } 64 65 [Route("/kafka-binding")] 66 [HttpPost] 67 public string KafkaBinding([FromBody] EnventData enventData) 68 { 69 _logger.LogInformation($"來啦,小老弟!{enventData.Amount},{enventData.Name}"); 70 return "123"; 71 } 72 } 73 74 public class EnventData 75 { 76 public string Name { get; set; } 77 78 public decimal Amount { get; set; } 79 } 80 public interface IIncrActor: IActor 81 { 82 Task<int> IncrAcor(); 83 } 84 85 public class IncrActorOne : Actor, IIncrActor 86 { 87 private static int COUNT = 0; 88 public IncrActorOne(ActorHost host) : base(host) { } 89 90 public async Task<int> IncrAcor() 91 { 92 return await Task.FromResult(++COUNT); 93 } 94 95 public async Task<int> IncrStateAcor() 96 { 97 var currntScore = await StateManager.AddOrUpdateStateAsync("score", 1,(key, currentScore) => currentScore + 1); 98 return currntScore; 99 } 100 } 101 }
dapr run --app-id DaprTest --app-port 5000 dotnet run
這里要注意 --app-port參數,我之前沒指定這個參數一直報無法通信。
三、k8s運行demo
要在k8s中運行dapr首先要在k8s中安裝dapr相關的組件
dapr init -k
首先要確保執行命令的服務器上能夠連接上k8s服務器,等待安裝成功。


截圖中使用的k8s管理工具kuboard,在測試k8s集群中安裝了Dapr、Istio(后續一定要記一些istio、Envoy的知識點)。
安裝成功后就可以將Demo程序打鏡像發布到k8s中了。
FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base WORKDIR /app EXPOSE 80 FROM mcr.microsoft.com/dotnet/sdk:5.0 AS build WORKDIR /src COPY ["DaprTest/DaprTest.csproj", "DaprTest/"] RUN dotnet restore "DaprTest/DaprTest.csproj" COPY . . WORKDIR "/src/DaprTest" RUN dotnet build "DaprTest.csproj" -c Release -o /app/build FROM build AS publish RUN dotnet publish "DaprTest.csproj" -c Release -o /app/publish FROM base AS final WORKDIR /app COPY --from=publish /app/publish . RUN ls ENTRYPOINT ["dotnet", "DaprTest.dll"]
docker build -t 鏡像名 docker push //推送鏡像
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: dotnet-daprtest
namespace: dapr-test
spec:
hosts:
- "*"
gateways:
- dapr-test-gateway
http:
- match:
- uri:
prefix: /betadaprtest/
rewrite:
uri: /
route:
- destination:
host: dotnet-daprtest
port:
number: 80
---
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: dapr-test-gateway
namespace: dapr-test
spec:
selector:
istio: ingressgateway
servers:
- hosts:
- xxx.xxx.com
port:
name: http
number: 80
protocol: HTTP
apiVersion: v1
kind: Service
metadata:
name: dotnet-daprtest
namespace: dapr-test
labels:
app: dotnet-daprtest
service: dotnet-daprtest
spec:
ports:
- port: 80
name: http
selector:
app: dotnet-daprtest
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: dapr-test
name: dotnet-daprtest
spec:
replicas: 1
selector:
matchLabels:
app: dotnet-daprtest
template:
metadata:
labels:
app: dotnet-daprtest
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "dotnet-daprtest"
dapr.io/app-port: "80"
spec:
imagePullSecrets:
- name: xxxx
restartPolicy: Always
containers:
- name: dotnet-daprtest
image: xxxxx
ports:
- containerPort: 80
virtualservice、gateway 定義如何訪問服務。
出於測試目的就沒有在命名空間級別啟用dapr 注入,直接在metadata -》annotaions 中啟用dapr。
pod啟動成功后就可以看到Pod中啟動了兩個容器,一個是Demo本身,還有一個就是daprd的邊車容器

可以看下daprd容器中的日志,看看這個容器在啟動時都做了些什么

可以看到都是一些初始化,加載component組件等,這里要注意,如果發現daprd容器啟動失敗,可以從這個容器日志看到一些信息,一般都是一些組件加載失敗,找到對應的組件修改即可。
也可以用命令查看daprd容器日志,kubectl logs -f --tail=100 -n 命名空間 podId -c daprd


上面兩張圖就是因為kafka組件無法正確的連接到kafka服務端導致容器無法啟動,進而導致整個Pod無法正常被調度。
四、StatusDemo
dapr在初始化k8s組件時並不會初始化Status組件(單機初始化時會創建一個redis 作為status實現),所以需要我們自行安裝redis或者其他dapr支持的存儲組件,Demo里使用自建的redis
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: dapr-test
spec:
metadata:
- name: redisHost
value: 'xxx:6379'
- name: actorStateStore
value: 'true'
- name: redisDB
value: '17'
type: state.redis
version: v1
五、Actor編程模型
對於Actor並發編程模型的定義就不過多贅述了。
對於Actor只需要在代碼中預先定義好,並注冊即可
[httpGet]
public async Task<int> IncrActor()
{
var actorId=new ActorId("123");
var actor= ActorProxy.Create<IIncrActor>(actorId,nameof(IncrActorOne));
var counterValue = await actor.incrAcor();
}
public interface IIncrActor: IActor { Task<int> IncrAcor(); } public class IncrActorOne : Actor, IIncrActor { private static int COUNT = 0; public IncrActorOne(ActorHost host) : base(host) { } public async Task<int> IncrAcor() { return await Task.FromResult(++COUNT); } public async Task<int> IncrStateAcor() { var currntScore = await StateManager.AddOrUpdateStateAsync("score", 1,(key, currentScore) => currentScore + 1); return currntScore; } } public void ConfigureServices(IServiceCollection services) { services.AddControllers().AddDapr(); services.AddActors(options => { options.Actors.RegisterActor<IncrActorOne>(); }); }
Actor是動態,可以根據實際情況動態添加,例如根據用戶id等標識一個Actor,可以在Dapr自帶的Dashboard中查看Actor

它在redis中的存儲結構

六、發布訂閱
發布訂閱也同樣需要定義Component組件
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
namespace: dapr-test
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: xxxx:6379
- name: consumerID
value: "myGroup"
- name: enableTLS
value: "false"
- name: redisPassword
value: 12345678
---
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: dotnet-subscription
namespace: dapr-test
spec:
pubsubname: redis-pubsub
topic: daprpub
route: /Dapr/Sub
scopes:
- dotnet-daprtest
這里定義了 發布組件、訂閱組件,使用Redis實現。使用方式如下
[HttpGet] public async Task<int> Pub([FromServices] DaprClient daprClient) { var eventData = new EnventData { Id = "17", Amount = 10m, }; await daprClient.PublishEventAsync("redis-pubsub", "daprpub", eventData);//redis-pubsub 要使用的組件名稱,daprpub topic return await Task.FromResult(4); }
如果想更改實現組件,例如從redis切換到kafka,只需要更改一下 發布組件即可
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: redis-pubsub namespace: dapr-test spec: type: pubsub.kafka version: v1 metadata: - name: brokers value: "xxxxx:9092" - name: consumerGroup value: "daprtest" - name: clientID value: "dotnet-daprtest" - name: authRequired value: "false" - name: maxMessageBytes value: 1024
組件名稱依舊是 redis-pubsub,但是實現已經更改為了 kafka。需要注意的是,這里雖然更改了組件但是因為daprd容器並沒有監聽這種變化,這就導致了要手動去重啟pod,這里dapr官方后期應該會被優化掉。
這里需要注意的是,發布訂閱的數據格式一定要正確,不能通過其他客戶端直接向發布組件發送不正確的消息格式,一旦發送錯誤格式的數據會導致daprd容器報錯,訂閱方無法收到消息。
正確的數據格式如下:
{"traceid":"00-86b349b9b127754792b69a6bba049c40-4489f813ea9fab64-00","id":"9f5fc119-60ee-4fef-b7a7-b2ecd31e7da7","datacontenttype":"application/json","type":"com.dapr.event.sent","pubsubname":"redis-pubsub","data":{"name":"17","amount":10},"specversion":"1.0","source":"dotnet-daprtest","topic":"daprpub"}
我在測試時使用了kafka 客戶端工具發送了錯誤的數據導致daprd容器一直報錯,訂閱端死活收不到消息。查看daprd容器日志發現是有錯誤的數據。查看daprd容器日志是排查錯誤的一個有效方法。
七、bindings
bindings同樣也是添加component即可
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-binding
namespace: -test
spec:
metadata:
- name: topics
value: daprtest
- name: brokers
value: 'ffffff:9092'
- name: consumerGroup
value: daprtest
- name: publishTopic
value: daprtest
- name: authRequired
value: false
- name: maxMessageBytes
value: 1024
type: bindings.kafka
version: v1
這里需要注意的是 bindings是通http通信的 這就需要http的url要與bindings組件名一致,當然也有可能是我理解的不到位。
測試時可以用kafka測試工具發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic daprtest 這個命令會卡住shell,並接收用戶輸入的數據

輸入對應的數據,就可以在程序中看到Demo的方法有沒執行。
