應用:使用jeroMQ作為底層通信構件。首先建立一個REQ/REP連接,當需要進行大量數據交互時再建立一個PUB/SUB連接。服務器端采用bind,客戶端采用connect。
問題:在局域網條件下,當建立PUB/SUB連接時,前面的幾個重要的消息可能丟失。
解決方法:
(1)建立連接前,客戶端通過REQ/REP連接通知服務器要進行連接,服務器把客戶端的主題通知客戶端:
//***!!! 在加入前建立連接,避免消息丟失 //***!!! 第二個參數'0'表示請求連接,'1'表示連接成功,'2'表示連接失敗 HelloMessage hello = new HelloMessage("0", "0"); String hmess = IMessage.makeMessageFrame(RTIAmbServiceCode.HelloMessage.getIndex(), MessageRole.INVOKE, hello.toJson(), federationHandle); socket.send(hmess.getBytes(ZMQ.CHARSET), 0); byte[] creply = socket.recv(0);
上面的返回消息就包含了訂購主題
(2)客戶端向服務端提出連接請求
//***!!!建立回調通道 boolean suc = true; if(!messageReceiver.SetMessageWare(String.valueOf(federationHandle), federateHandle, federateReference)) { suc = false; disconectToRTI_core(); }
public boolean SetMessageWare(String federationHandle, String federateHandle, FederateAmbassador federateAmb) { MessageWare messageWare = new MessageWare(federationHandle, federateHandle, federateAmb ); //建立連接 ZMQ.Socket subscriber = context.createSocket(SocketType.SUB); //***!!!增加連接監控 ZMonitor monitor = new ZMonitor(context, subscriber); monitor.add(ZMonitor.Event.ALL); monitor.start(); //在poller中注冊 String errorString = "ERROR: 連接錯誤,無法建立回調通道,.....邦員:." + federationHandle + ":" + federateHandle; //***!!!這里的簡化是為了下面的事件判定 if(protocol.equals("tcp")) { subscriber.connect("tcp://"+ rtiHost + ":" + fport); // if(!subscriber.connect("tcp://"+ rtiHost + ":" + fport)) { // System.out.println(errorString); // return false; // } } else { subscriber.connect("ipc://"+ faIpcAddress); // if(!subscriber.connect("ipc://"+ faIpcAddress)) { // SRTI_core.fireRunningStatus(errorString); // System.out.println(errorString); // return false; //} } //***!!!監控連接事件 int times = 0; ZEvent cEvent = null; int wait_time = RTI.getCBCwaittime(); int once_time = RTI.getCBConcetime(); int round = wait_time / once_time; do { //可能的事件有CONNECTION_CLOSE, CONNECTIION_DELEY,CONNECTED, HANDSHAKE_PROTOCOL, 其中 //CONNECTED表示已經連接,HANDSHAKE_PROTOCOL表示完成了協議握手 cEvent = monitor.nextEvent(once_time); if(cEvent != null) { System.out.println("MessageReceiver連接事件:" + cEvent.toString()); if(cEvent.type == ZMonitor.Event.HANDSHAKE_PROTOCOL) { break; } } } while(times++ != round); //***刪除監控器 try { monitor.close(); monitor.destroy(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("ZMonitor未能釋放"); return false; } //***!!!如果沒有成功,返回false if(cEvent == null || cEvent.type != ZMonitor.Event.HANDSHAKE_PROTOCOL) { System.out.println(errorString); return false; } System.out.println("MessageWare與RTI建立PUB/SUB連接: 聯邦---- " + federationHandle + " 邦員---- " + federateHandle +" , 連接端點:" + "tcp://" + rtiHost + ":" + fport); System.out.println("訂購的主題: " + federationHandle + "." + federateHandle); subscriber.subscribe((federationHandle + "." + federateHandle).getBytes(ZMQ.CHARSET)); //subscriber.subscribe(("").getBytes(ZMQ.CHARSET)); int id = poller.register(subscriber, ZMQ.Poller.POLLIN); //將messageWare加入 messageWare.socket = subscriber; synchronized(messageWares) { messageWares.put(id, messageWare); } return true; }
(3)客戶端通過REQ/REP連接告訴服務端連接成功
HelloMessage sucHelloMessage = new HelloMessage(federateHandle+"", suc == true? "1" : "2"); String sucString = IMessage.makeMessageFrame(RTIAmbServiceCode.HelloMessage.getIndex(), MessageRole.INVOKE, sucHelloMessage.toJson(), federationHandle); socket.send(sucString.getBytes(ZMQ.CHARSET), 0); socket.recv(0); if(suc == false) throw new RTIinternalError("ERROR: 回調連接錯誤......" + federationHandle + ":" + federateHandle); //String crepString = new String(creply); System.out.println("RTIambassador:SUB回調連接建立:" + hmess);
(4)此時服務器就可以開始發送PUB消息了