zeromq集群


ZeroMQ 是一個很有個性的項目,它原來是定位為“史上最快消息隊列”,所以名字里面有“MQ”兩個字母,但是后來逐漸演變發展,慢慢淡化了消息隊列的身影,改稱為消息內核,或者消息層了。從網絡通信的角度看,它處於會話層之上,應用層之下,有了它,你甚至不需要自己寫一行的socket函數調用就能完成復雜的網絡通信工作。

 

先看看這段局域網內的簡單通信:

receiver.cpp

//包含zmq的頭文件 
#include <zmq.h>
#include "stdio.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    const char * pAddr = "tcp://*:7766";

    //創建context,zmq的socket 需要在context上進行創建 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //創建zmq socket ,socket目前有6中屬性 ,這里使用dealer方式
    //具體使用方式請參考zmq官方文檔(zmq手冊) 
    if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iRcvTimeout = 5000;// millsecond
    //設置zmq的接收超時時間為5秒 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //綁定地址 tcp://*:7766 
    //也就是使用tcp協議進行通信,使用網絡端口 7766
    if(zmq_bind(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    printf("bind at : %s\n", pAddr);
    while(1)
    {
        char szMsg[1024] = {0};
        printf("waitting...\n");
        errno = 0;
        //循環等待接收到來的消息,當超過5秒沒有接到消息時,
        //zmq_recv函數返回錯誤信息 ,並使用zmq_strerror函數進行錯誤定位 
        if(zmq_recv(pSock, szMsg, sizeof(szMsg), 0) < 0)
        {
            printf("error = %s\n", zmq_strerror(errno));
            continue;
        }
        printf("received message : %s\n", szMsg);
    }

    return 0;
}

 sender.cpp

//包含zmq的頭文件 
#include <zmq.h>
#include "stdio.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    //使用tcp協議進行通信,需要連接的目標機器IP地址為192.168.1.2
    //通信使用的網絡端口 為7766 
    const char * pAddr = "tcp://192.168.1.2:7766";

    //創建context 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //創建socket 
    if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iSndTimeout = 5000;// millsecond
    //設置接收超時 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iSndTimeout, sizeof(iSndTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //連接目標IP192.168.1.2,端口7766 
    if(zmq_connect(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //循環發送消息 
    while(1)
    {
        static int i = 0;
        char szMsg[1024] = {0};
        snprintf(szMsg, sizeof(szMsg), "hello world : %3d", i++);
        printf("Enter to send...\n");
        if(zmq_send(pSock, szMsg, sizeof(szMsg), 0) < 0)
        {
            fprintf(stderr, "send message faild\n");
            continue;
        }
        printf("send message : [%s] succeed\n", szMsg);
        getchar();
    }

    return 0;
}

 

從這個實例可以看出,因為zmq支持的是對等網絡,所以zmq並沒有accept和listen。

一般步驟是

zmq_ctx_new()->zmq_socket()->zmq_setsockopt()->zmq_bind()->zmq_recv()

或者

zmq_ctx_new()->zmq_socket()->zmq_setsockopt()->zmq_connect()->zmq_send()

這里有個點要注要下:

ZeroMQ是無代理的,其有高速數據傳輸能力,對IPC和socket有良好的封裝特性。而且bind程序和 connect程序無論誰先啟動,其實都不影響整個系統的正常運行。

 

zeromq模式

zeromq 有多種模式,常用的有三種:請求應答模式、訂閱發布模式、push pull模式 。

1. 請求應答模式(req 和 rep)

消息雙向的,有來有往,req端請求的消息,rep端必須答復給req端

2. 訂閱發布模式 (sub 和 pub)

消息單向的,有去無回的。可按照發布端可發布制定主題的消息,訂閱端可訂閱喜歡的主題,訂閱端只會收到自己已經訂閱的主題。發布端發布一條消息,可被多個訂閱端同時收到。

3. push pull模式

消息單向的,也是有去無回的。push的任何一個消息,始終只會有一個pull端收到消息.

后續的代理模式和路由模式等都是在三種基本模式上面的擴展或變異。

阻塞 和 非阻塞

以上三種基本模式都支持阻塞模式和非阻塞模式。req 和 rep的阻塞模式是這樣的(其實跟原生的socket實現也非常像)。大家用過socket的,客戶端要是先啟動的話,會連接失敗,或者是短時間內有超時問題。

 

參考:

zeromq實例

zeromq實現服務器間高性能通信

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM