Vertx使用EventBus發送接受自定義對象


先看官方文檔步驟:

需要一個編解碼器,看源碼:

可見內置了需要數據類型的實現,所以發送其他消息可以發送,但是如果發送自定義對象就需要自己實現編解碼邏輯了

一 自定義編解碼器

/**
 * 自定義對象編解碼器,兩個類型可用於消息轉換,即發送對象轉換為接受需要的對象
 */
public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> {
    /**
     * 將消息實體封裝到Buffer用於傳輸
     * 實現方式:使用對象流從對象中獲取Byte數組然后追加到Buffer
     */
    @Override
    public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {
        final ByteArrayOutputStream b = new ByteArrayOutputStream();
        try (ObjectOutputStream o = new ObjectOutputStream(b)){
            o.writeObject(orderMessage);
            o.close();
            buffer.appendBytes(b.toByteArray());
        } catch (IOException e) { e.printStackTrace(); }
    }
    //從Buffer中獲取消息對象
    @Override
    public OrderMessage decodeFromWire(int pos, Buffer buffer) {
        final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
        OrderMessage msg = null;
        try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();
        } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }
        return msg;
    }
    //消息轉換
    @Override
    public OrderMessage transform(OrderMessage orderMessage) {
        System.out.println("消息轉換---");//可對接受消息進行轉換,比如轉換成另一個對象等
        orderMessage.setName("姚振");
        return orderMessage;
    }
    @Override
    public String name() { return "myCodec"; }
    //識別是否是用戶自定義編解碼器,通常為-1
    @Override
    public byte systemCodecID() { return -1; }
    public static MessageCodec create() {
        return new CustomizeMessageCodec();
    }
}

 這里有一個點要注意,nam方法是必須的,且發送的時候一定要指明name

二 發送消息編寫

public class ProducerVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        EventBus eventBus = vertx.eventBus();
        //發布消息(群發)
        eventBus.publish("com.hou", "群發祝福!");
        //發送消息(單發),只會發送注冊此地址的一個,采用不嚴格的輪詢算法選擇
        DeliveryOptions options = new DeliveryOptions();//設置消息頭等
        options.addHeader("some-header", "some-value");
        eventBus.send("com.hou", "單發消息",options,ar->{
            if(ar.succeeded()) System.out.println("收到消費者確認信息:"+ar.result().body());
        });
        //發送自定義對象,需要編解碼器
        eventBus.registerCodec(CustomizeMessageCodec.create());//注冊編碼器
        DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必須指定名字
        OrderMessage orderMessage = new OrderMessage();
        orderMessage.setName("侯征");
        eventBus.send("com.hou", orderMessage, options1);
    }
}

 

三 接受消息Verticle編寫

public class ConsumerVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        //每個Vertx實例默認是單例
        EventBus eb = vertx.eventBus();
        //注冊處理器,消費com.hou發送的消息
        MessageConsumer<Object> consumer = eb.consumer("com.hou");//訂閱地址
        consumer.handler(message -> {//消息處理器
            if(message.body() instanceof OrderMessage){
                System.out.println("接受到對象: " + ((OrderMessage) message.body()).getName());
            }
            System.out.println("我是普通消費者: " + message.body());
            message.reply("收到了!"); // 回復生產者,send才能接受
        }).completionHandler(res -> {//注冊完成后通知事件,適用於集群中比較慢的情況下
                System.out.println("注冊處理器結果"+res.succeeded());
        });
        //撤銷處理器
        //consumer.unregister();
    }
}

 

四 注冊部署Verticcle

vertx.deployVerticle(ConsumerVerticle.class.getName());
        TimeUnit.SECONDS.sleep(1);
        vertx.deployVerticle(ProducerVerticle.class.getName());

 

五 測試

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM