先看官方文檔步驟:
需要一個編解碼器,看源碼:
可見內置了需要數據類型的實現,所以發送其他消息可以發送,但是如果發送自定義對象就需要自己實現編解碼邏輯了
一 自定義編解碼器
/** * 自定義對象編解碼器,兩個類型可用於消息轉換,即發送對象轉換為接受需要的對象 */ public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> { /** * 將消息實體封裝到Buffer用於傳輸 * 實現方式:使用對象流從對象中獲取Byte數組然后追加到Buffer */ @Override public void encodeToWire(Buffer buffer, OrderMessage orderMessage) { final ByteArrayOutputStream b = new ByteArrayOutputStream(); try (ObjectOutputStream o = new ObjectOutputStream(b)){ o.writeObject(orderMessage); o.close(); buffer.appendBytes(b.toByteArray()); } catch (IOException e) { e.printStackTrace(); } } //從Buffer中獲取消息對象 @Override public OrderMessage decodeFromWire(int pos, Buffer buffer) { final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes()); OrderMessage msg = null; try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return msg; } //消息轉換 @Override public OrderMessage transform(OrderMessage orderMessage) { System.out.println("消息轉換---");//可對接受消息進行轉換,比如轉換成另一個對象等 orderMessage.setName("姚振"); return orderMessage; } @Override public String name() { return "myCodec"; } //識別是否是用戶自定義編解碼器,通常為-1 @Override public byte systemCodecID() { return -1; } public static MessageCodec create() { return new CustomizeMessageCodec(); } }
這里有一個點要注意,nam方法是必須的,且發送的時候一定要指明name
二 發送消息編寫
public class ProducerVerticle extends AbstractVerticle { @Override public void start() throws Exception { EventBus eventBus = vertx.eventBus(); //發布消息(群發) eventBus.publish("com.hou", "群發祝福!"); //發送消息(單發),只會發送注冊此地址的一個,采用不嚴格的輪詢算法選擇 DeliveryOptions options = new DeliveryOptions();//設置消息頭等 options.addHeader("some-header", "some-value"); eventBus.send("com.hou", "單發消息",options,ar->{ if(ar.succeeded()) System.out.println("收到消費者確認信息:"+ar.result().body()); }); //發送自定義對象,需要編解碼器 eventBus.registerCodec(CustomizeMessageCodec.create());//注冊編碼器 DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必須指定名字 OrderMessage orderMessage = new OrderMessage(); orderMessage.setName("侯征"); eventBus.send("com.hou", orderMessage, options1); } }
三 接受消息Verticle編寫
public class ConsumerVerticle extends AbstractVerticle { @Override public void start() throws Exception { //每個Vertx實例默認是單例 EventBus eb = vertx.eventBus(); //注冊處理器,消費com.hou發送的消息 MessageConsumer<Object> consumer = eb.consumer("com.hou");//訂閱地址 consumer.handler(message -> {//消息處理器 if(message.body() instanceof OrderMessage){ System.out.println("接受到對象: " + ((OrderMessage) message.body()).getName()); } System.out.println("我是普通消費者: " + message.body()); message.reply("收到了!"); // 回復生產者,send才能接受 }).completionHandler(res -> {//注冊完成后通知事件,適用於集群中比較慢的情況下 System.out.println("注冊處理器結果"+res.succeeded()); }); //撤銷處理器 //consumer.unregister(); } }
四 注冊部署Verticcle
vertx.deployVerticle(ConsumerVerticle.class.getName()); TimeUnit.SECONDS.sleep(1); vertx.deployVerticle(ProducerVerticle.class.getName());
五 測試