注意:使用的是vert.x3.0 僅支持到java8當中有一些lambda表達式。如不明確請自補java8新特性。
The Event Bus
event bus 是vert.x的神經系統。
每個vert.x的實例都有一個單一的event bus 實例。它是使用vertx.eventBus()方法獲得的。
event bus 同意程序中的不同語言編寫的模塊進行通信。不論他們是同樣的vert.x實例。還是不同的vert.x實例。
它甚至能夠橋接瀏覽器中執行的Javascript通信。
event bus能夠在分布式系統中的多個server節點之間進行點對點通信和多個瀏覽器。
event bus支持公布/訂閱模式。點對點模式,和請求/響應模式。
event bus的API是很easy的。它主要包含注冊消息處理事件。取消處理事件。發送和公布消息。
首先理論
尋址
event bus上的消息被發送到一個地址。
vert.x不包括不論什么花哨的尋址方案。
在vert.x中,一個地址就是一個簡單的String字符串。不論什么字符串都是有效的。只是最好的方法是使用某種有計划或者有規則的方案,比方使用一個私有的空間名稱。
一些有參考價值的樣例:europe.news.feed1, acme.games.pacman, sausages, and X。
事件-消息的處理程序
收到消息的處理程序,你在一個地址上注冊一個處理程序,來消息后將觸發這個處理程序。
同一個消息處理程序能夠注冊到不同的地址上,相同同一個地址也能注冊多個處理程序。
公布/訂閱模式
event bus 支持公布消息
消息被公布到一個地址。公布意味着將消息交給全部訂閱並注冊處理程序的地址來處理。
這跟大家熟悉的公布/訂閱模式沒有什么不同。
點對點和請求/響應模式
event bus 支持點對點消息傳遞。
消息被發送到一個地址。
vert.x將發送它到一個注冊消息處理程序的地址。
假設有多個處理程序注冊地址,vert.x將選擇一個來處理(採用非嚴格循環算法)。
強烈不推薦。
當接收到消息的程序處理完畢后,能夠決定是否回復。發送程序接到回復后也能夠進行響應回復,假設他們這樣做應答處理程序將被調用。
當接收方到返回發送方。這樣能夠無限反復,這又是一種常見的消息傳遞模式:請求/響應模式
最優傳輸
vert.x可以做到最優傳輸。不會有意識的丟失消息。這是很重要的。
然而,event bus的部分或所有失敗還是有可能造成消息丟失的。
假設你的應用程序很在乎消息的完整性和時序性。那么你的代碼處理應該是冪等的。以便在消息處理程序復蘇后又一次發送消息。
消息類型
開箱同意vert.x使用不論什么的原始/簡單類型,字符串或者緩沖區發送消息。
然而這里有一個不成文的規定或者說建議。那就是最好使用JSON格式的子串來進行消息的傳遞。
JSON字串在全部的編程語言中都是很easy創建。讀取和解析的。在vert.x下它已經變成一種通用語言了。
假設你不是必需使用JSON或者說你不想。
event bus 很靈活。
它還支持發送隨意對象。還能夠定義您想要發送的對象的編解碼器。
EVENT BUS 的API
讓我們跳進event bus的API。
獲得event bus 的對象
你能夠通過例如以下代碼獲得event bus的單一對象:
EventBus eb = vertx.eventBus();
注冊處理事件
使用以下這個簡單方法注冊一個消費處理程序:
EventBus eb = vertx.eventBus();
eb.consumer("news.uk.sport", message -> {
System.out.println("I have received a message: " + message.body());
});
當一個消息到達你的處理事件是。你的事件將被激活。並處理這個消息。
consumer()方法返回一個MessageConsumer的對象實例。這個對象隨后用於注銷處理程序,或者用處理程序作為流。
然而您也能夠使用consumer()返回MessageConsumer沒有處理程序,然后單獨設置處理程序。比如:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.consumer("news.uk.sport");consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
});
當在集群事件總線上注冊一個處理程序時,它能夠花一些時間登記到集群的全部節點上。
假設你希望在注冊完畢時得到通知的話,你能夠在MessageConsumer上注冊一個注冊完畢的處理程序:
consumer.completionHandler(res -> {
if (res.succeeded()) {
System.out.println("The handler registration has reached all nodes");
} else {
System.out.println("Registration failed!");
}
});
注銷處理事件
去除處理事件。叫做注銷。
假設你是集群事件總線, 假設你想當這個過程完畢時通知注銷,你能夠使用以下的方法:
consumer.unregister(res -> {
if (res.succeeded()) {
System.out.println("The handler un-registration has reached all nodes");
} else {
System.out.println("Un-registration failed!");
}
});
公布消息
公布消息很easy。僅僅須要把它公布到指定地址就可以:
eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");
這一消息將被交付全部訂閱news.uk.sport地址處理。
發送消息
發送消息將導致僅僅有一個注冊地址的處理程序接收到消息(多個注冊地址也僅僅有一個能收到)。
這就是點對點模式,選擇處理程序的方法採用非嚴格循環方式。
你可用用send()方法發送一條消息。
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");
未解決的指令包含在<stdiin>-include::override/eventbus_headers.adoc[] ==== The Message object
你的消息處理程序收到的是一個Message。
消息的body相應着是應該發送還是應該公布。
消息的headers是可用的。
回復消息
有時你發送消息后希望得到接收到消息的人的回復。
這就須要你使用請求-響應模式。
要做到這一點,在消息發送的時候,你能夠指定一個回復事件。
當你接收到消息的時候,你能夠通過調用reply()方法來應答。
當這一切發生的時候它會導致一個答復發送回發送方,發送方收到應答消息再做處理。
接收方:
MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
message.reply("how interesting!");
});
發送方:
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
if (ar.succeeded()) {
System.out.println("Received reply: " + ar.result().body());
}
});
相應答也能夠做應答。這樣你就能夠在兩個不同的程序中創建一個包括多個回合的對話。
發送超時
當你發送消息時和指定應答事件時你能夠通過DeliveryOptions指定超時時間。
假設應答事件不少於超時時間,這個應答事件將失敗。
默認的超時時間是30S。
發送失敗
消息發送失敗的其它原因,包含:
沒有可用的事件去發送消息
接收者已經明白使用失敗:失敗的消息
在全部情況下。應答事件將回復特定的失敗。
未解決的指令包括在<stdin> - include::override/eventbus.adoc[]==== Clustered Event Bus
event bus 不只存在於一個單一的Vert.x實例中,在一個集群中不同的Vert.x實例也能夠形成一個單一的,分布的事件總線。
集群編程
假設你創建一個Vert.x實例用於集群編程。你須要的得到一個關於集群事件總線配置
VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}});
你應該確保在你的類路徑中實現了一個ClusterManager,比如默認的:HazelcastClusterManager。
使用命令集群
你能夠使用命令行執行集群:vertx run my-verticle.js -cluster
Automatic clean-up in verticles
If you’re registering event bus handlers from inside verticles, those handlers will be automatically unregisteredwhen the verticle is undeployed.