Advantages of Vert.x:
1. Compared with traditional stacks and frameworks based on blocking I/O, it handles more requests with fewer resources. Vert.x suits a wide range of execution environments, including constrained ones such as virtual machines and containers.
2. Vert.x is a toolkit, not a framework, so it is naturally highly composable and embeddable (from different languages, too).
Vert.x runs on the Java virtual machine and supports multiple programming languages. It is highly modular, and within a single application you can even develop in several languages at once. In the Vert.x 2 era (based on JDK 7, before lambdas existed), JavaScript was the relatively common choice of development language. With Vert.x 3 and the arrival of JDK 8, Java became the mainstream language for Vert.x, and Vert.x gained acceptance among many more developers.
3. Is asynchronous programming too hard for you? Vert.x makes it an approachable experience without sacrificing correctness or performance.
That said, Vert.x's asynchronous model does add coding complexity. Writing clean asynchronous code requires real familiarity with lambda expressions, functional programming, Reactive techniques and the like; otherwise your code easily turns into an unreadable mess. In addition, performance tuning and exception handling in the asynchronous model differ greatly from the synchronous model, online material is scarce, and problems encountered in practice are hard to troubleshoot. This is one reason many architects in China are still reluctant to choose Vert.x.
4. A relatively complete ecosystem
See the official site for what is supported: https://vertx.io/
5. We can deploy the same HTTP server code twice on the same port and, for each instance, expect no error caused by the TCP port already being in use. With many web frameworks we would have to pick different TCP ports and put an HTTP proxy in front to load-balance between them. Vert.x needs none of that: multiple verticles can share the same TCP port, and incoming connections are simply distributed to the accepting threads in round-robin fashion.
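A minimal sketch of that port sharing in Java (the class name com.example.HttpVerticle and the instance count are illustrative; the verticle is assumed to start an HTTP server on a fixed port in its start() method):

// Deploy four instances of one HTTP verticle; all four listen on the same
// port, and Vert.x round-robins incoming connections across them.
vertx.deployVerticle("com.example.HttpVerticle",
        new DeploymentOptions().setInstances(4));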
Bonus: if you know Python, you can think of Vert.x as playing roughly the role Tornado plays in the Python world.
All of the examples below use Java (built with Maven; see the official site for the pom dependencies):
1. Start a web server listening on port 8080:
HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> {
    // This handler gets called for each request that arrives on the server
    HttpServerResponse response = request.response();
    // response.putHeader("content-type", "text/plain");
    // Write to the response and end it
    response.end("Hello World!");
});
server.listen(8080);
2. Reading a POST request body requires registering a BodyHandler on the router
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
// Parse request bodies (needed to read POST bodies)
router.route().handler(BodyHandler.create());
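With the BodyHandler in place, any handler further down the chain can read the parsed body. A minimal sketch (the /echo route is illustrative; requestHandler(router) works on recent 3.x releases, older examples use router::accept):

router.post("/echo").handler(routingContext -> {
    // Available because BodyHandler ran first
    JsonObject body = routingContext.getBodyAsJson();
    routingContext.response()
            .putHeader("content-type", "application/json")
            .end(body.encode());
});
server.requestHandler(router).listen(8080);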
3. API mapping and request methods
// Simplified route registration
router.route("/some/111").handler(routingContext -> {
    throw new RuntimeException("something happened!");
    // HttpServerResponse response = routingContext.response();
    // // Write the response and end processing
    // response.end("Hello World from Vert.x-Web!!!!!");
});
// Restrict the route to GET requests
router.route(HttpMethod.GET, "/some/path/").handler(routingContext -> {
    HttpServerResponse response = routingContext.response();
    response.end("Hello World from Vert.x-Web!!!!!");
});
// Shorthand for the same thing
router.get("/get").handler(routingContext -> {
    HttpServerResponse response = routingContext.response();
    response.end("return value");
});
4. Writing response data more than once requires enabling chunked mode: setChunked(true)
router.get("/get").handler(routingContext -> { HttpServerResponse response = routingContext.response(); response.setChunked(true); response.write("11111111111111111111111\n"); response.end("22"); })
5. Registering a failure handler on a wildcard route
Router exceptionRouter = Router.router(vertx);
exceptionRouter.route("/some/*").failureHandler(failureRoutingContext -> {
    int statusCode = failureRoutingContext.statusCode();
    // The status code will be 500 for a RuntimeException, 403 otherwise
    HttpServerResponse response = failureRoutingContext.response();
    response.setStatusCode(statusCode).end("Sorry! Not today");
});
6. Mounting a sub-router (the handler below is then reached at GET /sub/put)
Router subRouter = Router.router(vertx);
subRouter.get("/put").handler(routingContext -> {
    HttpServerResponse response = routingContext.response();
    response.end("is end");
});
router.mountSubRouter("/sub", subRouter);
7. Vert.x supports sessions; register the session handler on the router
// Create a local session store using the default map name
SessionStore store = LocalSessionStore.create(vertx);
// Create a local session store with a specific map name.
// Useful when several applications share one Vert.x instance
// and each should use a different map.
SessionStore store1 = LocalSessionStore.create(vertx, "myapp3.sessionmap");
// As above, with the expired-session reaper interval set to 10 seconds (10000 ms)
SessionStore store3 = LocalSessionStore.create(vertx, "myapp3.sessionmap", 10000);
// Create a default clustered session store
SessionStore store4 = ClusteredSessionStore.create(vertx);
// Create a clustered session store with a specific map name.
// Useful when several applications share the cluster
// and each should use a different map.
SessionStore store2 = ClusteredSessionStore.create(vertx, "myclusteredapp3.sessionmap");
// Make sure every request passes through the session handler
SessionHandler sessionHandler = SessionHandler.create(store);
router.route().handler(sessionHandler);
router.route("/111").handler(routingContext -> {
Session session = routingContext.session();
session.put("name", "hz");
session.get("name");
JsonObject jsonObject = session.remove("name");
session.destroy();
// session.setAccessed();
});
8. Cookie support in Vert.x
// Register the cookie handler
router.route().handler(CookieHandler.create());
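Once the handler is registered, cookies can be read and written from any route. An illustrative sketch against the Vert.x 3.x API (in Vert.x 4 the CookieHandler is gone and cookies moved to io.vertx.core.http.Cookie); the /counter route and cookie name are assumptions:

router.route("/counter").handler(routingContext -> {
    // Read a cookie sent by the client (io.vertx.ext.web.Cookie in 3.x)
    Cookie visits = routingContext.getCookie("visits");
    int count = visits == null ? 0 : Integer.parseInt(visits.getValue());
    // Write it back, incremented
    routingContext.addCookie(Cookie.cookie("visits", String.valueOf(count + 1)));
    routingContext.response().end("visits: " + (count + 1));
});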
9. When several handlers match the same route, a handler can defer to the next one, much like middleware chaining on the front end
// Call the next handler after 5 seconds
routingContext.vertx().setTimer(5000, tid -> routingContext.next());
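In context, that pattern is two handlers registered on the same path, where the first defers to the second. A sketch (the /slow path is illustrative):

router.route("/slow").handler(routingContext ->
        // Hand off to the next matching handler after 5 seconds
        routingContext.vertx().setTimer(5000, tid -> routingContext.next()));
router.route("/slow").handler(routingContext ->
        routingContext.response().end("done after the delay"));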
10. JWT authentication
// Add the JWT authenticator as the next handler to all API routes.
router.route("/api*").handler(this.authenticator::authenticate);
// Add a handler that echoes the user attributes back to the caller.
router.get("/api/hello").handler(this::sayHello);
The verification logic is shown below; token creation and the rest live in a JWT util and are exposed through other routes (a sketch of that side follows the handler).
// Handler methods
public void authenticate(RoutingContext routingContext) {
    var authorizeHeader = routingContext.request().getHeader("Authorization");
    LOGGER.debug("Authenticating request with Authorization = {}", authorizeHeader);
    if (authorizeHeader != null) {
        // We can only handle bearer tokens; let's make certain that's what we have.
        String[] components = authorizeHeader.split(" ");
        if (components.length > 1 && components[0].equalsIgnoreCase("bearer")) {
            var jwt = components[1];
            LOGGER.debug("Will authenticate using JWT {}", jwt);
            this.authenticator.authenticate(new JsonObject().put("jwt", jwt), result -> {
                if (result.succeeded()) {
                    LOGGER.debug("Successfully authenticated request.");
                    // Add the user to the routing context and pass control to the
                    // next handler.
                    User user = result.result();
                    LOGGER.debug("User principal = {}", user.principal().toString());
                    routingContext.setUser(user);
                    routingContext.next();
                } else {
                    // Unable to authenticate; request is forbidden.
                    LOGGER.warn("Unable to authenticate request.", result.cause());
                    routingContext.response().setStatusCode(403).end();
                }
            });
        } else {
            routingContext.response().setStatusCode(400).end();
        }
    } else {
        LOGGER.info("Request does not contain Authorization header; returning HTTP status code 401.");
        routingContext.response().setStatusCode(401).end();
    }
}
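For completeness, a rough sketch of what the JWT util side might look like in Vert.x 3.x (class and option names shifted across 3.x releases, so treat this as an assumption rather than a drop-in): create the JWTAuth provider that backs this.authenticator, and issue tokens from a login route.

// Symmetric-key provider; the secret "keyboard cat" is illustrative only
JWTAuth authenticator = JWTAuth.create(vertx, new JWTAuthOptions()
        .addPubSecKey(new PubSecKeyOptions()
                .setAlgorithm("HS256")
                .setPublicKey("keyboard cat")
                .setSymmetric(true)));

// Issue a token, e.g. from a /login route
String token = authenticator.generateToken(
        new JsonObject().put("sub", "demo-user"),
        new JWTOptions().setExpiresInSeconds(3600));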
11. Event bus
If you want to call modules of a Vert.x instance from another language, or from another application, you can open a TCP connection to the EventBus. The wire protocol the Event Bus uses is shown below; just do not forget that you must create an Event Bus Bridge first (a sketch follows the frame layout).
<Length: uInt32><{
  type: String,
  address: String,
  (replyAddress: String)?,
  headers: JsonObject,
  body: JsonObject
}: JsonObject>
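A minimal sketch of creating that bridge with the vertx-tcp-eventbus-bridge module (the permitted addresses and the port are illustrative):

// Requires the io.vertx:vertx-tcp-eventbus-bridge dependency
TcpEventBusBridge bridge = TcpEventBusBridge.create(vertx,
        new BridgeOptions()
                .addInboundPermitted(new PermittedOptions().setAddress("in"))
                .addOutboundPermitted(new PermittedOptions().setAddress("out")));
bridge.listen(7000, res ->
        System.out.println("bridge started: " + res.succeeded()));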
Service provider: publish the Service on the bus
@Override
public void start() throws Exception {
    Service service = Service.create(vertx);
    new ServiceBinder(vertx).setAddress(address).register(Service.class, service);
}
Service consumer:
@Override
public void start() throws Exception {
    HttpServer httpServer = vertx.createHttpServer();
    httpServer.requestHandler(request -> {
        // Get the response object
        HttpServerResponse response = request.response();
        // Set the response header
        response.putHeader("Content-type", "text/html;charset=utf-8");
        // The action header selects which service method to invoke
        DeliveryOptions options = new DeliveryOptions();
        options.addHeader("action", "sayHello");
        // Arguments passed to the method
        JsonObject config = new JsonObject();
        config.put("name", "xiaozhang");
        // Invoke the method over the event bus
        vertx.eventBus().<JsonObject>send("service.demo.firstverticle", config, options, res -> {
            // Send back the response data
            response.end(res.result().body().getString("msg"));
        });
    });
    httpServer.listen(1234);
}
12. Configuring the event bus: using SSL connections instead of plain TCP
The event bus is configurable, which is especially useful when it is clustered. When the event bus sends and receives messages over TCP connections, EventBusOptions lets you configure every aspect of those connections. Because the event bus acts as both a server and a client, the options are similar to NetClientOptions and NetServerOptions.
VertxOptions options = new VertxOptions()
        .setEventBusOptions(new EventBusOptions()
                .setSsl(true)
                .setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
                .setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
                .setClientAuth(ClientAuth.REQUIRED));
Vertx.clusteredVertx(options, res -> {
    if (res.succeeded()) {
        Vertx vertx = res.result();
        EventBus eventBus = vertx.eventBus();
        System.out.println("We now have a clustered event bus: " + eventBus);
    } else {
        System.out.println("Failed: " + res.cause());
    }
});
Warning: for cluster mode to be secure, you must configure the cluster manager to use encryption and to enforce security; see the cluster manager documentation for more information. The event bus configuration must be identical on every node of the cluster. EventBusOptions also lets you set whether the event bus is clustered, and the setClustered, getClusterHost and getClusterPort methods configure the host and port.
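A small sketch of those knobs (Vert.x 3.x, values illustrative; in Vert.x 4 clustering is implied by Vertx.clusteredVertx and setClustered is gone):

EventBusOptions ebOptions = new EventBusOptions()
        .setClustered(true)
        .setHost("192.168.1.10")   // cluster host this node binds to
        .setPort(45000);           // cluster port this node binds to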
13. Configuring the event bus: setting a public host and port when running in containers
VertxOptions options = new VertxOptions()
        .setEventBusOptions(new EventBusOptions()
                .setClusterPublicHost("whatever")
                .setClusterPublicPort(1234));
Vertx.clusteredVertx(options, res -> {
    if (res.succeeded()) {
        Vertx vertx = res.result();
        EventBus eventBus = vertx.eventBus();
        System.out.println("We now have a clustered event bus: " + eventBus);
    } else {
        System.out.println("Failed: " + res.cause());
    }
});
14. CORS support
router.route()
        .handler(CorsHandler.create("vertx\\.io")
                .allowedMethod(HttpMethod.GET));
15. Using annotations (service proxies)
1. Write the service as a Java interface and annotate it with @ProxyGen.
@ProxyGen
public interface SomeDatabaseService {

    // A couple of factory methods to create an instance and a proxy
    static SomeDatabaseService create(Vertx vertx) {
        return new SomeDatabaseServiceImpl(vertx);
    }

    static SomeDatabaseService createProxy(Vertx vertx, String address) {
        return new SomeDatabaseServiceVertxEBProxy(vertx, address);
    }

    // Actual service operations here...
    void save(String collection, JsonObject document, Handler<AsyncResult<Void>> resultHandler);
}
2. You also need a package-info.java file somewhere in (or above) the package in which the interface is defined. That package must be annotated with @ModuleGen so that Vert.x CodeGen can recognize your interface and generate the appropriate EventBus proxy code. (Given the interface, Vert.x generates all the boilerplate needed to access your service over the event bus, plus a client-side proxy, so clients get a rich, idiomatic API for your service instead of having to hand-craft event bus messages. The client proxy works no matter where the service actually lives on the event bus, possibly on a different machine.)
@io.vertx.codegen.annotations.ModuleGen(groupPackage = "io.vertx.example", name = "services", useFutures = true)
package io.vertx.example;
3. Parameter types must be strings, Java primitive types, JSON objects or arrays, any enum type, or java.util collections (List/Set/Map) of the preceding types. The only way to support an arbitrary Java class is to annotate it with @DataObject, making it a Vert.x data object. The last resort for passing other types is the service reference type.
4. The @Fluent annotation is optional, but it allows a fluent interface whose operations can be chained by returning the service instance. This is particularly useful for the code generator when the service will be consumed from other JVM languages. See the sketch below.
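As an illustration, the save operation from the interface above could be declared fluent so that calls chain (a hypothetical variant, not from the original):

// Returning the service itself lets callers chain operations
@Fluent
SomeDatabaseService save(String collection, JsonObject document,
                         Handler<AsyncResult<Void>> resultHandler);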
16. Deploying verticles:
Standard verticle
Verticle myVerticle = new MyVerticle();
vertx.deployVerticle(myVerticle);
Worker verticle
DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle(new MyVerticle(), options);
Multi-threaded worker verticle
The only difference from a regular worker verticle is that a multi-threaded worker verticle instance can be executed by several threads at the same time.
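A sketch of deploying one, using the Vert.x 3.x API (setMultiThreaded was deprecated during 3.x and removed in Vert.x 4):

DeploymentOptions options = new DeploymentOptions()
        .setWorker(true)
        .setMultiThreaded(true); // several worker threads may run the instance concurrently
vertx.deployVerticle(new MyVerticle(), options);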
17. Splitting into a distributed application: create an HttpApp.java that deploys HttpVerticle.
package com.javafm.vertx;

import io.vertx.core.Vertx;

public class HttpApp {
    public static void main(String[] args) {
        Vertx.vertx().deployVerticle(new HttpVerticle());
    }
}
Then write DatabaseApp.java, which deploys DatabaseVerticle.
package com.javafm.vertx;

import io.vertx.core.Vertx;

public class DatabaseApp {
    public static void main(String[] args) {
        Vertx.vertx().deployVerticle(new DatabaseVerticle());
    }
}
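The two verticles themselves are not shown in the original. Minimal sketches of what they might contain, assuming the event bus address com.javafm.vertx.database that appears in the error below and an illustrative /user/:id route:

public class DatabaseVerticle extends AbstractVerticle {
    @Override
    public void start() {
        // Reply to lookups arriving over the event bus
        vertx.eventBus().consumer("com.javafm.vertx.database",
                msg -> msg.reply(new JsonObject().put("id", msg.body()).encode()));
    }
}

public class HttpVerticle extends AbstractVerticle {
    @Override
    public void start() {
        Router router = Router.router(vertx);
        router.get("/user/:id").handler(ctx ->
                // Forward the path parameter to the database verticle
                vertx.eventBus().request("com.javafm.vertx.database", ctx.pathParam("id"),
                        res -> ctx.response().end(res.result().body().toString())));
        vertx.createHttpServer().requestHandler(router).listen(8080);
    }
}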
Now start HttpApp.java and DatabaseApp.java separately and call the http://localhost:8080/user/1 endpoint. You will find the request fails instead of behaving as expected, and the console prints: (NO_HANDLERS,-1) No handlers for address com.javafm.vertx.database. That is because the two verticles were split into independently running processes, i.e. they now run distributed, but their Vertx instances cannot communicate, so the publisher and the subscriber have no relationship at all.
To solve this, we need to pull in one more component in pom.xml; in other words, we deploy as a cluster and form a network.
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-hazelcast</artifactId>
    <version>3.8.5</version>
</dependency>
Next, modify HttpApp.java to add clustering.
package com.javafm.vertx;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;

public class HttpApp {
    public static void main(String[] args) {
        VertxOptions options = new VertxOptions();
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                res.result().deployVerticle(new HttpVerticle());
            }
        });
    }
}
Then modify DatabaseApp.java in the same way.
package com.javafm.vertx;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;

public class DatabaseApp {
    public static void main(String[] args) {
        VertxOptions options = new VertxOptions();
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                res.result().deployVerticle(new DatabaseVerticle());
            }
        });
    }
}
Start the two main classes again and the console logs look different: there is more output, and cluster membership information is printed, roughly like this:
Mar 29, 2020 5:04:35 PM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.124.2]:5701 [dev] [3.12.2]
Members {size:1, ver:1} [
Member [192.168.124.2]:5701 - 7171dc38-7f0f-4f10-bb6a-409082deca8f this
]
Call the endpoint from before again and data is returned.
No special configuration was used to form the cluster; the defaults were enough. If your distributed application is deployed on different servers and in different network environments, the defaults will not suffice; just add a hazelcast.xml file under the resources directory and configure the network there. A template for hazelcast.xml ships inside the Hazelcast jar.
The two most commonly used cluster modes are:
1. Multicast mode
<multicast enabled="true">
    <multicast-group>224.2.2.3</multicast-group>
    <multicast-port>54327</multicast-port>
</multicast>
When enabled, Vert.x instances on the same network segment discover each other automatically and form a cluster. This is the default mode.
2. TCP-IP mode (explicit member addresses)
<tcp-ip enabled="false">
    <interface>127.0.0.1</interface>
    <member-list>
        <member>127.0.0.1</member>
    </member-list>
</tcp-ip>
Change enabled to true and list the cluster's IP addresses here. Once configured, package and deploy the application to the different servers and it will work normally.
18. Vert.x cluster code walkthrough
Startup class
package org.example;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class AppCluster {
    public static void main(String[] args) throws UnknownHostException {
        final VertxOptions vertxOptions = new VertxOptions();
        EventBusOptions eventBusOptions = new EventBusOptions();
        // LAN address of this machine
        String hostAddress = InetAddress.getLocalHost().getHostAddress();
        // Port the event bus listens on in cluster mode, used to receive data
        vertxOptions.setEventBusOptions(eventBusOptions).getEventBusOptions().setHost(hostAddress);
        // Start in cluster mode with the Hazelcast cluster manager
        HazelcastClusterManager clusterManager = new HazelcastClusterManager();
        vertxOptions.setClusterManager(clusterManager);
        Vertx.clusteredVertx(vertxOptions, res -> {
            Vertx result = res.result();
            result.deployVerticle(new MainClusterVerticle(), r -> {
                if (r.succeeded()) {
                    System.out.println(MainClusterVerticle.class.getName() + " --> deployed successfully");
                } else {
                    r.cause().printStackTrace();
                    System.err.println(MainClusterVerticle.class.getName() + " --> deployment failed, " + r.cause().getMessage());
                }
            });
        });
    }
}
Verticle class
package org.example;

import io.vertx.core.AbstractVerticle;

public class MainClusterVerticle extends AbstractVerticle {
    public void start() {
        // to do something
    }
}
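The start() body is left empty in the original; to make the cluster observable you would typically register an event bus consumer here. A minimal sketch, assuming an illustrative address "cluster.ping":

public void start() {
    // Other nodes can send to "cluster.ping" over the clustered event bus
    vertx.eventBus().consumer("cluster.ping",
            msg -> msg.reply("pong from " + deploymentID()));
}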
cluster.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
  ~ Licensed under the Apache License, Version 2.0 (the "License");
  ~ you may not use this file except in compliance with the License.
  ~ You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->
<!--
  The default Hazelcast configuration. This XML file is used when no cluster.xml is present.
  To learn how to configure Hazelcast, please see the schema at
  https://hazelcast.com/schema/config/hazelcast-config-3.12.xsd
  or the Reference Manual at https://hazelcast.org/documentation/
-->
<!--suppress XmlDefaultAttributeValue -->
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
           http://www.hazelcast.com/schema/config/hazelcast-config-3.12.xsd">
    <group>
        <name>dev</name>
    </group>
    <!-- Management center address -->
    <management-center enabled="true">http://192.168.2.112:7070/mancenter</management-center>
    <network>
        <!-- TCP listen port 5700; if it is taken, the next port is tried, up to 5800 -->
        <port auto-increment="true" port-count="100">5700</port>
        <outbound-ports>
            <!-- Allowed port range when connecting to other nodes. 0 or * means use system provided port. -->
            <ports>0</ports>
        </outbound-ports>
        <join>
            <!-- Cluster via multicast -->
            <multicast enabled="true">
                <multicast-group>224.2.2.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
            <tcp-ip enabled="false">
                <interface>192.168.2.112</interface>
                <member-list>
                    <member>192.168.2.121</member>
                </member-list>
            </tcp-ip>
            <aws enabled="false">
                <access-key>my-access-key</access-key>
                <secret-key>my-secret-key</secret-key>
                <!-- optional, default is us-east-1 -->
                <region>us-west-1</region>
                <!-- optional, default is ec2.amazonaws.com; if set, region shouldn't be set as it will override this property -->
                <host-header>ec2.amazonaws.com</host-header>
                <!-- optional, only instances belonging to this group will be discovered; default tries all running instances -->
                <security-group-name>hazelcast-sg</security-group-name>
                <tag-key>type</tag-key>
                <tag-value>hz-nodes</tag-value>
            </aws>
            <gcp enabled="false">
                <zones>us-east1-b,us-east1-c</zones>
            </gcp>
            <azure enabled="false">
                <client-id>CLIENT_ID</client-id>
                <client-secret>CLIENT_SECRET</client-secret>
                <tenant-id>TENANT_ID</tenant-id>
                <subscription-id>SUB_ID</subscription-id>
                <cluster-id>HZLCAST001</cluster-id>
                <group-name>GROUP-NAME</group-name>
            </azure>
            <kubernetes enabled="false">
                <namespace>MY-KUBERNETES-NAMESPACE</namespace>
                <service-name>MY-SERVICE-NAME</service-name>
                <service-label-name>MY-SERVICE-LABEL-NAME</service-label-name>
                <service-label-value>MY-SERVICE-LABEL-VALUE</service-label-value>
            </kubernetes>
            <eureka enabled="false">
                <self-registration>true</self-registration>
                <namespace>hazelcast</namespace>
            </eureka>
            <discovery-strategies>
            </discovery-strategies>
        </join>
        <interfaces enabled="false">
            <interface>10.10.1.*</interface>
        </interfaces>
        <ssl enabled="false"/>
        <socket-interceptor enabled="false"/>
        <symmetric-encryption enabled="false">
            <!-- encryption algorithm such as DES/ECB/PKCS5Padding, PBEWithMD5AndDES, AES/CBC/PKCS5Padding, Blowfish, DESede -->
            <algorithm>PBEWithMD5AndDES</algorithm>
            <!-- salt value to use when generating the secret key -->
            <salt>thesalt</salt>
            <!-- pass phrase to use when generating the secret key -->
            <password>thepass</password>
            <!-- iteration count to use when generating the secret key -->
            <iteration-count>19</iteration-count>
        </symmetric-encryption>
        <failure-detector>
            <icmp enabled="false"/>
        </failure-detector>
    </network>
    <partition-group enabled="false"/>
    <executor-service name="default">
        <pool-size>16</pool-size>
        <!-- Queue capacity. 0 means Integer.MAX_VALUE. -->
        <queue-capacity>0</queue-capacity>
    </executor-service>
    <security>
        <client-block-unmapped-actions>true</client-block-unmapped-actions>
    </security>
    <queue name="default">
        <!-- Maximum size of the queue; put/offer blocks at the maximum. 0 means Integer.MAX_VALUE. Default is 0. -->
        <max-size>0</max-size>
        <!-- Number of synchronous backups; entries are copied to another JVM for fail-safety. 0 means no backup. -->
        <backup-count>1</backup-count>
        <!-- Number of async backups. 0 means no backup. -->
        <async-backup-count>0</async-backup-count>
        <empty-queue-ttl>-1</empty-queue-ttl>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </queue>
    <map name="default">
        <!-- Storage format for map entries: BINARY (default), OBJECT, or NATIVE (off-heap) -->
        <in-memory-format>BINARY</in-memory-format>
        <!-- Metadata creation policy: CREATE_ON_UPDATE (default, speeds up querying/indexing) or OFF -->
        <metadata-policy>CREATE_ON_UPDATE</metadata-policy>
        <!-- Number of synchronous backups. 0 means no backup. -->
        <backup-count>1</backup-count>
        <!-- Number of async backups. 0 means no backup. -->
        <async-backup-count>0</async-backup-count>
        <!-- Maximum seconds each entry may stay in the map. 0 means infinite. Default is 0. -->
        <time-to-live-seconds>0</time-to-live-seconds>
        <!-- Maximum seconds each entry may stay idle (not touched) in the map. 0 means infinite. Default is 0. -->
        <max-idle-seconds>0</max-idle-seconds>
        <!-- NONE (default, no eviction), LRU (Least Recently Used), or LFU (Least Frequently Used) -->
        <eviction-policy>NONE</eviction-policy>
        <!-- Maximum size of the map; eviction starts at the maximum. 0 means Integer.MAX_VALUE. Default is 0. -->
        <max-size policy="PER_NODE">0</max-size>
        <!-- Deprecated since 3.7 and ignored when set; eviction now uses a sampling-based algorithm -->
        <eviction-percentage>25</eviction-percentage>
        <!-- Deprecated since 3.7 and ignored when set; eviction now uses a sampling-based algorithm -->
        <min-eviction-check-millis>100</min-eviction-check-millis>
        <!-- Merge policy applied when recovering from split-brain (network partitioning).
             Built-in alternatives include PassThroughMergePolicy, PutIfAbsentMapMergePolicy,
             HigherHitsMapMergePolicy and LatestUpdateMapMergePolicy. -->
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
        <!-- Caching of de-serialized values (faster queries, more memory): NEVER, INDEX-ONLY, or ALWAYS -->
        <cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
    </map>
    <!-- Event journals keep events (add, update, remove, merge and so on) for a specific partition and data structure -->
    <event-journal enabled="false">
        <mapName>mapName</mapName>
        <capacity>10000</capacity>
        <time-to-live-seconds>0</time-to-live-seconds>
    </event-journal>
    <event-journal enabled="false">
        <cacheName>cacheName</cacheName>
        <capacity>10000</capacity>
        <time-to-live-seconds>0</time-to-live-seconds>
    </event-journal>
    <!-- Merkle trees allow efficient comparison of differences in large data structures;
         precision is defined by the tree depth -->
    <merkle-tree enabled="false">
        <mapName>mapName</mapName>
        <depth>10</depth>
    </merkle-tree>
    <multimap name="default">
        <backup-count>1</backup-count>
        <value-collection-type>SET</value-collection-type>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </multimap>
    <replicatedmap name="default">
        <in-memory-format>OBJECT</in-memory-format>
        <async-fillup>true</async-fillup>
        <statistics-enabled>true</statistics-enabled>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </replicatedmap>
    <list name="default">
        <backup-count>1</backup-count>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </list>
    <set name="default">
        <backup-count>1</backup-count>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </set>
    <jobtracker name="default">
        <max-thread-size>0</max-thread-size>
        <!-- Queue size 0 means number of partitions * 2 -->
        <queue-size>0</queue-size>
        <retry-count>0</retry-count>
        <chunk-size>1000</chunk-size>
        <communicate-stats>true</communicate-stats>
        <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
    </jobtracker>
    <semaphore name="default">
        <initial-permits>0</initial-permits>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
    </semaphore>
    <reliable-topic name="default">
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>BLOCK</topic-overload-policy>
        <statistics-enabled>true</statistics-enabled>
    </reliable-topic>
    <ringbuffer name="default">
        <capacity>10000</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </ringbuffer>
    <flake-id-generator name="default">
        <prefetch-count>100</prefetch-count>
        <prefetch-validity-millis>600000</prefetch-validity-millis>
        <id-offset>0</id-offset>
        <node-id-offset>0</node-id-offset>
        <statistics-enabled>true</statistics-enabled>
    </flake-id-generator>
    <atomic-long name="default">
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </atomic-long>
    <atomic-reference name="default">
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </atomic-reference>
    <count-down-latch name="default"/>
    <serialization>
        <portable-version>0</portable-version>
    </serialization>
    <services enable-defaults="true"/>
    <lite-member enabled="false"/>
    <cardinality-estimator name="default">
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <merge-policy batch-size="100">HyperLogLogMergePolicy</merge-policy>
    </cardinality-estimator>
    <scheduled-executor-service name="default">
        <capacity>100</capacity>
        <durability>1</durability>
        <pool-size>16</pool-size>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </scheduled-executor-service>
    <crdt-replication>
        <replication-period-millis>1000</replication-period-millis>
        <max-concurrent-replication-targets>1</max-concurrent-replication-targets>
    </crdt-replication>
    <pn-counter name="default">
        <replica-count>2147483647</replica-count>
        <statistics-enabled>true</statistics-enabled>
    </pn-counter>
    <cp-subsystem>
        <cp-member-count>0</cp-member-count>
        <group-size>0</group-size>
        <session-time-to-live-seconds>300</session-time-to-live-seconds>
        <session-heartbeat-interval-seconds>5</session-heartbeat-interval-seconds>
        <missing-cp-member-auto-removal-seconds>14400</missing-cp-member-auto-removal-seconds>
        <fail-on-indeterminate-operation-state>false</fail-on-indeterminate-operation-state>
        <raft-algorithm>
            <leader-election-timeout-in-millis>2000</leader-election-timeout-in-millis>
            <leader-heartbeat-period-in-millis>5000</leader-heartbeat-period-in-millis>
            <max-missed-leader-heartbeat-count>5</max-missed-leader-heartbeat-count>
            <append-request-max-entry-count>100</append-request-max-entry-count>
            <commit-index-advance-count-to-snapshot>10000</commit-index-advance-count-to-snapshot>
            <uncommitted-entry-count-to-reject-new-appends>100</uncommitted-entry-count-to-reject-new-appends>
            <append-request-backoff-timeout-in-millis>100</append-request-backoff-timeout-in-millis>
        </raft-algorithm>
    </cp-subsystem>
</hazelcast>
Project directory structure
Cluster startup and how it works
This process has three important TCP ports.
Port 58327 connects to 192.168.2.112:7070 and is used to send this Vert.x service's JVM data to the management center.
Port 58331 is where other Vert.x services in the cluster send data to this one over the event bus.
Port 5700: experiments show that when n services start in the cluster, each one establishes connections to the other n-1 services and keeps them alive with heartbeats, so when a service dies the others see the disconnect and know that a cluster node has gone down.
The process also has one important UDP port, the multicast port, which members use to tell each other their connection ports (such as 5700 above) so they can connect to one another and form the cluster.
Startup console of the first vertx service
Startup console of the second vertx service
Startup console of the third vertx service
You can see that every service keeps its own copy of the cluster member list.
Looking at the sockets of the first process (PID 4980):
The first process maintains TCP connections through its cluster port (5700) to the other two services in the cluster.
Looking at the sockets of the second process (PID 3376):
The second process keeps a TCP connection from its port 50674 to the first process's cluster port (5700), and another from its port 50787 to the third process's cluster port (5702).
A diagram makes this more intuitive.
Sources: the comments below this post, plus https://blog.csdn.net/elinespace/category_7675648.html and https://blog.csdn.net/xk4848123/article/details/107462242