Vert.x核心包各功能模塊詳解


源碼:https://github.com/dagger9527/vertx_demo

vert.x核心包是vertx-core。通常,只需要引入這個依賴就足以創建vert.x的http服務了。不過,vert.x為用戶提供了更為強大的擴展模塊,例如:vertx-web(創建http server,提供更強大的http服務),vertx-web-client(http客戶端)。本章內容將圍繞vertx-core模塊做詳細解釋。

首先引入vertx-core依賴

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-core</artifactId>
  <version>4.0.0-milestone4</version>
</dependency>

quickstart模塊已經介紹了如何去創建一個verticle實例,創建一個類去繼承AbstractVerticle,在start方法中創建一個http服務,由verticle實例管理HTTP服務的生命周期。一個verticle就可以看作一個線程。可以創建多個verticle為用戶提供服務。

創建Vertx時配置參數

VertxOptions對象使用

VertxOptions的方法都會返回當前VertxOptions對象實例,所以可以像這樣進行鏈式調用。

Vertx vertx = Vertx.vertx(
    new VertxOptions()
    .setWorkerPoolSize(10)
    .setEventBusOptions(
        new EventBusOptions()
    )
);

AbstractVerticle類

quickstart例子中可以看到,創建一個類通過繼承AbstractVerticle,然后重寫start方法來創建http服務,這樣做可以讓http服務的生命周期完全由verticle管理。

AbstractVerticle類里面有兩個屬性,分別是Vertx和Context的對象,開發者在繼承AbstractVerticle的子類里面可以直接使用這兩個對象。

例如vertx可以創建定時任務,向event bus中推送消息之類的事情。

context可以存儲公用數據,像servlet里面的HttpServletRequest、Session、ApplicationContext這些對象,在請求一個url時保存數據,在請求另一個url時再將數據取出來。

看一個context對象的使用例子,在請求/路徑,向context對象存放name=zhangsan數據。在請求/user拿到name值,並顯示到頁面上。

public class AbstractVerticleDemo extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(AbstractVerticleDemo.class);

  public static void main(String[] args) {
    Vertx.vertx().deployVerticle(AbstractVerticleDemo.class.getName());
  }

  @Override
  public void start() throws Exception {
    // 創建一個http服務器,並監聽8080端口
    vertx.createHttpServer(
      new HttpServerOptions()
        .setPort(8080)
    ).requestHandler(request -> {
      logger.info(request.path());
      String content = "<h1>Hello World</h1>";
      switch (request.path()) {
        case "/":
          context.put("name", "zhangsan");
          break;
        case "/user":
          content = "<h1>Hello World " + context.get("name") + "</h1>";
          break;
        default:
          break;
      }
      request
        .response()
        .putHeader("Content-Type", "text/html;charset=UTF-8")
        .end(content);
    }).listen();
    logger.info("MainVerticle Server Starting..");
  }
}

上面的例子展示了通過request.path()方法獲取請求路徑,通過判斷路徑來執行不同的處理邏輯。

vertx-core並不是處理http請求專業的模塊,后面vert.x提供了更專業的http server模塊 -> vertx-web

vertx創建定時任務

每隔delay毫秒執行一次handler方法。

long setPeriodic(long delay, Handler<Long> handler);

delay毫秒后,只執行一次handler方法。

long setTimer(long delay, Handler<Long> handler);

handler方法的參數和setTimer、setPeriodic的返回值是這個定時任務的id,可以通過這個id來開啟或取消這個任務。

可以通過cancelTimer方法並傳入定時任務的id來取消這個定時任務。

boolean cancelTimer(long id);

詳細例子

public class TimerTest {

  private static Logger logger = LoggerFactory.getLogger(VerticleDemo.class);

  public static void main(String[] args) {
    // 定時器
    timerTest();
  }

  /**
   * 間隔執行,執行多次
   */
  public static void timerTest() {
    Vertx vertx = Vertx.vertx();
    // 每隔1s執行一次,回調函數中的參數timerId和返回值timerId2是一樣的,可以通過這個值關閉這個定時器
    long timerId2 = vertx.setPeriodic(1000, timerId -> {
      System.out.println("當前定時器id是: " + timerId);
    });
    logger.info(timerId2);

    // 情況和setPeriodic方法類似,只不過setTimer方法只執行一次
    long timerId3 = vertx.setTimer(2000, timerId -> {
      // 關閉上面setPeriodic的定時任務
      vertx.cancelTimer(timerId2);
      // 關閉服務,不然控制台會一直等待
      vertx.close();
    });

  }

}

Event Bus

可以在event bus發送消息,進行消息通信。

發送的消息類型可以是基本類型、String、Buffer、json。

注冊消息地址,第一個參數是注冊地址,可通過send方法向這個地址上發送消息。第二個參數是回調參數,接收send方法發送的消息后,handler方法會被執行。

<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

向地址發送消息

EventBus send(String address, @Nullable Object message);

簡單的demo

Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();
// 注冊消費者,地址是local.message.address
MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
  // 打印消息內容
  logger.info(message.body());
});

/*
 * Vert.x 默認允許任何基本/簡單類型、String 或 Buffer 作為消息發送。
 * 不過在 Vert.x 中的通常做法是使用 JSON 格式來發送消息。
 */
// 向local.message.address發送一條字符串消息
eventBus.send("local.message.address", "First Message");

發送buffer,json及自定義對象的例子在github上,可以通過下載源碼看見。

public class EventBusDemo {

  private Logger logger = LoggerFactory.getLogger(EventBusDemo.class.getName());

  /**
   * send方法向消息消費者發送消息
   * send方法發送的消息只會傳遞給在該地址注冊的其中一個處理器,這就是點對點模式。Vert.x 使用不嚴格的輪詢算法來選擇綁定的處理器。
   */
  @Test
  public void sendMessageTest() throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    EventBus eventBus = vertx.eventBus();
    // 注冊消費者,地址是local.message.address
    MessageConsumer<Object> consumer = eventBus.consumer("local.message.address", message -> {
      // 打印消息內容
      logger.info(message.body());
    });

    /*
     * Vert.x 默認允許任何基本/簡單類型、String 或 Buffer 作為消息發送。
     * 不過在 Vert.x 中的通常做法是使用 JSON 格式來發送消息。
     */
    // 向local.message.address發送一條字符串消息
    eventBus.send("local.message.address", "First Message");
    // 發送Buffer類型消息
    BufferImpl buffer = new BufferImpl();
    buffer.appendString("vert.x 你好?");
    eventBus.send("local.message.address", buffer);
    // 發送一條json類型消息
    eventBus.send("local.message.address", new JsonObject().put("username", "張三").put("age", 18));
    // 向向local.message.address發送一條自定義對象消息,需要自定義消息解碼器,實現MessageCodec
    // 注冊自定義消息解碼器
    CousumerMessageCodec cousumerMessageCodec = new CousumerMessageCodec();
    eventBus.registerCodec(cousumerMessageCodec);

    CousumerSendMessage cousumerSendMessage = new CousumerSendMessage();
    cousumerSendMessage.setUsername("大嘎嘎");
    cousumerSendMessage.setAge(18);
    // 發送自定義消息時,需要指定解碼器
    eventBus.send("local.message.address", cousumerSendMessage, new DeliveryOptions().setCodecName(cousumerMessageCodec.name()));

    // 注銷自定義消息解碼器
    eventBus.unregisterCodec(cousumerMessageCodec.name());

    // 因為eventBus.consumer中的回調方法是異步的,只有再發送消息時回調函數才會觸發,所以這里讓主程序先等待一會兒,不然程序馬上就會關閉,看不到回調函數的輸出結果
    Thread.sleep(1000);

    // 注銷消費處理器
    consumer.unregister(result -> {
      logger.info("消費者是否注銷成功:" + result.succeeded());
    });
  }

  /**
   * publish方法向消息消費者發送消息
   * 消息將會傳遞給所有在地址 local.message.address 上注冊過的處理器。
   */
  @Test
  public void publishMessageTest() throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    EventBus eventBus = vertx.eventBus();
    // 接收send方法的消費者
    MessageConsumer<Object> sendConsumer = eventBus.consumer("send.message.address", message -> {
      logger.info("Send1:" + message.body());
    });
    MessageConsumer<Object> sendConsumer2 = eventBus.consumer("send.message.address", message -> {
      logger.info("Send2:" + message.body());
    });

    // 接收publish方法的消費者
    MessageConsumer<Object> publishConsumer = eventBus.consumer("publish.message.address", message -> {
      logger.info("Publish1:" + message.body());
    });
    MessageConsumer<Object> publishConsumer2 = eventBus.consumer("publish.message.address", message -> {
      logger.info("Publish2:" + message.body());
    });

    // send方法只會給其中一個在send.message.address地址上注冊過的處理器發送消息
    eventBus.send("send.message.address", "Send Message");
    // publish方法會將消息發送給所有在publish.message.address上注冊的處理器
    eventBus.publish("publish.message.address", "Publish Message");

    Thread.sleep(1000);
  }

  /**
   * 設置消息頭
   */
  @Test
  public void headerTest() throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    EventBus eventBus = vertx.eventBus();

    eventBus.consumer("message.address", message -> {
      // 通過headers方法獲取頭信息
      logger.info(message.headers());
      logger.info(message.headers().get("some-header"));
    });

    // 設置頭信息
    DeliveryOptions options = new DeliveryOptions();
    options.addHeader("some-header", "some-value");
    // 還可以設置消息的超時時間,如果在這個時間內沒有收到應答,則會以失敗為參數調用應答處理器,默認30s
    options.setSendTimeout(1000 * 30);

    eventBus.send("message.address", "Yay! Someone kicked a ball", options);

    TimeUnit.SECONDS.sleep(1);
  }

  /**
   * 回復消息
   */
  @Test
  public void replyMessageTest() throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    EventBus eventBus = vertx.eventBus();

    eventBus.consumer("message.address", message -> {
      // 通過headers方法獲取頭信息
      logger.info("handler1: " + message.body());
      message.reply("reply message");
    });

    eventBus.consumer("message.address", message -> {
      // 通過headers方法獲取頭信息
      logger.info("hander2: " + message.body());
      message.reply("reply message");
    });

    // 想接收回復的消息請用request方法,注意request只給其中一個在相同地址上注冊過的處理器發送消息
    eventBus.request("message.address", "Yay! Someone kicked a ball", message -> {
      logger.info(message.result().body());
    });

    TimeUnit.SECONDS.sleep(1);
  }

  /**
   * 集群模式啟動EventBus
   */
  @Test
  public void clusterEventBus() {
    VertxOptions options = new VertxOptions();

    // 對EventBus的一些配置
    options.setEventBusOptions(
      new EventBusOptions()
        .setSsl(true)
        .setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setClientAuth(ClientAuth.REQUIRED)
        .setClusterPublicHost("whatever")
        .setClusterPublicPort(1234)
    );

    // 創建集群模式的Verticle
    Vertx.clusteredVertx(options, res -> {
      if (res.succeeded()) {
        Vertx vertx = res.result();
        // 獲取集群模式下的EventBus
        EventBus eventBus = vertx.eventBus();
        System.out.println("We now have a clustered event bus: " + eventBus);
      } else {
        System.out.println("Failed: " + res.cause());
      }
    });
  }
}

注*:如果是junit測試方法,vertx線程不會被阻塞,所以會出現已經向eventbus發送了消息,消息處理端還沒接收到消息程序便關閉了。

如果用的是main方法測試,則不會有這個問題。

Buffer

可以通過Buffer.buffer()創建一個buffer對象。這時創建的buffer對象大小默認是0且內容為空,在往buffer里添加內容時buffer會自動擴容。當然也可以指定buffer對象的默認值和默認大小。如果一開始就知道buffer會有一定的內容的話,推薦在一開始便指定buffer的初始值或大小,這樣可以避免在添加的時候又動態擴容buffer而造成不必要的資源消耗。

// 指定初始大小
static Buffer buffer(int initialSizeHint);
// 初始值
static Buffer buffer(String string);
// 初始值和編碼
static Buffer buffer(String string, String enc);
// 初始內容,用bytes數組表示
static Buffer buffer(byte[] bytes);
// 初始內容用ByteBuf表示
static Buffer buffer(ByteBuf byteBuf);

詳細例子請參見BufferDemo測試類

public class BufferDemo {

  Logger logger = LoggerFactory.getLogger(BufferDemo.class);

  /**
   * 創建Buffer對象
   */
  @Test
  public void createBufferTest() {
    // 通過靜態方法創建Buffer對象
    Buffer emptyBuffer = Buffer.buffer();
    emptyBuffer.appendString("Hello Buffer");
    // 創建帶有初始值的Buffer對象,字符默認以UTF-8編碼
    Buffer helloBuffer = Buffer.buffer("Hello Buffer");
    // 創建初始值,並指定編碼
    Buffer encodingBuffer = Buffer.buffer("哈哈哈", "UTF-8");

    // 通過字節數組創建Buffer
    byte[] bytes = new byte[]{1, 3, 5};
    Buffer byteBuffer = Buffer.buffer(bytes);

    // 創建一個指定大小的Buffer,創建時使得這個buffer被分配到了更多的內存,比數據寫入時再動態的調整內存大小效率要高的多
    Buffer customizerSizeBuffer = Buffer.buffer(4);
    customizerSizeBuffer.appendString("Hello Buffer");

    logger.info(emptyBuffer);
    logger.info(helloBuffer);
    logger.info(encodingBuffer);
    logger.info(byteBuffer);
    logger.info(customizerSizeBuffer);
  }

  /**
   * 向Buffer中寫入數據
   */
  @Test
  public void writeBufferTest() {
    Buffer buffer = Buffer.buffer();

    // 向Buffer中寫入數據
    buffer.appendString("string");
    buffer.appendBytes(new byte[]{1, 3, 5});
    buffer.appendDouble(1.3);
    buffer.appendFloat(1.2F);
    buffer.appendInt(1);
    buffer.appendLong(1);
    buffer.appendShort((short) 1);

    // 寫入無符號數
    buffer.appendUnsignedByte((short) 1);
    buffer.appendUnsignedInt(1);
    buffer.appendUnsignedShort(1);

    // 還可以向Buffer中再寫入一個Buffer
    buffer.appendBuffer(Buffer.buffer("append Buffer"));
    logger.info(buffer);

    // 再Buffer指定位置寫入數據,將會替換原來位置上的值
    buffer.setString(6, "哇哈哈");
//    buffer.setUnsignedInt(6, 1);
    logger.info(buffer);
    // 長度
    logger.info(buffer.length());
    // 拷貝
    logger.info(buffer.copy());
    // 裁剪[0,6)
    logger.info(buffer.slice(0, 6));
  }

  /**
   * 從Buffer中讀取數據
   */
  @Test
  public void readerBufferTest() {
    Buffer buffer = Buffer.buffer("Hello Buffer");
    // Hello,讀取開始位置到結束位置的數據[0, 5)
    logger.info(buffer.getString(0, 5));

    // 循環讀取
    for (int i = 0; i < buffer.length(); i += 4) {
      logger.info("int value at " + i + " is: " + buffer.getString(i, i + 4));
    }
  }

JSON

vert.x內置了json模塊,可以方便vert.x程序對json的處理,它內置的json工具類依賴於jackson。

vert.x的JSON模塊有兩個工具類,JsonObject和JsonArray

通過JsonObject的構造方法可以將字符串、map對象、buffer對象轉成json對象

如果要將自定義對象轉成json類型,通過JsonObject.mapFrom(Object)方法,需要引入jackson-databind依賴,否則會拋出java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath異常

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.9</version>    
</dependency>

JsonArray對象的用法與JsonObject類似,詳細demo參見JsonDemo測試類

public class JsonDemo {

  Logger logger = LoggerFactory.getLogger(JsonDemo.class);

  /**
   * 其它類型轉成json
   */
  @Test
  public void otherType2jsonObjectTest() {
    // 將字符串轉換成json對象
    String jsonString = "{\"foo\":\"bar\"}";
    JsonObject strJson = new JsonObject(jsonString);
    logger.info(strJson);

    // 將map轉換成json對象
    Map<String, Object> map = new HashMap<>();
    map.put("foo", "bar");
    map.put("xyz", 3);
    JsonObject mapJson = new JsonObject(map);
    logger.info(mapJson);

    // 將buffer轉成json
    BufferImpl buffer = new BufferImpl();
    buffer.appendString("{\"foo\":\"bar\"}");
    JsonObject bufferJson = new JsonObject(buffer);
    logger.info(bufferJson);

    // 將自定義對象轉換成json
    // 需要添加jackson-databind依賴,否則會發生java.lang.UnsupportedOperationException: Mapping is not available without Jackson Databind on the classpath
    User user = new User();
    user.setUsername("大哥哥");
    user.setAge(18);
    JsonObject customizerJson = JsonObject.mapFrom(user);
    logger.info(customizerJson);
  }

  /**
   * json轉其它類型
   */
  @Test
  public void jsonObject2otherTypeTest() {
    JsonObject json = new JsonObject().put("username", "張嘎").put("age", 6);
    // json轉自定義類型
    User user = json.mapTo(User.class);
    logger.info(user);

    // json轉map類型
    Map<String, Object> map = json.getMap();
    logger.info(map);

    // json轉string
    String str = json.toString();
    logger.info(str);

    // json轉buffer
    Buffer buffer = json.toBuffer();
    logger.info(buffer);
  }

  /**
   * JsonArray的使用和JsonObject大體類似
   */
  @Test
  public void jsonArrayTest() {
    // 通過字符串創建 JSON 數組
    String jsonString = "[\"foo\",\"bar\"]";
    JsonArray array = new JsonArray(jsonString);
    logger.info(array);

    // 使用add方法給json數組添加數據
    JsonArray array2 = new JsonArray().add("a").add("b").add("c");
    logger.info(array2);

    // 獲取第一個元素
    logger.info(array2.getString(0));
  }

}

運行阻塞式代碼

vert.x是異步處理請求的關鍵是因為所有任務都在Event Loop中被輪詢處理,每個任務的處理效率幾乎都是毫秒級的。異步的好處是可以更快的響應用戶請求。如果已知某一個代碼塊是耗時操作,耗時操作會阻塞Event Loop中的其它任務,那么需要使用executeBlocking方法將這塊會造成阻塞的代碼塊包起來。

blocking.complete(obj)聲明當前阻塞代碼塊已執行成功,然后將成功的結果作為傳遞給第二個回調函數,第二個回調函數通過result.result()即可獲得。

多個executeBlocking代碼塊會從上至下依次執行。

詳細例子:

public class ProcessBlockCodeDemo extends AbstractVerticle {

  public static void main(String[] args) {
    Vertx.vertx().deployVerticle(ProcessBlockCodeDemo.class.getName());
  }

  @Override
  public void start() throws Exception {
    // 創建http server
    this.vertx.createHttpServer(
      new HttpServerOptions()
        .setPort(8080)
    ).requestHandler(request -> {
      Map<String, Object> map = new HashMap<>();
      vertx.executeBlocking(blocking -> {
        map.put("sex", "男");
        blocking.complete();
      });
      vertx.executeBlocking(blocking -> {
        try {
          TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        map.put("name", "張三");
        map.put("age", 18);
        blocking.complete(map);
      }, result -> {
        JsonObject entries = new JsonObject((Map) result.result());
        request
          .response()
          .putHeader("Content-Type", "application/json;charset=UTF-8")
          .end(entries.toString());
      });
    }).listen();
  }
}

參考:https://vertxchina.github.io/vertx-translation-chinese/core/Core.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM