一開始使用jeroMQ,由於java會自動回收資源,所以socket對象及context對象的清理比較簡單。斷開連接和關閉連接不需要考慮連接的狀態。所以程序比較簡單。
但在C/C++環境下,zmq連接的處理需要考慮如下情況:
(1)在zmq_recv()的阻塞狀態下,不可能斷開連接。
(2)在連接沒有斷開和關閉的情況下,context不能關閉(呈阻塞狀態)。
(3)在上面兩種情況下,可能會有內存泄漏的問題。
為了解決以上問題,在C/C++環境下,必須:
(1)如果沒有把握保證zmq_recv()一定能夠接收到消息,則必須設置其ZMQ_RCVTIMEO值,使其在接不到消息時可以退出阻塞,以判斷是否程序需要退出。
int recvTime = 500; zmq_setsockopt(_socket, ZMQ_RCVTIMEO, &recvTime, sizeof(recvTime));
ret1 = zmq_recv(_zsocket, sub, 128, 0); //接收消息 if(ret1 == -1) { int error = zmq_errno(); cout << "ERROR: messageReceiver socket: wrong message --" << zmq_strerror(error) << endl; if(getSimEnd()) { //closedeliver(); //stop deliver closeCallbackLink(); //stop the callback link cout << "messageReceiver thread closed......2" << endl; return; } }
void MessageReceiver::closeCallbackLink() { //QMutexLocker locker(&_mutex); if(_zsocket == nullptr) return; //setSimEnd(); //Sleep(500); if(_type == tcp) { char abuf[128]; memset(abuf, '\0', 128*sizeof(char)); sprintf_s(abuf, 128*sizeof(char), "tcp://%s:%d", _rtiHost.c_str(), fport); zmq_disconnect(this->_zsocket, abuf); zmq_close(_zsocket); _zsocket = NULL; return; } if(_type == ipc) { char abuf[128]; memset(abuf, '\0', 128*sizeof(char)); sprintf_s(abuf, 128*sizeof(char), "ipc://%s", _rtiIpcAddress.c_str()); zmq_disconnect(this->_zsocket, abuf); zmq_close(_zsocket); _zsocket = nullptr; return; } }
(2)在退出阻塞時,要判定是否程序需要退出,如果不退出,則再調用zmq_recv()接收消息,如此循環。
(3)為了加快斷開和關閉連接的速度,設置有限的ZMQ_LINGER值,使連接快速斷開。
int linger = 0; zmq_setsockopt(_socket, ZMQ_LINGER, &linger, sizeof(int64_t));
(4)在(1)中判定需要退出時,應當斷開連接和關閉連接,釋放socket。
(5)要想關閉context,必須關閉其上的所有socket,否則關閉context時會阻塞,導致程序無法退出。
(6)如果連接在線程內,則退出線程和關閉連接盡量同時完成。
在一個程序中,我在一個context上建立了兩個連接,分別被兩個線程使用,斷開連接並關閉socket后,發現關閉context時總是阻塞,百思不得其解。后來偶然發現,在建立其中一個連接時,使用了zmq_socket_monitor建立了一個監視連接,這個sokcet也是建立在上面的context上的,豁然開朗。將該連接斷開並將其socket關閉后,這個context被順利關閉。