這里至於ZeroMQ是什么就不過多贅述了,這是基礎的知識,不懂的讀者可以先去百度一下,這里也提供一個老哥的詳細解釋,貼上鏈接大家可以去看看https://www.cnblogs.com/chenny7/p/6245236.html。
先說一下為什么要這樣處理,本人是從事新能源電池檢測相關工作的,屬於工控的領域,有大量的指令下發(通過上位機的控制),指令下發以后會接收到下位機(電源模塊)上傳的數據,是一對多的模式(一個上位機,面對着多個電源模塊)。這里就有一個並發的問題,同時要接收多個電源模塊上傳的數據,而且是1秒鍾一條數據,這個並發量相對來說比較大,為了性能考慮,防止擁塞這里用到了ZeroMQ消息隊列,ZMQ對這一塊的處理相對合理。
這里只是一個模擬的程序,相當於一個demo,實際開發工程中要比這復制的多,因為上拋的數據還分很多種,比如記錄數據,告警數據,針床的數據等等。
用QT做了一個簡單的界面,這里的姓名 年齡 學號模擬的是上位機控制指令,當然實際的情況是上位機下發有相對應的協議,這個協議必須是上位機和下位機定義好。
首先是定義初始化上下文,這是ZeroMQ中必須要做的工作。
void ZeroMQComm::InitCtx() { //1.建上下文 m_ctx = zmq_init(1); if (!m_ctx) { printf("build zmq_init():%s\n", zmq_strerror(errno)); return; } else { printf("創建上下文成功!\n"); } }
然后初始化socket,其實可以把ZMQ看成是對socket的進一步封裝,這里用到了3個端口,5000,5001,5002。為什么要用這三個端口呢?每一個端口代表了不同的信息,比如5000端口是作為的PUSH端,相當於一個分發任務的角色,這個demo中
當點擊發送指令的時候就會利用這個端口來給PULL端發送數據。5001端口(作為PULL端)是接收及格數據,5002端口(作為PULL端)是接收不及格數據,這兩個端口模式的是在實際的項目中的記錄數據和告警數據,記錄數據是通過5001端口上拋,告
警數據是由5002端口上拋。下面代碼中綁定的IP可以忽略,我是按照我測試的環境中來配置IP,讀者要進行的時候需要根據自己的實際情況來!
void ZeroMQComm::InitSockets() { //1.創建發布Socket通訊對象 //該對象用於下中位機發送控制指令 if((m_sokt5000 = zmq_socket(m_ctx, ZMQ_PUSH)) == NULL) { printf("%s\n", zmq_strerror(errno)); zmq_close(m_sokt5000); zmq_ctx_term(m_ctx); return; } else { printf("創建發送端口socket成功!\n"); } int tmp_bet = zmq_bind(m_sokt5000, "tcp://10.168.205.73:5000"); //printf("zmq_bind=%d\n", tmp_bet); if (tmp_bet < 0) { printf("bind port %s\n", zmq_strerror(errno)); return; } else { printf("bind port success!\n"); } //設置發送超時時間3秒 int iSendTimeout = 3000; if (zmq_setsockopt(m_sokt5000, ZMQ_SNDTIMEO, &iSendTimeout, sizeof(iSendTimeout)) < 0) { zmq_close(m_sokt5000); zmq_ctx_destroy(m_ctx); return; } //2.創建中位機回復Socket m_sokt5001 = zmq_socket(m_ctx, ZMQ_PULL); zmq_connect(m_sokt5001, "tcp://10.168.205.73:5001"); //設置接收超時時間5秒 int iRcvTimeout = 5000; if (zmq_setsockopt(m_sokt5001, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0) { zmq_close(m_sokt5001); zmq_ctx_destroy(m_ctx); return; } //3.創建中位機警告信息Socket m_sokt5002 = zmq_socket(m_ctx, ZMQ_PULL); zmq_connect(m_sokt5002, "tcp://10.168.205.73:5002"); if (zmq_setsockopt(m_sokt5002, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0) { zmq_close(m_sokt5002); zmq_ctx_destroy(m_ctx); return; } }
這里是上面界面中"發送指令"按鈕的信號和槽,關於Qt的信號和槽也不贅述了,想要了解的自行去百度。
connect(m_pBtnSendMedCom, SIGNAL(clicked()), this, SLOT(slot_sendmedcomsg()));
槽函數的實現是獲取到姓名 年齡 學號然后通過封裝的接口下發(從PUSH端到PULL端),對於每一個QLineEdit都了相應容錯處理,不能為空,然后通過QJsonObject以json的形式下發數據,m_VirUpComZMQ就是封裝的一個ZMQ的類,在最后通過SendCmd5000()
接口下發數據。
void CVirUpComWgt::slot_sendmedcomsg() { QString GetNameStr = m_pLeditName->text(); if (GetNameStr == "") { //printf("姓名不能為空!\n"); QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("姓名不能為空")); return; } QString GetAgeStr = m_pLeditAge->text(); if (GetAgeStr == "") { //printf("姓名不能為空!\n"); QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("年齡不能為空")); return; } QString GetStuNoStr = m_pLeditStuno->text(); if (GetStuNoStr == "") { //printf("姓名不能為空!\n"); QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("學號不能為空")); return; } QJsonObject basemsg_json; basemsg_json.insert("name", GetNameStr.toLocal8Bit().data()); basemsg_json.insert("age", GetAgeStr.toInt()); basemsg_json.insert("stuno", GetStuNoStr.toInt()); QJsonDocument document; document.setObject(basemsg_json); QByteArray basemsg_array = document.toJson(QJsonDocument::Compact); QString BaseMsgJsonStr(basemsg_array); qDebug() << QString::fromLocal8Bit("簡單的QtJson數據:") << BaseMsgJsonStr; //發送指令 m_VirUpComZMQ->SendCmd5000(BaseMsgJsonStr.toLocal8Bit().data()); }
在模擬的這個上位機程序中還要對接收模塊做處理,當下發了指令以后會接收到上拋的數據,這些數據會通過5001端口和5002端口來接收,前面已經提到過,它們是PULL端。這里要創建兩個線程來專門接收上拋的數據。
void ZeroMQComm::CreateCommThread() { DWORD dwThreadID = 0; m_handle5001 = CreateThread(NULL, 0, ZeroMQComm::Recv5001Msg, this, 0, &dwThreadID); m_handle5002 = CreateThread(NULL, 0, ZeroMQComm::Recv5002Msg, this, 0, &dwThreadID); }
定時器的處理是為了定時刷新界面上接收到的數據,表示數據一直在發送中,這個就相當於實際項目中,電源模塊不停的給上位機上拋數據。
m_timer = new QTimer(this); connect(m_timer, SIGNAL(timeout()), this, SLOT(slot_updaterecordmsg())); m_timer->start(50);
到此,上位機的模擬程序就差不多完成了。一些核心的東西沒有貼出來,大概的思路已經給讀者梳理情況了,其實也沒那么難,自己動手去寫一下很快就能明白。
模擬下位機的程序是完全可以倒推出來的。
比如下位機肯定也需要做上下文初始化和socket的初始化。這是ZMQ更古不變的原則,請參考上面模擬上位機的操作。
其次,下位機這邊肯定也需要一個線程來接收上位機下發的指令
void CVirMedComZMQ::CreateRcvUpComDataThread() { DWORD dwThreadID = 0; m_handle5000 = CreateThread(NULL, 0, CVirMedComZMQ::Recv5000Msg, this, 0, &dwThreadID); }
接收到了上位機的數據以后直接上拋數據給上位機,這里做的相對簡單了一些,只是做了指針的判空處理,實際項目中肯定對於接收到了上位機的數據是要做一定處理的。當接收到的數據不為空的情況下,直接通過5001端口和5002端口發送數據到上位機,
上位機的接收端口也是5001(PULL端)和5002(PULL端)。
DWORD WINAPI CVirMedComZMQ::Recv5000Msg(LPVOID para) { CVirMedComZMQ* VirMedCom = (CVirMedComZMQ*)(para); if (!VirMedCom) { return 0; } while (true) { bool isAcceptMsg = VirMedCom->Is5000AcceptMsg(); void* sokt5000 = VirMedCom->Get5000Socket(); while (isAcceptMsg && sokt5000) { char* msg = s_recv(sokt5000); if (msg != NULL) { VirMedCom->RcvUpComData(msg); while (true) { //通過5001端口發送及格門數數據 srand(clock()); QString PssNumStr = QString::number(rand() % 100); QJsonObject passnum_json; passnum_json.insert("Pass", PssNumStr.toInt()); QJsonDocument pass_document; pass_document.setObject(passnum_json); QByteArray passnum_array = pass_document.toJson(QJsonDocument::Compact); QString PassNumJsonStr(passnum_array); qDebug() << QString::fromLocal8Bit("發送及格門數數據:") << PassNumJsonStr; VirMedCom->Send5001BaseData(PassNumJsonStr.toLocal8Bit().data()); //通過5002端口發送不及格門數數據 QString FailNumStr = QString::number(rand() % 100); QJsonObject failnum_json; failnum_json.insert("Fail", FailNumStr.toInt()); QJsonDocument fail_document; fail_document.setObject(failnum_json); QByteArray failnum_array = fail_document.toJson(QJsonDocument::Compact); QString FailNumJsonStr(failnum_array); qDebug() << QString::fromLocal8Bit("發送不及格門數數據:") << FailNumJsonStr; VirMedCom->Send5002WarnData(FailNumJsonStr.toLocal8Bit().data()); } } free(msg); } s_sleep(1); } }
到此,整個模擬的上下位機已經完成,程序中還有很多地方不完善,讀者有問題可以一起討論。