源碼:https://github.com/dagger9527/vertx_demo
vert.x核心包是vertx-core。通常,只需要引入這個依賴就足以創建vert.x的http服務了。不過,vert.x為用戶提供了更為強大的擴展模塊,例如:vertx-web(創建http server,提供更強大的http服務),vertx-web-client(http客戶端)。本章內容將圍繞vertx-core模塊做詳細解釋。
首先引入vertx-core依賴
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.0.0-milestone4</version>
</dependency>
在quickstart模塊已經介紹了如何去創建一個verticle實例,創建一個類去繼承AbstractVerticle,在start方法中創建一個http服務,由verticle實例管理HTTP服務的生命周期。一個verticle就可以看作一個線程。可以創建多個verticle為用戶提供服務。
創建Vertx時配置參數
VertxOptions對象使用
VertxOptions的方法都會返回當前VertxOptions對象實例,所以可以像這樣進行鏈式調用。
Vertx vertx = Vertx.vertx(
new VertxOptions()
.setWorkerPoolSize(10)
.setEventBusOptions(
new EventBusOptions()
)
);
AbstractVerticle類
在quickstart例子中可以看到,創建一個類通過繼承AbstractVerticle,然后重寫start方法來創建http服務,這樣做可以讓http服務的生命周期完全由verticle管理。
AbstractVerticle類里面有兩個屬性,分別是Vertx和Context的對象,開發者在繼承AbstractVerticle的子類里面可以直接使用這兩個對象。
例如vertx可以創建定時任務,向event bus中推送消息之類的事情。
context可以存儲公用數據,像servlet里面的HttpServletRequest、Session、ApplicationContext這些對象,在請求一個url時保存數據,在請求另一個url時再將數據取出來。
看一個context對象的使用例子,在請求/路徑,向context對象存放name=zhangsan數據。在請求/user拿到name值,並顯示到頁面上。
public class AbstractVerticleDemo extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(AbstractVerticleDemo.class);
public static void main(String[] args) {
Vertx.vertx().deployVerticle(AbstractVerticleDemo.class.getName());
}
@Override
public void start() throws Exception {
// 創建一個http服務器,並監聽8080端口
vertx.createHttpServer(
new HttpServerOptions()
.setPort(8080)
).requestHandler(request -> {
logger.info(request.path());
String content = "<h1>Hello World</h1>";
switch (request.path()) {
case "/":
context.put("name", "zhangsan");
break;
case "/user":
content = "<h1>Hello World " + context.get("name") + "</h1>";
break;
default:
break;
}
request
.response()
.putHeader("Content-Type", "text/html;charset=UTF-8")
.end(content);
}).listen();
logger.info("MainVerticle Server Starting..");
}
}
上面的例子展示了通過request.path()方法獲取請求路徑,通過判斷路徑來執行不同的處理邏輯。
vertx-core並不是處理http請求專業的模塊,后面vert.x提供了更專業的http server模塊 -> vertx-web
vertx創建定時任務
每隔delay毫秒執行一次handler方法。
long setPeriodic(long delay, Handler<Long> handler);
delay毫秒后,只執行一次handler方法。
long setTimer(long delay, Handler<Long> handler);
handler方法的參數和setTimer、setPeriodic的返回值是這個定時任務的id,可以通過這個id來開啟或取消這個任務。
可以通過cancelTimer方法並傳入定時任務的id來取消這個定時任務。
boolean cancelTimer(long id);
詳細例子
public class TimerTest {
private static Logger logger = LoggerFactory.getLogger(VerticleDemo.class);
public static void main(String[] args) {
// 定時器
timerTest();
}
/**
* 間隔執行,執行多次
*/
public static void timerTest() {
Vertx vertx = Vertx.vertx();
// 每隔1s執行一次,回調函數中的參數timerId和返回值timerId2是一樣的,可以通過這個值關閉這個定時器
long timerId2 = vertx.setPeriodic(1000, timerId -> {
System.out.println("當前定時器id是: " + timerId);
});
logger.info(timerId2);
// 情況和setPeriodic方法類似,只不過setTimer方法只執行一次
long timerId3 = vertx.setTimer(2000, timerId -> {
// 關閉上面setPeriodic的定時任務
vertx.cancelTimer(timerId2);
// 關閉服務,不然控制台會一直等待
vertx.close();
});
}
}
Event Bus
可以在event bus發送消息,進行消息通信。
發送的消息類型可以是基本類型、String、Buffer、json。
注冊消息地址,第一個參數是注冊地址,可通過send方法向這個地址上發送消息。第二個參數是回調參數,接收send方法發送的消息后,handler方法會被執行。
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
向地址發送消息
EventBus send(String address, @Nullable Object message);
簡單的demo
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 注冊消費者,地址是local.message.address
MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
// 打印消息內容
logger.info(message.body());
});
/*
* Vert.x 默認允許任何基本/簡單類型、String 或 Buffer 作為消息發送。
* 不過在 Vert.x 中的通常做法是使用 JSON 格式來發送消息。
*/
// 向local.message.address發送一條字符串消息
eventBus.send("local.message.address", "First Message");
發送buffer,json及自定義對象的例子在github上,可以通過下載源碼看見。
public class EventBusDemo {
private Logger logger = LoggerFactory.getLogger(EventBusDemo.class.getName());
/**
* send方法向消息消費者發送消息
* send方法發送的消息只會傳遞給在該地址注冊的其中一個處理器,這就是點對點模式。Vert.x 使用不嚴格的輪詢算法來選擇綁定的處理器。
*/
@Test
public void sendMessageTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 注冊消費者,地址是local.message.address
MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
// 打印消息內容
logger.info(message.body());
});
/*
* Vert.x 默認允許任何基本/簡單類型、String 或 Buffer 作為消息發送。
* 不過在 Vert.x 中的通常做法是使用 JSON 格式來發送消息。
*/
// 向local.message.address發送一條字符串消息
eventBus.send("local.message.address", "First Message");
// 發送Buffer類型消息
BufferImpl buffer = new BufferImpl();
buffer.appendString("vert.x 你好?");
eventBus.send("local.message.address", buffer);
// 發送一條json類型消息
eventBus.send("local.message.address", new JsonObject().put("username", "張三").put("age", 18));
// 向向local.message.address發送一條自定義對象消息,需要自定義消息解碼器,實現MessageCodec
// 注冊自定義消息解碼器
CousumerMessageCodec cousumerMessageCodec = new CousumerMessageCodec();
eventBus.registerCodec(cousumerMessageCodec);
CousumerSendMessage cousumerSendMessage = new CousumerSendMessage();
cousumerSendMessage.setUsername("大嘎嘎");
cousumerSendMessage.setAge(18);
// 發送自定義消息時,需要指定解碼器
eventBus.send("local.message.address", cousumerSendMessage, new DeliveryOptions().setCodecName(cousumerMessageCodec.name()));
// 注銷自定義消息解碼器
eventBus.unregisterCodec(cousumerMessageCodec.name());
// 因為eventBus.consumer中的回調方法是異步的,只有再發送消息時回調函數才會觸發,所以這里讓主程序先等待一會兒,不然程序馬上就會關閉,看不到回調函數的輸出結果
Thread.sleep(1000);
// 注銷消費處理器
consumer.unregister(result -> {
logger.info("消費者是否注銷成功:" + result.succeeded());
});
}
/**
* publish方法向消息消費者發送消息
* 消息將會傳遞給所有在地址 local.message.address 上注冊過的處理器。
*/
@Test
public void publishMessageTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 接收send方法的消費者
MessageConsumer<Object> sendConsumer = eventBus.consumer("send.message.address", message -> {
logger.info("Send1:" + message.body());
});
MessageConsumer<Object> sendConsumer2 = eventBus.consumer("send.message.address", message -> {
logger.info("Send2:" + message.body());
});
// 接收publish方法的消費者
MessageConsumer<Object> publishConsumer = eventBus.consumer("publish.message.address", message -> {
logger.info("Publish1:" + message.body());
});
MessageConsumer<Object> publishConsumer2 = eventBus.consumer("publish.message.address", message -> {
logger.info("Publish2:" + message.body());
});
// send方法只會給其中一個在send.message.address地址上注冊過的處理器發送消息
eventBus.send("send.message.address", "Send Message");
// publish方法會將消息發送給所有在publish.message.address上注冊的處理器
eventBus.publish("publish.message.address", "Publish Message");
Thread.sleep(1000);
}
/**
* 設置消息頭
*/
@Test
public void headerTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
eventBus.consumer("message.address", message -> {
// 通過headers方法獲取頭信息
logger.info(message.headers());
logger.info(message.headers().get("some-header"));
});
// 設置頭信息
DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
// 還可以設置消息的超時時間,如果在這個時間內沒有收到應答,則會以失敗為參數調用應答處理器,默認30s
options.setSendTimeout(1000 * 30);
eventBus.send("message.address", "Yay! Someone kicked a ball", options);
TimeUnit.SECONDS.sleep(1);
}
/**
* 回復消息
*/
@Test
public void replyMessageTest() throws InterruptedException {
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
eventBus.consumer("message.address", message -> {
// 通過headers方法獲取頭信息
logger.info("handler1: " + message.body());
message.reply("reply message");
});
eventBus.consumer("message.address", message -> {
// 通過headers方法獲取頭信息
logger.info("hander2: " + message.body());
message.reply("reply message");
});
// 想接收回復的消息請用request方法,注意request只給其中一個在相同地址上注冊過的處理器發送消息
eventBus.request("message.address", "Yay! Someone kicked a ball", message -> {
logger.info(message.result().body());
});
TimeUnit.SECONDS.sleep(1);
}
/**
* 集群模式啟動EventBus
*/
@Test
public void clusterEventBus() {
VertxOptions options = new VertxOptions();
// 對EventBus的一些配置
options.setEventBusOptions(
new EventBusOptions()
.setSsl(true)
.setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
.setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
.setClientAuth(ClientAuth.REQUIRED)
.setClusterPublicHost("whatever")
.setClusterPublicPort(1234)
);
// 創建集群模式的Verticle
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
// 獲取集群模式下的EventBus
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}
});
}
}
注*:如果是junit測試方法,vertx線程不會被阻塞,所以會出現已經向eventbus發送了消息,消息處理端還沒接收到消息程序便關閉了。
如果用的是main方法測試,則不會有這個問題。
Buffer
可以通過Buffer.buffer()創建一個buffer對象。這時創建的buffer對象大小默認是0且內容為空,在往buffer里添加內容時buffer會自動擴容。當然也可以指定buffer對象的默認值和默認大小。如果一開始就知道buffer會有一定的內容的話,推薦在一開始便指定buffer的初始值或大小,這樣可以避免在添加的時候又動態擴容buffer而造成不必要的資源消耗。
// 指定初始大小
static Buffer buffer(int initialSizeHint);
// 初始值
static Buffer buffer(String string);
// 初始值和編碼
static Buffer buffer(String string, String enc);
// 初始內容,用bytes數組表示
static Buffer buffer(byte[] bytes);
// 初始內容用ByteBuf表示
static Buffer buffer(ByteBuf byteBuf);
詳細例子請參見BufferDemo測試類
public class BufferDemo {
Logger logger = LoggerFactory.getLogger(BufferDemo.class);
/**
* 創建Buffer對象
*/
@Test
public void createBufferTest() {
// 通過靜態方法創建Buffer對象
Buffer emptyBuffer = Buffer.buffer();
emptyBuffer.appendString("Hello Buffer");
// 創建帶有初始值的Buffer對象,字符默認以UTF-8編碼
Buffer helloBuffer = Buffer.buffer("Hello Buffer");
// 創建初始值,並指定編碼
Buffer encodingBuffer = Buffer.buffer("哈哈哈", "UTF-8");
// 通過字節數組創建Buffer
byte[] bytes = new byte[]{1, 3, 5};
Buffer byteBuffer = Buffer.buffer(bytes);
// 創建一個指定大小的Buffer,創建時使得這個buffer被分配到了更多的內存,比數據寫入時再動態的調整內存大小效率要高的多
Buffer customizerSizeBuffer = Buffer.buffer(4);
customizerSizeBuffer.appendString("Hello Buffer");
logger.info(emptyBuffer);
logger.info(helloBuffer);
logger.info(encodingBuffer);
logger.info(byteBuffer);
logger.info(customizerSizeBuffer);
}
/**
* 向Buffer中寫入數據
*/
@Test
public void writeBufferTest() {
Buffer buffer = Buffer.buffer();
// 向Buffer中寫入數據
buffer.appendString("string");
buffer.appendBytes(new byte[]{1, 3, 5});
buffer.appendDouble(1.3);
buffer.appendFloat(1.2F);
buffer.appendInt(1);
buffer.appendLong(1);
buffer.appendShort((short) 1);
// 寫入無符號數
buffer.appendUnsignedByte((short) 1);
buffer.appendUnsignedInt(1);
buffer.appendUnsignedShort(1);
// 還可以向Buffer中再寫入一個Buffer
buffer.appendBuffer(Buffer.buffer("append Buffer"));
logger.info(buffer);
// 再Buffer指定位置寫入數據,將會替換原來位置上的值
buffer.setString(6, "哇哈哈");
// buffer.setUnsignedInt(6, 1);
logger.info(buffer);
// 長度
logger.info(buffer.length());
// 拷貝
logger.info(buffer.copy());
// 裁剪[0,6)
logger.info(buffer.slice(0, 6));
}
/**
* 從Buffer中讀取數據
*/
@Test
public void readerBufferTest() {
Buffer buffer = Buffer.buffer("Hello Buffer");
// Hello,讀取開始位置到結束位置的數據[0, 5)
logger.info(buffer.getString(0, 5));
// 循環讀取
for (int i = 0; i < buffer.length(); i += 4) {
logger.info("int value at " + i + " is: " + buffer.getString(i, i + 4));
}
}
JSON
vert.x內置了json模塊,可以方便vert.x程序對json的處理,它內置的json工具類依賴於jackson。
vert.x的JSON模塊有兩個工具類,JsonObject和JsonArray
通過JsonObject的構造方法可以將字符串、map對象、buffer對象轉成json對象
如果要將自定義對象轉成json類型,通過JsonObject.mapFrom(Object)方法,需要引入jackson-databind依賴,否則會拋出java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath異常
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
JsonArray對象的用法與JsonObject類似,詳細demo參見JsonDemo測試類
public class JsonDemo {
Logger logger = LoggerFactory.getLogger(JsonDemo.class);
/**
* 其它類型轉成json
*/
@Test
public void otherType2jsonObjectTest() {
// 將字符串轉換成json對象
String jsonString = "{\"foo\":\"bar\"}";
JsonObject strJson = new JsonObject(jsonString);
logger.info(strJson);
// 將map轉換成json對象
Map<String, Object> map = new HashMap<>();
map.put("foo", "bar");
map.put("xyz", 3);
JsonObject mapJson = new JsonObject(map);
logger.info(mapJson);
// 將buffer轉成json
BufferImpl buffer = new BufferImpl();
buffer.appendString("{\"foo\":\"bar\"}");
JsonObject bufferJson = new JsonObject(buffer);
logger.info(bufferJson);
// 將自定義對象轉換成json
// 需要添加jackson-databind依賴,否則會發生java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath
User user = new User();
user.setUsername("大哥哥");
user.setAge(18);
JsonObject customizerJson = JsonObject.mapFrom(user);
logger.info(customizerJson);
}
/**
* json轉其它類型
*/
@Test
public void jsonObject2otherTypeTest() {
JsonObject json = new JsonObject().put("username", "張嘎").put("age", 6);
// json轉自定義類型
User user = json.mapTo(User.class);
logger.info(user);
// json轉map類型
Map<String, Object> map = json.getMap();
logger.info(map);
// json轉string
String str = json.toString();
logger.info(str);
// json轉buffer
Buffer buffer = json.toBuffer();
logger.info(buffer);
}
/**
* JsonArray的使用和JsonObject大體類似
*/
@Test
public void jsonArrayTest() {
// 通過字符串創建 JSON 數組
String jsonString = "[\"foo\",\"bar\"]";
JsonArray array = new JsonArray(jsonString);
logger.info(array);
// 使用add方法給json數組添加數據
JsonArray array2 = new JsonArray().add("a").add("b").add("c");
logger.info(array2);
// 獲取第一個元素
logger.info(array2.getString(0));
}
}
運行阻塞式代碼
vert.x是異步處理請求的關鍵是因為所有任務都在Event Loop中被輪詢處理,每個任務的處理效率幾乎都是毫秒級的。異步的好處是可以更快的響應用戶請求。如果已知某一個代碼塊是耗時操作,耗時操作會阻塞Event Loop中的其它任務,那么需要使用executeBlocking方法將這塊會造成阻塞的代碼塊包起來。
blocking.complete(obj)聲明當前阻塞代碼塊已執行成功,然后將成功的結果作為傳遞給第二個回調函數,第二個回調函數通過result.result()即可獲得。
多個executeBlocking代碼塊會從上至下依次執行。
詳細例子:
public class ProcessBlockCodeDemo extends AbstractVerticle {
public static void main(String[] args) {
Vertx.vertx().deployVerticle(ProcessBlockCodeDemo.class.getName());
}
@Override
public void start() throws Exception {
// 創建http server
this.vertx.createHttpServer(
new HttpServerOptions()
.setPort(8080)
).requestHandler(request -> {
Map<String, Object> map = new HashMap<>();
vertx.executeBlocking(blocking -> {
map.put("sex", "男");
blocking.complete();
});
vertx.executeBlocking(blocking -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put("name", "張三");
map.put("age", 18);
blocking.complete(map);
}, result -> {
JsonObject entries = new JsonObject((Map) result.result());
request
.response()
.putHeader("Content-Type", "application/json;charset=UTF-8")
.end(entries.toString());
});
}).listen();
}
}
參考:https://vertxchina.github.io/vertx-translation-chinese/core/Core.html
