利用ZeroMQ消息隊列實現多端口並發數據處理


這里至於ZeroMQ是什么就不過多贅述了,這是基礎的知識,不懂的讀者可以先去百度一下,這里也提供一個老哥的詳細解釋,貼上鏈接大家可以去看看https://www.cnblogs.com/chenny7/p/6245236.html。

先說一下為什么要這樣處理,本人是從事新能源電池檢測相關工作的,屬於工控的領域,有大量的指令下發(通過上位機的控制),指令下發以后會接收到下位機(電源模塊)上傳的數據,是一對多的模式(一個上位機,面對着多個電源模塊)。這里就有一個並發的問題,同時要接收多個電源模塊上傳的數據,而且是1秒鍾一條數據,這個並發量相對來說比較大,為了性能考慮,防止擁塞這里用到了ZeroMQ消息隊列,ZMQ對這一塊的處理相對合理。

這里只是一個模擬的程序,相當於一個demo,實際開發工程中要比這復制的多,因為上拋的數據還分很多種,比如記錄數據,告警數據,針床的數據等等。

 

 

用QT做了一個簡單的界面,這里的姓名  年齡  學號模擬的是上位機控制指令,當然實際的情況是上位機下發有相對應的協議,這個協議必須是上位機和下位機定義好。

 

首先是定義初始化上下文,這是ZeroMQ中必須要做的工作。

void ZeroMQComm::InitCtx()
{
    //1.建上下文
    m_ctx = zmq_init(1);
    if (!m_ctx)
    {
        printf("build zmq_init():%s\n", zmq_strerror(errno));
        return;
    }
    else
    {
        printf("創建上下文成功!\n");
    }
}

 

然后初始化socket,其實可以把ZMQ看成是對socket的進一步封裝,這里用到了3個端口,5000,5001,5002。為什么要用這三個端口呢?每一個端口代表了不同的信息,比如5000端口是作為的PUSH端,相當於一個分發任務的角色,這個demo中

當點擊發送指令的時候就會利用這個端口來給PULL端發送數據。5001端口(作為PULL端)是接收及格數據,5002端口(作為PULL端)是接收不及格數據,這兩個端口模式的是在實際的項目中的記錄數據和告警數據,記錄數據是通過5001端口上拋,告

警數據是由5002端口上拋。下面代碼中綁定的IP可以忽略,我是按照我測試的環境中來配置IP,讀者要進行的時候需要根據自己的實際情況來!

 

void ZeroMQComm::InitSockets()
{
    //1.創建發布Socket通訊對象
    //該對象用於下中位機發送控制指令
    if((m_sokt5000 = zmq_socket(m_ctx, ZMQ_PUSH)) == NULL)
    {
        printf("%s\n", zmq_strerror(errno));
        zmq_close(m_sokt5000);
        zmq_ctx_term(m_ctx);
        return;
     }
    else
    {
        printf("創建發送端口socket成功!\n");
    }


    int tmp_bet = zmq_bind(m_sokt5000, "tcp://10.168.205.73:5000");
    //printf("zmq_bind=%d\n", tmp_bet);
    if (tmp_bet < 0)
    {
        printf("bind port %s\n", zmq_strerror(errno));
        return;
    }
    else
    {
        printf("bind port success!\n");
    }
    

    //設置發送超時時間3秒
    int iSendTimeout = 3000;
    if (zmq_setsockopt(m_sokt5000, ZMQ_SNDTIMEO, &iSendTimeout, sizeof(iSendTimeout)) < 0)
    {
        zmq_close(m_sokt5000);
        zmq_ctx_destroy(m_ctx);
        return;
    }

    //2.創建中位機回復Socket
    m_sokt5001 = zmq_socket(m_ctx, ZMQ_PULL);
    zmq_connect(m_sokt5001, "tcp://10.168.205.73:5001");

    //設置接收超時時間5秒
    int iRcvTimeout = 5000;
    if (zmq_setsockopt(m_sokt5001, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(m_sokt5001);
        zmq_ctx_destroy(m_ctx);
        return;
    }


    //3.創建中位機警告信息Socket
    m_sokt5002 = zmq_socket(m_ctx, ZMQ_PULL);
    zmq_connect(m_sokt5002, "tcp://10.168.205.73:5002");

    if (zmq_setsockopt(m_sokt5002, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(m_sokt5002);
        zmq_ctx_destroy(m_ctx);
        return;
    }



}

 

這里是上面界面中"發送指令"按鈕的信號和槽,關於Qt的信號和槽也不贅述了,想要了解的自行去百度。

    connect(m_pBtnSendMedCom, SIGNAL(clicked()), this, SLOT(slot_sendmedcomsg()));

槽函數的實現是獲取到姓名 年齡 學號然后通過封裝的接口下發(從PUSH端到PULL端),對於每一個QLineEdit都了相應容錯處理,不能為空,然后通過QJsonObject以json的形式下發數據,m_VirUpComZMQ就是封裝的一個ZMQ的類,在最后通過SendCmd5000()

接口下發數據。

void CVirUpComWgt::slot_sendmedcomsg()
{
    QString GetNameStr = m_pLeditName->text();
    if (GetNameStr == "")
    {
        //printf("姓名不能為空!\n");
        QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("姓名不能為空"));
        return;
    }

    QString GetAgeStr = m_pLeditAge->text();

    if (GetAgeStr == "")
    {
        //printf("姓名不能為空!\n");
        QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("年齡不能為空"));
        return;
    }

    QString GetStuNoStr = m_pLeditStuno->text();

    if (GetStuNoStr == "")
    {
        //printf("姓名不能為空!\n");
        QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("學號不能為空"));
        return;
    }

    QJsonObject basemsg_json;
    basemsg_json.insert("name", GetNameStr.toLocal8Bit().data());
    basemsg_json.insert("age", GetAgeStr.toInt());
    basemsg_json.insert("stuno", GetStuNoStr.toInt());

    QJsonDocument document;
    document.setObject(basemsg_json);
    QByteArray basemsg_array = document.toJson(QJsonDocument::Compact);

    QString BaseMsgJsonStr(basemsg_array);
    qDebug() << QString::fromLocal8Bit("簡單的QtJson數據:") << BaseMsgJsonStr;

    //發送指令
    m_VirUpComZMQ->SendCmd5000(BaseMsgJsonStr.toLocal8Bit().data());

    
}

在模擬的這個上位機程序中還要對接收模塊做處理,當下發了指令以后會接收到上拋的數據,這些數據會通過5001端口和5002端口來接收,前面已經提到過,它們是PULL端。這里要創建兩個線程來專門接收上拋的數據。

void ZeroMQComm::CreateCommThread()
{
    DWORD dwThreadID = 0;
    m_handle5001 = CreateThread(NULL, 0, ZeroMQComm::Recv5001Msg, this, 0, &dwThreadID);
    m_handle5002 = CreateThread(NULL, 0, ZeroMQComm::Recv5002Msg, this, 0, &dwThreadID);
}

定時器的處理是為了定時刷新界面上接收到的數據,表示數據一直在發送中,這個就相當於實際項目中,電源模塊不停的給上位機上拋數據。

    m_timer = new QTimer(this);
    connect(m_timer, SIGNAL(timeout()), this, SLOT(slot_updaterecordmsg()));
    m_timer->start(50);

到此,上位機的模擬程序就差不多完成了。一些核心的東西沒有貼出來,大概的思路已經給讀者梳理情況了,其實也沒那么難,自己動手去寫一下很快就能明白。

 

模擬下位機的程序是完全可以倒推出來的。

比如下位機肯定也需要做上下文初始化和socket的初始化。這是ZMQ更古不變的原則,請參考上面模擬上位機的操作。

其次,下位機這邊肯定也需要一個線程來接收上位機下發的指令

void CVirMedComZMQ::CreateRcvUpComDataThread()
{
    DWORD dwThreadID = 0;
    m_handle5000 = CreateThread(NULL, 0, CVirMedComZMQ::Recv5000Msg, this, 0, &dwThreadID);
}

接收到了上位機的數據以后直接上拋數據給上位機,這里做的相對簡單了一些,只是做了指針的判空處理,實際項目中肯定對於接收到了上位機的數據是要做一定處理的。當接收到的數據不為空的情況下,直接通過5001端口和5002端口發送數據到上位機,

上位機的接收端口也是5001(PULL端)和5002(PULL端)。

DWORD WINAPI CVirMedComZMQ::Recv5000Msg(LPVOID para)
{
    CVirMedComZMQ* VirMedCom = (CVirMedComZMQ*)(para);
    if (!VirMedCom)
    {
        return 0;
    }

    while (true)
    {
        bool isAcceptMsg = VirMedCom->Is5000AcceptMsg();
        void* sokt5000 = VirMedCom->Get5000Socket();
        while (isAcceptMsg && sokt5000)
        {
            char* msg = s_recv(sokt5000);
            if (msg != NULL)
            {
                VirMedCom->RcvUpComData(msg);
                while (true)
                {
                    //通過5001端口發送及格門數數據
                    srand(clock());
                    QString PssNumStr = QString::number(rand() % 100);
                    QJsonObject passnum_json;
                    passnum_json.insert("Pass", PssNumStr.toInt());
                    QJsonDocument pass_document;
                    pass_document.setObject(passnum_json);
                    QByteArray passnum_array = pass_document.toJson(QJsonDocument::Compact);
                    QString PassNumJsonStr(passnum_array);
                    qDebug() << QString::fromLocal8Bit("發送及格門數數據:") << PassNumJsonStr;
                    VirMedCom->Send5001BaseData(PassNumJsonStr.toLocal8Bit().data());

                    //通過5002端口發送不及格門數數據
                    QString FailNumStr = QString::number(rand() % 100);
                    QJsonObject failnum_json;
                    failnum_json.insert("Fail", FailNumStr.toInt());
                    QJsonDocument fail_document;
                    fail_document.setObject(failnum_json);
                    QByteArray failnum_array = fail_document.toJson(QJsonDocument::Compact);
                    QString FailNumJsonStr(failnum_array);
                    qDebug() << QString::fromLocal8Bit("發送不及格門數數據:") << FailNumJsonStr;
                    VirMedCom->Send5002WarnData(FailNumJsonStr.toLocal8Bit().data());
                }
            }
            free(msg);
        }
        s_sleep(1);
    }
}

到此,整個模擬的上下位機已經完成,程序中還有很多地方不完善,讀者有問題可以一起討論。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM