The Dapr publish & subscribe building block
Dapr 發布 & 訂閱構建塊
The Publish-Subscribe pattern (often referred to as "pub/sub") is a well-known and widely used messaging pattern. Architects commonly embrace it in distributed applications. However, the plumbing to implement it can be complex. There are often subtle feature differences across different messaging products. Dapr offers a building block that significantly simplifies implementing pub/sub functionality.
發布-訂閱模式 (通常稱為 "發布/訂閱" ) 是眾所周知且廣泛使用的消息模式。 架構師通常在分布式應用程序中采用它。 但是,實現可能會很復雜。 在不同的消息隊列產品中,通常會有細微的功能差異。 Dapr 提供了一個構建基塊,可顯著簡化實現發布/訂閱功能。
What it solves
解決方法
The primary advantage of the Publish-Subscribe pattern is loose coupling, sometimes referred to as temporal decoupling. The pattern decouples services that send messages (the publishers) from services that consume messages (the subscribers). Both publishers and subscribers are unaware of each other - both are dependent on a centralized message broker that distributes the messages.
發布/訂閱 模式的主要優點是松耦合,有時稱為時間上解耦。 此模式分離服務為發送消息的服務(稱為發布者)和消費消息的服務(稱為訂閱者)。 發布者和訂閱者都不知道對方的存在,兩者都依賴於分發消息的集中式消息代理。
Figure 7-1 shows the high-level architecture of the pub/sub pattern.
圖7-1 顯示了發布/訂閱模式的高層架構。
Figure 7-1. The pub/sub pattern.
圖 7-1。 發布/訂閱模式。
From the previous figure, note the steps of the pattern:
- Publishers send messages to the message broker.
- Subscribers bind to a subscription on the message broker.
- The message broker forwards a copy of the message to interested subscriptions.
- Subscribers consume messages from their subscriptions.
從上圖中,注意模式的步驟:
- 發布者將消息發送到消息代理。
- 訂閱者綁定到消息代理上的訂閱。
- 消息代理將消息的副本轉發給感興趣的訂閱。
- 訂閱者從其訂閱消費消息。
Most message brokers encapsulate a queueing mechanism that can persist messages once received. With it, the message broker guarantees durability by storing the message. Subscribers don't need to be immediately available or even online when a publisher sends a message. Once available, the subscriber receives and processes the message. Dapr guarantees At-Least-Once semantics for message delivery. Once a message is published, it will be delivered at least once to any interested subscriber.
大多數消息代理封裝了一個排隊機制,這種機制可以保證在收到消息后可以持久保存消息的。 利用它,消息代理通過存儲消息來保證 持久性 。 當發布者發送消息時,訂閱者無需立即就緒,甚至不需要在線。 一旦訂閱者就緒,訂閱者將接收並處理消息。 Dapr 為消息傳遞 提供至少一次的語義保證。 發布消息后,該消息將至少傳遞一次到任何相關訂閱者。
If your service can only process a message once, you'll need to provide an idempotency check to ensure that the same message is not processed multiple times. While such logic can be coded, some message brokers, such as Azure Service Bus, provide built-in messaging capabilities.
如果要求服務只能處理一次消息,則需要提供 冪等性檢查 ,確保不會多次處理同一消息。 雖然可以編碼這種邏輯,但某些消息代理(如 Azure Service Bus)提供內置的消息傳遞 重復檢測 功能。
There are several message broker products available - both commercially and open-source. Each has advantages and drawbacks. Your job is to match your system requirements to the appropriate broker. Once selected, it's a best practice to decouple your application from message broker plumbing. You achieve this functionality by wrapping the broker inside an abstraction. The abstraction encapsulates the message plumbing and exposes generic pub/sub operations to your code. Your code communicates with the abstraction, not the actual message broker. While a wise decision, you'll have to write and maintain the abstraction and its underlying implementation. This approach requires custom code that can be complex, repetitive, and error-prone.
有多種可用的消息代理產品-商業和開源。 各有優缺點。 你需要按系統要求選擇相應的代理。 選擇后,最佳做法是分離應用程序與消息代理系統。 可以通過抽象 代理來實現此功能。 抽象封裝消息代理並向代碼公開通用的發布/訂閱操作。 你的代碼與抽象交互,而不是實際的消息代理。 明智的決定是,你必須編寫和維護抽象及其底層實現。 此方法需要自定義代碼,這些代碼可能會很復雜、重復並且容易出錯。
The Dapr publish & subscribe building block provides the messaging abstraction and implementation out-of-the-box. The custom code you would have had to write is prebuilt and encapsulated inside the Dapr building block. You bind to it and consume it. Instead of writing messaging plumbing code, you and your team focus on creating business functionality that adds value to your customers.
Dapr 發布 & 訂閱構建塊提供開箱即用的消息傳遞抽象和實現。 以前您必須編寫的自定義代碼已在 Dapr 構建基塊內預構建和封裝。 綁定到並使用這些封裝。 你和你的團隊只需聚焦於創建能為客戶帶來價值的業務功能上,而無需再編寫消息管道代碼。
How it works
工作原理
The Dapr publish & subscribe building block provides a platform-agnostic API framework to send and receive messages. Your services publish messages to a named topic. Your services subscribe to a topic to consume messages.
Dapr 發布 & 訂閱構建塊提供了平台無關的 API 框架來發送和接收消息。 你的服務將消息發布到一個命名 主題。 你的服務訂閱主題來消費消息。
The service calls the pub/sub API on the Dapr sidecar. The sidecar then makes calls into a pre-defined Dapr pub/sub component that encapsulates a specific message broker product. Figure 7-2 shows the Dapr pub/sub messaging stack.
服務在 Dapr 邊車上調用 pub/sub API。 然后,邊車將調用一個預定義的 Dapr pub/sub 組件(封裝了特定的消息代理產品)。 圖7-2 顯示了 Dapr pub/sub 消息傳遞棧。
Figure 7-2. The Dapr pub/sub stack.
圖 7-2。 Dapr pub/sub 棧。
The Dapr publish & subscribe building block can be invoked in many ways.
可以通過多種方式調用 Dapr 發布 & 訂閱構建塊。
At the lowest level, any programming platform can invoke the building block over HTTP or gRPC using the Dapr native API. To publish a message, you make the following API call:
在最低級別,任何編程平台均可通過 HTTP 或 gRPC 使用 Dapr 本機 API 來調用構建塊。 若要發布消息,請執行以下 API 調用:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
There are several Dapr specific URL segments in the above call:
<dapr-port>
provides the port number upon which the Dapr sidecar is listening.<pub-sub-name>
provides the name of the selected Dapr pub/sub component.<topic>
provides the name of the topic to which the message is published.
以上調用中有幾個 Dapr 特定的 URL 段:
<dapr-port>
提供 Dapr 邊車正在偵聽的端口號。<pub-sub-name>
提供選定的 Dapr pub/sub 組件的名稱。<topic>
提供消息要發布到的主題的名稱。
Using the curl command-line tool to publish a message, you can try it out:
可以嘗試一下使用 curl 的命令行工具發布消息:
curl -X POST http://localhost:3500/v1.0/publish/pubsub/newOrder \ -H "Content-Type: application/json" \ -d '{ "orderId": "1234", "productId": "5678", "amount": 2 }'
You receive messages by subscribing to a topic. At startup, the Dapr runtime will call the application on a well-known endpoint to identify and create the required subscriptions:
通過訂閱主題來接收消息。 在啟動時,Dapr 運行時將調用應用程序上的已知終結點來識別和創建所需的訂閱:
http://localhost:<appPort>/dapr/subscribe
<appPort>
informs the Dapr sidecar of the port upon which the application is listening.<appPort>
通知 Dapr 邊車應用程序偵聽的端口。
You can implement this endpoint yourself. But Dapr provides more intuitive ways of implementing it. We'll address this functionality later in this chapter.
可以自行實現此終結點。 但 Dapr 提供了更直觀的實現方法。 本章稍后將介紹此功能。
The response from the call contains a list of topics to which the applications will subscribe. Each includes an endpoint to call when the topic receives a message. Here's an example of a response:
對此終結點調用的響應包含應用程序將訂閱的主題的列表。 每個都包括在主題收到消息時要調用的終結點。 下面是響應的示例:
[ { "pubsubname": "pubsub", "topic": "newOrder", "route": "/orders" }, { "pubsubname": "pubsub", "topic": "newProduct", "route": "/productCatalog/products" } ]
In the JSON response, you can see the application wants to subscribe to topics newOrder
and newProduct
. It registers the endpoints /orders
and /productCatalog/products
for each, respectively. For both subscriptions, the application is binding to the Dapr component named pubsub
.
在 JSON 響應中,可以看到應用程序要訂閱主題 newOrder
和 newProduct
。分別為每個終結點注冊終結點 /orders和
/productCatalog/products
。 對於這兩個訂閱,應用程序將綁定到名為的 Dapr 組件 pubsub
。
Figure 7-3 presents the flow of the example.
圖7-3 顯示了該示例的消息流。
Figure 7-3. pub/sub flow with Dapr.
圖 7-3。 Dapr 的發布/訂閱流。
From the previous figure, note the flow:
- The Dapr sidecar for Service B calls the
/dapr/subscribe
endpoint from Service B (the consumer). The service responds with the subscriptions it wants to create. - The Dapr sidecar for Service B creates the requested subscriptions on the message broker.
- Service A publishes a message at the
/v1.0/publish/<pub-sub-name>/<topic>
endpoint on the Dapr Service A sidecar. - The Service A sidecar publishes the message to the message broker.
- The message broker sends a copy of the message to the Service B sidecar.
- The Service B sidecar calls the endpoint corresponding to the subscription (in this case
/orders
) on Service B. The service responds with an HTTP status-code200 OK
so the sidecar will consider the message as being handled successfully.
在上圖中,請注意以下 消息流:
- 服務B的Dapr邊車調用服務B(訂閱者)的終結點
/dapr/subscribe
。 服務會返回它要創建的訂閱作為響應。 - 服務 B 的 Dapr 邊車在消息代理上創建要求的訂閱。
- 服務 A 在其Dapr邊車的終結點
/v1.0/publish/<pub-sub-name>/<topic>
上發布一條消息。 - 服務 A 的Dapr邊車將消息發布到消息代理。
- 消息代理發送消息副本到服務 B 的Dapr邊車。
- 服務 B 的Dapr邊車調用服務 b 上與訂閱對應的終結點(此處為 /orders)。服務以 HTTP 狀態碼
200 OK
進行響應,邊車將認為消息已被成功處理。
In the example, the message is handled successfully. But if something goes wrong while Service B is handling the request, it can use the response to specify what needs to happen with the message. When it returns an HTTP status-code 404
, an error is logged and the message is dropped. With any other status-code than 200
or 404
, a warning is logged and the message is retried. Alternatively, Service B can explicitly specify what needs to happen with the message by including a JSON payload in the body of the response:
示例中,消息已被成功處理。 但是,如果服務 B 處理請求時出現問題,則可以使用響應指定需要對消息執行的操作。 當它返回 HTTP 狀態碼404時
,將記錄錯誤並丟棄消息。 對於任何其他狀態碼(非200
404)
,將記錄警告,並重試消息。 或者,服務 B 可以通過在響應正文中包含 JSON 負荷,顯式指定需要對消息執行的操作。
{ "status": "<status>" }
The following table shows the available status
values:
Status | Action |
---|---|
SUCCESS | The message is considered as processed successfully and dropped. |
RETRY | The message is retried. |
DROP | A warning is logged and the message is dropped. |
Any other status | The message is retried. |
下表顯示了 status的可用
值:
狀態 | 操作 |
---|---|
SUCCESS | 消息被視為已成功處理和丟棄。 |
RETRY | 重試消息。 |
DROP | 將記錄警告,並丟棄消息。 |
任何其他狀態 | 重試消息。 |
Competing consumers
消費者競爭
When scaling out an application that subscribes to a topic, you have to deal with competing consumers. Only one application instance should handle a message sent to the topic. Luckily, Dapr handles that problem. When multiple instances of a service with the same application-id subscribe to a topic, Dapr delivers each message to only one of them.
橫向擴展訂閱某個主題的應用程序時,必須處理消費者競爭。 只有一個應用程序實例應處理發送到主題的消息。 幸運的是,Dapr 處理這一問題。 當具有相同應用程序 id 的服務的多個實例訂閱主題時,Dapr 僅將每條消息傳遞給其中的一個。
SDKs
Making HTTP calls to the native Dapr APIs is time-consuming and abstract. Your calls are crafted at the HTTP level, and you'll need to handle plumbing concerns such as serialization and HTTP response codes. Fortunately, there's a more intuitive way. Dapr provides several language-specific SDKs for popular development platforms. At the time of this writing, Go, Node.js, Python, .NET, Java, and JavaScript are available.
對本機 Dapr Api 進行 HTTP 調用非常耗時且更抽象。 你的調用是HTTP 級別的,你將需要處理諸如序列化和 HTTP 響應代碼這樣的管道相關問題。 幸運的是,有一種更直觀的方式。 Dapr 為常用開發平台提供多種語言特定的 Sdk。 撰寫本文時,可以使用 Node.js、Python、.NET、Java 和 JavaScript。
Use the Dapr .NET SDK
使用 Dapr .NET SDK
For .NET Developers, the Dapr .NET SDK provides a more productive way of working with Dapr. The SDK exposes a DaprClient
class through which you can directly invoke Dapr functionality. It's intuitive and easy to use.
對於 .NET 開發人員而言, Dapr .NET SDK 提供了更高效的 Dapr 處理方式。 SDK 公開了一個 DaprClient
類,通過該類可以直接調用 Dapr 功能,直觀且易於使用。
To publish a message, the DaprClient
exposes a PublishEventAsync
method.
DaprClient
公開了一個 PublishEventAsync
方法來發布消息 。
var data = new OrderData { orderId = "123456", productId = "67890", amount = 2 }; var daprClient = new DaprClientBuilder().Build(); await daprClient.PublishEventAsync<OrderData>("pubsub", "newOrder", data);
- The first argument
pubsub
is the name of the Dapr component that provides the message broker implementation. We'll address components later in this chapter. - The second argument
neworder
provides the name of the topic to send the message to. - The third argument is the payload of the message.
- You can specify the .NET type of the message using the generic type parameter of the method.
- 第一個參數
pubsub
是提供消息代理實現的 Dapr 組件的名稱。 本章稍后將介紹這些組件。 - 第二個參數
neworder
提供要向其發送消息的主題的名稱。 - 第三個參數是消息的載體。
- 您可以使用方法的泛型類型參數來指定消息的 .NET 類型。
To receive messages, you bind an endpoint to a subscription for a registered topic. The AspNetCore library for Dapr makes this trivial. Assume, for example, that you have an existing ASP.NET WebAPI action method entitled CreateOrder
:
若要接收消息,請將終結點綁定到訂閱。 用於 Dapr 的 AspNetCore 庫使此變得簡單。 例如,假設你有一個名為 CreateOrder
的 ASP.NET WebAPI 操作方法 :
[HttpPost("/orders")] public async Task<ActionResult> CreateOrder(Order order)
You must add a reference to the Dapr.AspNetCore NuGet package in your project to consume the Dapr ASP.NET Core integration.
必須在項目中添加對 Dapr.AspNetCore NuGet 包的引用,才能使用 Dapr ASP.NET Core 集成。
To bind this action method to a topic, you decorate it with the Topic
attribute:
若要將此操作方法綁定到主題,請使用Topic特性
對其進行修飾 :
[Topic("pubsub", "newOrder")] [HttpPost("/orders")] public async Task<ActionResult> CreateOrder(Order order)
You specify two key elements with this attribute:
- The Dapr pub/sub component to target (in this case
pubsub
). - The topic to subscribe to (in this case
newOrder
).
指定此特性的兩個關鍵元素:
- Dapr的發布/訂閱組件(此處為pubsub)
- 訂閱的主題 (此處為
newOrder
) 。
Dapr then invokes that action method as it receives messages for that topic.
Dapr 將調用該操作方法,以接收該主題的消息。
You'll also need to enable ASP.NET Core to use Dapr. The Dapr .NET SDK provides several extension methods that can be invoked in the Startup
class.
還需要啟用 ASP.NET Core 來使用 Dapr。 Dapr .NET SDK 提供了可在Startup
類中調用的多個擴展方法 。
In the ConfigureServices
method, you must add the following extension method:
在 ConfigureServices
方法中,需要添加以下擴展方法:
public void ConfigureServices(IServiceCollection services) { // ... services.AddControllers().AddDapr(); }
Appending the AddDapr
extension-method to the AddControllers
extension-method registers the necessary services to integrate Dapr into the MVC pipeline. It also registers a DaprClient
instance into the dependency injection container, which then can be injected anywhere into your service.
將 AddDapr
擴展方法追加到 AddControllers
擴展方法會注冊必要的服務,以將 Dapr 集成到 MVC 管道中。 它還將 DaprClient
實例注冊到依賴關系注入容器,將來可以在任何需要DaprClient的服務中注入它。
In the Configure
method, you must add the following middleware components to enable Dapr:
在 Configure
方法中,必須添加以下中間件組件來啟用 Dapr:
public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { // ... app.UseCloudEvents(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); // ... }); }
The call to UseCloudEvents
adds CloudEvents middleware into to the ASP.NET Core middleware pipeline. This middleware will unwrap requests that use the CloudEvents structured format, so the receiving method can read the event payload directly.
對UseCloudEvents
的調用用於將 CloudEvents 中間件添加到 ASP.NET Core 中間件管道。 此中間件將解包使用 CloudEvents 結構化格式的請求,因此接收方法可以直接讀取事件載體。
CloudEvents is a standardized messaging format, providing a common way to describe event information across platforms. Dapr embraces CloudEvents. For more information about CloudEvents, see the cloudevents specification.
CloudEvents 是一種標准化的消息傳遞格式,提供跨平台描述事件信息的通用方式。 Dapr 采用 CloudEvents。 有關 CloudEvents 的詳細信息,請參閱 CloudEvents 規范。
The call to MapSubscribeHandler
in the endpoint routing configuration will add a Dapr subscribe endpoint to the application. This endpoint will respond to requests on /dapr/subscribe
. When this endpoint is called, it will automatically find all WebAPI action methods decorated with the Topic
attribute and instruct Dapr to create subscriptions for them.
終結點路由配置中對MapSubscribeHandler的調用會為應用程序添加Dapr 訂閱終結點。 此終結點將響應到/dapr/subscribe
上的請求 。 調用此終結點時,它將自動查找所有用Topic
特性修飾的WebAPI 操作方法, 並指示 Dapr 為它們創建訂閱。
Pub/sub components
Pub/sub 組件
Dapr pub/sub components handle the actual transport of the messages. Several are available. Each encapsulates a specific message broker product to implement the pub/sub functionality. At the time of writing, the following pub/sub components were available:
Dapr pub/sub 組件 處理消息的實際傳輸。 有多個底層實現可用。 每個都封裝特定消息代理產品以實現發布/訂閱功能。 撰寫本文時,可以使用以下發布/訂閱組件:
- Apache Kafka
- Azure Event Hubs
- Azure Service Bus
- AWS SNS/SQS
- GCP Pub/Sub
- Hazelcast
- MQTT
- NATS
- Pulsar
- RabbitMQ
- Redis Streams
Note
The Azure cloud stack has both messaging functionality (Azure Service Bus) and event streaming (Azure Event Hub) availability.
備注
Azure cloud stack 具有消息傳遞(Azure Service Bus)和事件流(Azure Event Hub) 功能。
These components are created by the community in a component-contrib repository on GitHub. You're encouraged to write your own Dapr component for a message broker that isn't yet supported.
這些組件由 GitHub 上的 component-contrib存儲庫中的社區創建。 建議為尚不受支持的消息代理編寫自己的 Dapr 組件。
Configure pub/sub components
配置發布/訂閱組件
Using a Dapr configuration file, you can specify the pub/sub component(s) to use. This configuration contains several fields. The name
field specifies the pub/sub component that you want to use. When sending or receiving a message, you need to specify this name (as you saw earlier in the PublishEventAsync
method signature).
使用 Dapr 配置文件,您可以指定要使用的發布/訂閱組件。 此配置包含多個字段。 name
字段指定要使用的發布/訂閱組件。 發送或接收消息時,需要指定此名稱, (如之前在PublishEventAsync
方法簽名) 中看到的那樣 。
Below you see an example of a Dapr configuration file for configuring a RabbitMQ message broker component:
下面你將看到一個用於配置 RabbitMQ 消息代理組件的 Dapr 配置文件示例:
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub-rq spec: type: pubsub.rabbitmq version: v1 metadata: - name: host value: "amqp://localhost:5672" - name: durable value: true
In this example, you can see that you can specify any message broker-specific configuration in the metadata
block. In this case, RabbitMQ is configured to create durable queues. But the RabbitMQ component has more configuration options. Each of the components' configuration will have its own set of possible fields. You can read which fields are available in the documentation of each pub/sub component.
在此示例中,可以看到,你可以在metadata
塊中指定任何特定於消息代理的配置 。 此處,RabbitMQ 被配置為創建持久隊列。 但 RabbitMQ 組件具有更多的配置選項。 每個組件的配置將有自己的一組字段。 您可以閱讀每個發布 /訂閱組件的文檔來了解其可用的字段。
Next to the programmatic way of subscribing to a topic from code, Dapr pub/sub also provides a declarative way of subscribing to a topic. This approach removes the Dapr dependency from the application code. Therefore, it also enables an existing application to subscribe to topics without any changes to the code. The following example shows a Dapr configuration file for configuring a subscription:
除了以編程方式訂閱主題外,Dapr pub/sub 還提供了一種聲明方式來訂閱主題。 此方法會從應用程序代碼中刪除 Dapr 依賴項。 然而,它還允許現有應用程序無需更改代碼,就能夠訂閱主題 。以下示例演示了用於配置訂閱的 Dapr 配置文件:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: newOrder-subscription
spec:
pubsubname: pubsub
topic: newOrder
route: /orders
scopes:
- ServiceB
- ServiceC
You have to specify several elements with every subscription:
- The name of the Dapr pub/sub component you want to use (in this case
pubsub
). - The name of the topic to subscribe to (in this case
newOrder
). - The API operation that needs to be called for this topic (in this case
/orders
). - The scope can specify which services can publish and subscribe to a topic.
必須為每個訂閱指定多個元素:
- Dapr pub/sub 組件的名稱(此處為pubsub) 。
- 訂閱的主題名稱 (此處為
newOrder
) 。 - 需要為本主題調用 的API 操作 (此處為
/orders
) 。 - 作用域可以指定哪些服務可以發布和訂閱主題。
Reference application: eShopOnDapr
參考應用程序: eShopOnDapr
The accompanying eShopOnDapr app provides an end-to-end reference architecture for constructing a microservices application implementing Dapr. eShopOnDapr is an evolution of the widely popular eShopOnContainers app, created several years ago. Both versions use the pub/sub pattern for communicating integration events across microservices. Integration events include:
- When a user checks-out a shopping basket.
- When a payment for an order has succeeded.
- When the grace-period of a purchase has expired.
隨附的 eShopOnDapr 應用提供端到端參考架構,用於構造實現 Dapr 的微服務應用程序。 eShopOnDapr 是在幾年前創建的廣泛流行的 eShopOnContainers 應用程序的演變。 這兩個版本都使用發布/訂閱模式來跨微服務傳遞 集成事件 。 集成事件包括:
- 當用戶結賬時。
- 訂單的付款成功時。
- 購買的寬限期已過期時。
Eventing in eShopOnContainers is based on the following IEventBus
interface:
EShopOnContainers 中的事件基於以下 IEventBus
接口:
public interface IEventBus { void Publish(IntegrationEvent integrationEvent); void Subscribe<T, THandler>() where TEvent : IntegrationEvent where THandler : IIntegrationEventHandler<T>; }
Concrete implementations of this interface exist in eShopOnContainers for both RabbitMQ and Azure Service Bus. Each implementation included a great deal of custom plumbing code that was complex to understand and difficult to maintain.
eShopOnContainers 中存在此接口的具體實現(RabbitMQ 和Azure Service Bus) 。 每個實現都包含大量自定義的管道代碼,這些代碼非常復雜,難於理解和維護。
The newer eShopOnDapr significantly simplifies pub/sub behavior by using Dapr. For example, the IEventBus
interface was reduced to a single method:
較新的 eShopOnDapr 通過使用 Dapr 大大簡化了發布/訂閱行為。 例如, IEventBus
接口被縮減到單一方法:
public interface IEventBus { Task PublishAsync(IntegrationEvent integrationEvent); }
Publish events
發布事件
In the updated eShopOnDapr, a single DaprEventBus
implementation can support any Dapr-supported message broker. The following code block shows the simplified Publish method. Note how the PublishAsync
method uses the Dapr client to publish an event:
在更新了的 eShopOnDapr 中,一個 DaprEventBus
實現可以支持任何 Dapr 支持的消息代理。下面的代碼塊顯示了簡化的發布方法。 請注意該 PublishAsync
方法如何使用 DaprClient來發布事件:
public class DaprEventBus : IEventBus { private const string PubSubName = "pubsub"; private readonly DaprClient _daprClient; private readonly ILogger<DaprEventBus> _logger; public DaprEventBus(DaprClient daprClient, ILogger<DaprEventBus> logger) { _daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task PublishAsync(IntegrationEvent integrationEvent) { var topicName = integrationEvent.GetType().Name; // Dapr uses System.Text.Json which does not support serialization of a // polymorphic type hierarchy by default. Using object as the type // parameter causes all properties to be serialized. await _daprClient.PublishEventAsync<object>(PubSubName, topicName, integrationEvent); } }
As you can see in the code snippet, the topic name is derived from event type's name. Because all eShop services use the IEventBus
abstraction, retrofitting Dapr required absolutely no change to the mainline application code.
如代碼片段中所示,主題名稱從事件類型的名稱獲得。 因為所有 eShop services 都使用 IEventBus
抽象,替換底層組件完全不更改 主線應用程序代碼。
Important
The Dapr SDK uses System.Text.Json
to serialize/deserialize messages. However, System.Text.Json
doesn't serialize properties of derived classes by default. In the eShop code, an event is sometimes explicitly declared as an IntegrationEvent
, the base class for integration events. This is done because the concrete event type is determined dynamically at run time based on business logic. As a result, the event is serialized using the type information of the base class and not the derived class. To force System.Text.Json
to serialize all properties of the derived class in this case, the code uses object
as the generic type parameter. For more information, see the .NET documentation.
重要
Dapr SDK 使用 System.Text.Json
來序列化/反序列化消息。 但是, System.Text.Json
默認情況下不會序列化派生類的屬性。 在 eShop 代碼中,事件有時顯式聲明為 IntegrationEvent
(集成事件的基類)。 這是因為具體事件類型是在運行時基於業務邏輯動態確定的。 因此,使用基類而不是派生類的類型信息對事件進行序列化。 若要 System.Text.Json
在這種情況下強制序列化派生類的所有屬性,代碼需使用 object
作為泛型類型參數。 有關詳細信息,請參閱 .net 文檔。
With Dapr, the infrastructure code is dramatically simplified. It doesn't need to distinguish between the different message brokers. Dapr provides this abstraction for you. And if needed, you can easily swap out message brokers or configure multiple message broker components.
借助 Dapr,可大大簡化基礎結構代碼。 不需要區分不同的消息代理。 Dapr 為你提供此抽象。 如果需要,可以輕松地替換消息代理或配置多個消息代理組件。
Subscribe to events
訂閱事件
The earlier eShopOnContainers app contains SubscriptionManagers to handle the subscription implementation for each message broker. Each manager contains complex message broker-specific code for handling subscription events. To receive events, each service has to explicitly register a handler for each event-type.
早前的 eShopOnContainers 應用程序包含 訂閱管理器 來處理每個消息代理的訂閱實現。 每個管理器都包含用於處理訂閱事件的消息代理特定的復雜代碼。 若要接收事件,每個服務必須為每個事件類型顯式注冊一個處理程序。
eShopOnDapr streamlines the plumbing for event subscriptions by using Dapr ASP.NET Core libraries. Each event is handled by an action method in the controller. A Topic
attribute decorates the action method with the name of the corresponding topic to subscribe to. Here's a code snippet taken from the PaymentService
:
eShopOnDapr 使用 Dapr ASP.NET Core 庫優化了事件訂閱的管道。 每個事件都由控制器中的操作方法處理。 Topic
特性使用主題的名稱修飾操作方法。 下面是從PaymentService
中獲取的代碼片段 :
[Route("api/v1/[controller]")] [ApiController] public class IntegrationEventController : ControllerBase { private const string DAPR_PUBSUB_NAME = "pubsub"; private readonly IServiceProvider _serviceProvider; public IntegrationEventController(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } [HttpPost("OrderStatusChangedToValidated")] [Topic(DAPR_PUBSUB_NAME, "OrderStatusChangedToValidatedIntegrationEvent")] public async Task OrderStarted(OrderStatusChangedToValidatedIntegrationEvent integrationEvent) { var handler = _serviceProvider.GetRequiredService<OrderStatusChangedToValidatedIntegrationEventHandler>(); await handler.Handle(integrationEvent); } }
In the Topic
attribute, the name of the .NET type of the event is used as the topic name. For handling the event, an event handler that already existed in the earlier eShopOnContainers code base is invoked. In the previous example, messages received from the OrderStatusChangedToValidatedIntegrationEvent
topic invoke the existing OrderStatusChangedToValidatedIntegrationEventHandler
event-handler. Because Dapr implements the underlying plumbing for subscriptions and message brokers, a large amount of original code became obsolete and was removed from the code-base. Much of this code was complex to understand and challenging to maintain.
在 Topic
特性中,事件的 .net 類型的名稱將用作主題名稱。 在處理事件時,將調用以前的 eShopOnContainers 代碼中已有的事件處理程序。 在上面的示例中,從OrderStatusChangedToValidatedIntegrationEvent
主題接收消息后調用了現有的 OrderStatusChangedToValidatedIntegrationEventHandler
事件處理程序。 由於 Dapr 實現了訂閱和消息代理的底層管道,因此大量原始代碼已過時,並已從代碼中刪除。 這些代碼對於理解和維護非常復雜。
Use pub/sub components
使用發布/訂閱組件
Within the eShopOnDapr repository, a deployment
folder contains files for deploying the application using different deployment modes: Docker Compose
and Kubernetes
. A dapr
folder exists within each of these folders that holds a components
folder. This folder holds a file eshop-pubsub.yaml
containing the configuration of the Dapr pub/sub component that the application will use for pub/sub behavior. As you saw in the earlier code snippets, the name of the pub/sub component used is pubsub
. Here's the content of the eshop-pubsub.yaml
file in the deployment/compose/dapr/components
folder:
在 eShopOnDapr 存儲庫中, deployment
文件夾包含使用不同部署模式(Docker Compose
和 Kubernetes)
部署應用程序的文件。 compose文件夾或kubernetes文件夾中的dapr文件夾中都有一個components文件夾(此句不會翻譯)。 此文件夾包含一個eshop-pubsub.yaml文件,該文件包含應用程序將使用的 Dapr pub/sub 組件的配置。 正如您在前面的代碼片段中所看到的那樣,所使用的 pub/sub 組件的名稱為 pubsub
。 下面是deployment/compose/dapr/components
文件夾中 eshop-pubsub.yaml
文件的內容 :
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub namespace: default spec: type: pubsub.nats version: v1 metadata: - name: natsURL value: nats://demo.nats.io:4222
The preceding configuration specifies the desired NATS message broker for this example. To change message brokers, you need only to configure a different message broker, such as RabbitMQ or Azure Service Bus and update the yaml file. With Dapr, there are no changes to your mainline service code when switching message brokers.
前面的配置指定此示例使用的 NATS 消息代理 。 若要更改消息代理,只需更新 yaml 文件來配置一個不同的消息代理,如 RabbitMQ 或 Azure Service Bus。 使用Dapr,切換消息代理時,無需對主線服務代碼進行任何更改。
Finally, you might ask, "Why would I need multiple message brokers in an application?". Many times a system will handle workloads with different characteristics. One event may occur 10 times a day, but another event occurs 5,000 times per second. You may benefit by partitioning messaging traffic to different message brokers. With Dapr, you can add multiple pub/sub component configurations, each with a different name.
最后,您可能會問: "為什么需要在一個應用程序中使用多個消息代理?"。 很多時候系統將處理具有不同特征的工作負荷。 一個事件可能一天發生10次,但另一個事件每秒發生5000次。 可以通過將消息傳送流量分區給不同消息代理來受益。 使用 Dapr,可以添加多個 pub/sub 組件配置,每個配置使用不同的名稱。
Summary
總結
The pub/sub pattern helps you decouple services in a distributed application. The Dapr publish & subscribe building block simplifies implementing this behavior in your application.
Pub/sub 模式可幫助你解耦分布式應用程序中的服務。 在應用程序中使用Dapr 發布 & 訂閱構建塊可簡化此工作。
Through Dapr pub/sub, you can publish messages to a specific topic. As well, the building block will query your service to determine which topic(s) to subscribe to.
通過 Dapr pub/sub,你可以將消息發布到特定 主題。 同時,構建塊將查詢你的服務,以確定訂閱的主題。
You can use Dapr pub/sub natively over HTTP or by using one of the language-specific SDKs, such as the .NET SDK for Dapr. The .NET SDK tightly integrates with the ASP.NET core platform.
你可以通過 HTTP 或使用特定於語言的 Sdk(如 .NET SDK for Dapr)使用 Dapr pub/sub。 .NET SDK 與 ASP.NET core 平台緊密集成。
With Dapr, you can plug a supported message broker product into your application. You can then swap message brokers without requiring code changes to your application.
使用 Dapr,可以將受支持的消息代理插入應用程序。 然后,你可以在無需對應用程序進行代碼更改的情況下替換消息代理。