Vert.x中EventBus中的使用



注意:使用的是vert.x3.0 僅支持到java8當中有一些lambda表達式。如不明確請自補java8新特性。

The Event Bus

event bus 是vert.x的神經系統。

每個vert.x的實例都有一個單一的event bus 實例。它是使用vertx.eventBus()方法獲得的。

event bus 同意程序中的不同語言編寫的模塊進行通信。不論他們是同樣的vert.x實例。還是不同的vert.x實例。

它甚至能夠橋接瀏覽器中執行的Javascript通信。

event bus能夠在分布式系統中的多個server節點之間進行點對點通信和多個瀏覽器。

event bus支持公布/訂閱模式。點對點模式,和請求/響應模式。

event bus的API是很easy的。它主要包含注冊消息處理事件。取消處理事件。發送和公布消息。

首先理論

尋址

event bus上的消息被發送到一個地址。

vert.x不包括不論什么花哨的尋址方案。

在vert.x中,一個地址就是一個簡單的String字符串。不論什么字符串都是有效的。只是最好的方法是使用某種有計划或者有規則的方案,比方使用一個私有的空間名稱。

一些有參考價值的樣例:europe.news.feed1, acme.games.pacman, sausages, and X。

事件-消息的處理程序

收到消息的處理程序,你在一個地址上注冊一個處理程序,來消息后將觸發這個處理程序。

同一個消息處理程序能夠注冊到不同的地址上,相同同一個地址也能注冊多個處理程序。

公布/訂閱模式

event bus 支持公布消息

消息被公布到一個地址。公布意味着將消息交給全部訂閱並注冊處理程序的地址來處理。

這跟大家熟悉的公布/訂閱模式沒有什么不同。

點對點和請求/響應模式

event bus 支持點對點消息傳遞。

消息被發送到一個地址。

vert.x將發送它到一個注冊消息處理程序的地址。

假設有多個處理程序注冊地址,vert.x將選擇一個來處理(採用非嚴格循環算法)。

強烈不推薦。

當接收到消息的程序處理完畢后,能夠決定是否回復。發送程序接到回復后也能夠進行響應回復,假設他們這樣做應答處理程序將被調用。

當接收方到返回發送方。這樣能夠無限反復,這又是一種常見的消息傳遞模式:請求/響應模式

最優傳輸

vert.x可以做到最優傳輸。不會有意識的丟失消息。這是很重要的。

然而,event bus的部分或所有失敗還是有可能造成消息丟失的。

假設你的應用程序很在乎消息的完整性和時序性。那么你的代碼處理應該是冪等的。以便在消息處理程序復蘇后又一次發送消息。

消息類型

開箱同意vert.x使用不論什么的原始/簡單類型,字符串或者緩沖區發送消息。

然而這里有一個不成文的規定或者說建議。那就是最好使用JSON格式的子串來進行消息的傳遞。

JSON字串在全部的編程語言中都是很easy創建。讀取和解析的。在vert.x下它已經變成一種通用語言了。

假設你不是必需使用JSON或者說你不想。

event bus 很靈活。

它還支持發送隨意對象。還能夠定義您想要發送的對象的編解碼器。

EVENT BUS 的API

讓我們跳進event bus的API。

獲得event bus 的對象

你能夠通過例如以下代碼獲得event bus的單一對象:

EventBus eb = vertx.eventBus();

注冊處理事件

使用以下這個簡單方法注冊一個消費處理程序:

EventBus eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {

System.out.println("I have received a message: " + message.body());

});

當一個消息到達你的處理事件是。你的事件將被激活。並處理這個消息。

consumer()方法返回一個MessageConsumer的對象實例。這個對象隨后用於注銷處理程序,或者用處理程序作為流。

然而您也能夠使用consumer()返回MessageConsumer沒有處理程序,然后單獨設置處理程序。比如:

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("news.uk.sport");consumer.handler(message -> {

System.out.println("I have received a message: " + message.body());

});

當在集群事件總線上注冊一個處理程序時,它能夠花一些時間登記到集群的全部節點上。

假設你希望在注冊完畢時得到通知的話,你能夠在MessageConsumer上注冊一個注冊完畢的處理程序:

consumer.completionHandler(res -> {

if (res.succeeded()) {

System.out.println("The handler registration has reached all nodes");

} else {

System.out.println("Registration failed!");

}

});

注銷處理事件

去除處理事件。叫做注銷。

假設你是集群事件總線, 假設你想當這個過程完畢時通知注銷,你能夠使用以下的方法:

consumer.unregister(res -> {

if (res.succeeded()) {

System.out.println("The handler un-registration has reached all nodes");

} else {

System.out.println("Un-registration failed!");

}

});

公布消息

公布消息很easy。僅僅須要把它公布到指定地址就可以:

eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

這一消息將被交付全部訂閱news.uk.sport地址處理。

發送消息

發送消息將導致僅僅有一個注冊地址的處理程序接收到消息(多個注冊地址也僅僅有一個能收到)。

這就是點對點模式,選擇處理程序的方法採用非嚴格循環方式。

你可用用send()方法發送一條消息。

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

未解決的指令包含在<stdiin>-include::override/eventbus_headers.adoc[] ==== The Message object

你的消息處理程序收到的是一個Message。

消息的body相應着是應該發送還是應該公布。

消息的headers是可用的。

回復消息

有時你發送消息后希望得到接收到消息的人的回復。

這就須要你使用請求-響應模式。

要做到這一點,在消息發送的時候,你能夠指定一個回復事件。

當你接收到消息的時候,你能夠通過調用reply()方法來應答。

當這一切發生的時候它會導致一個答復發送回發送方,發送方收到應答消息再做處理。

接收方:

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");

consumer.handler(message -> {

System.out.println("I have received a message: " + message.body());

message.reply("how interesting!");

});

發送方:

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {

if (ar.succeeded()) {

 System.out.println("Received reply: " + ar.result().body());
 }
 });

相應答也能夠做應答。這樣你就能夠在兩個不同的程序中創建一個包括多個回合的對話。

發送超時

當你發送消息時和指定應答事件時你能夠通過DeliveryOptions指定超時時間。

假設應答事件不少於超時時間,這個應答事件將失敗。

默認的超時時間是30S。

發送失敗

消息發送失敗的其它原因,包含:

沒有可用的事件去發送消息

接收者已經明白使用失敗:失敗的消息

在全部情況下。應答事件將回復特定的失敗。

未解決的指令包括在<stdin> - include::override/eventbus.adoc[]==== Clustered Event Bus

event bus 不只存在於一個單一的Vert.x實例中,在一個集群中不同的Vert.x實例也能夠形成一個單一的,分布的事件總線。

集群編程

假設你創建一個Vert.x實例用於集群編程。你須要的得到一個關於集群事件總線配置

VertxOptions options = new VertxOptions();

Vertx.clusteredVertx(options, res -> {

if (res.succeeded()) {

Vertx vertx = res.result();

EventBus eventBus = vertx.eventBus();

System.out.println("We now have a clustered event bus: " + eventBus);

} else {

System.out.println("Failed: " + res.cause());

}});

你應該確保在你的類路徑中實現了一個ClusterManager,比如默認的:HazelcastClusterManager。

使用命令集群

你能夠使用命令行執行集群:vertx run my-verticle.js -cluster

Automatic clean-up in verticles

If you’re registering event bus handlers from inside verticles, those handlers will be automatically unregisteredwhen the verticle is undeployed.





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM