使用ZMonitor解決jeroMQ(ZMQ)應用中PUB/SUB消息丟失的問題


應用:使用jeroMQ作為底層通信構件。首先建立一個REQ/REP連接,當需要進行大量數據交互時再建立一個PUB/SUB連接。服務器端采用bind,客戶端采用connect。

問題:在局域網條件下,當建立PUB/SUB連接時,前面的幾個重要的消息可能丟失。

解決方法:

(1)建立連接前,客戶端通過REQ/REP連接通知服務器要進行連接,服務器把客戶端的主題通知客戶端:

        //***!!! 在加入前建立連接,避免消息丟失
        //***!!! 第二個參數'0'表示請求連接,'1'表示連接成功,'2'表示連接失敗
        HelloMessage hello = new HelloMessage("0", "0");
        String hmess = IMessage.makeMessageFrame(RTIAmbServiceCode.HelloMessage.getIndex(),
                MessageRole.INVOKE, hello.toJson(), federationHandle);

        socket.send(hmess.getBytes(ZMQ.CHARSET), 0);
        byte[] creply = socket.recv(0);

上面的返回消息就包含了訂購主題

(2)客戶端向服務端提出連接請求

            //***!!!建立回調通道
            boolean suc = true;
            if(!messageReceiver.SetMessageWare(String.valueOf(federationHandle), federateHandle, federateReference)) {
                suc = false;
                disconectToRTI_core();
            }

 

    public boolean SetMessageWare(String federationHandle, String federateHandle, FederateAmbassador federateAmb) {
        MessageWare messageWare = new MessageWare(federationHandle, federateHandle, federateAmb );
        //建立連接
        ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
        //***!!!增加連接監控
        ZMonitor monitor = new ZMonitor(context, subscriber);
        monitor.add(ZMonitor.Event.ALL);
        monitor.start();
        
        //在poller中注冊
        String errorString = "ERROR: 連接錯誤,無法建立回調通道,.....邦員:." + federationHandle + ":" + federateHandle;
        //***!!!這里的簡化是為了下面的事件判定
        if(protocol.equals("tcp")) {
            subscriber.connect("tcp://"+ rtiHost + ":" + fport);
//            if(!subscriber.connect("tcp://"+ rtiHost + ":" + fport)) {
//                System.out.println(errorString);
//                return false;
//            }
        }
        else {
            subscriber.connect("ipc://"+ faIpcAddress);
//            if(!subscriber.connect("ipc://"+ faIpcAddress)) {
//                SRTI_core.fireRunningStatus(errorString);
//                System.out.println(errorString);
//                return false;
            //}           
        }
        //***!!!監控連接事件
        int times = 0;
        ZEvent cEvent = null;
        int wait_time = RTI.getCBCwaittime();
        int once_time = RTI.getCBConcetime();
        int round = wait_time / once_time;
        do {    
            //可能的事件有CONNECTION_CLOSE, CONNECTIION_DELEY,CONNECTED, HANDSHAKE_PROTOCOL, 其中
            //CONNECTED表示已經連接,HANDSHAKE_PROTOCOL表示完成了協議握手
            cEvent = monitor.nextEvent(once_time);
            if(cEvent != null) {
                System.out.println("MessageReceiver連接事件:" + cEvent.toString());
                if(cEvent.type == ZMonitor.Event.HANDSHAKE_PROTOCOL) {
                    break;
                }
            }
        }
        while(times++ != round);
        
        //***刪除監控器
        try {
            monitor.close();
            monitor.destroy();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            System.out.println("ZMonitor未能釋放");
            return false;
        }

        //***!!!如果沒有成功,返回false
 
        if(cEvent == null || cEvent.type != ZMonitor.Event.HANDSHAKE_PROTOCOL) {
            System.out.println(errorString);
            return false;
        }
        
        System.out.println("MessageWare與RTI建立PUB/SUB連接: 聯邦---- " + federationHandle + "  邦員---- " + federateHandle +" , 連接端點:" + "tcp://" + rtiHost + ":" + fport);
        System.out.println("訂購的主題: " + federationHandle + "." + federateHandle);
        subscriber.subscribe((federationHandle + "." + federateHandle).getBytes(ZMQ.CHARSET));        
        //subscriber.subscribe(("").getBytes(ZMQ.CHARSET));       
        int id = poller.register(subscriber, ZMQ.Poller.POLLIN);

        //將messageWare加入
        messageWare.socket = subscriber; 
        synchronized(messageWares) {

            messageWares.put(id, messageWare);
        }
        return true;
    }
 

(3)客戶端通過REQ/REP連接告訴服務端連接成功

            HelloMessage sucHelloMessage = new HelloMessage(federateHandle+"", suc == true? "1" : "2");
            String sucString = IMessage.makeMessageFrame(RTIAmbServiceCode.HelloMessage.getIndex(),
                    MessageRole.INVOKE, sucHelloMessage.toJson(), federationHandle);

            socket.send(sucString.getBytes(ZMQ.CHARSET), 0);
            socket.recv(0);
            if(suc == false)
                throw new RTIinternalError("ERROR: 回調連接錯誤......" + federationHandle + ":" + federateHandle);
            //String crepString = new String(creply);
            System.out.println("RTIambassador:SUB回調連接建立:" + hmess);

(4)此時服務器就可以開始發送PUB消息了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM