zeromq中兩個dealer 通過一個router進行通信


發現有童鞋不是很清楚ZMQ中的“請求-回復”模式中的ROUTER怎么用,所以簡單介紹一下“請求-回復”模式的使用(最后付代碼)。

一、講一講

1、要使用zmq 通過一個router進行通信,你首先需要知道ZMQ中的“請求-回復”模式,不清楚的話可以先看一下下面這篇文章,連接如下:

http://www.cnblogs.com/fengbohello/p/4354989.html

在“請求-回復”模式中,router是一個比較特殊的 socket類型,它會把它接收到的第一個消息作為消息來源的標志,也就是消息源的identity;而在使用ZMQ_ROUTER類型的socket發送消息的時候呢,會把這個socket發送的第一個消息作為目的地址,及消息的目的identity。示意圖如下

1、identity 為 aaa 的 socket 發送了一個消息,這個消息由兩部分組成,一個是目的 socket的identity,名字為bbb,另一個是真正的消息,就是"hello"。

2、當aaa 發送的消息被其連接的router接收到之后呢,就不僅僅是剛剛的消息了,ZMQ的底層會偷偷的增加一個消息,那就是 aaa 的identity,所以在 router 看來呢,它接收到的其實是三部分的消息,第一個是消息的來源,第二個是目的地址(bbb 的 identity),第三部分就是真正要傳達的信息。

3、當router接收到這么一個消息的時候,會發現,這個消息來源於aaa,並且是發向bbb的,所以router就會發送如下消息:首先發送一個 bbb,表示要發給的目的地址的identity是bbb,然后發送aaa,最后是信息hello。

4、identity 為 bbb的dealer 接收到消息之后,就只有aaa和hello了。router發送的時候不是首先發送了一個bbb嗎,去哪里了呢?這次被ZMQ偷偷的拿走了。這就是router的神奇之處,它會看到ZMQ_DEALER和ZMQ_REQ/ZMQ_REP不能看到的東西

現在再把“請求-回復”模式的規則說一下:就是,ZMQ_ROUTER能夠看到消息的來源,以及消息的去向,並且ZMQ_ROUTER會把接收到的第一個消息作為消息來源的identity,把發送的第一個消息作為消息目的地址的identity。

二、下面是代碼,代碼由五個文件組成,還有一個makefile。

  我相信在你使用ZMQ的時候已經安裝好了ZMQ的鏈接庫,如果確實還沒有安裝好的話,按照下面這篇文章安裝就可以了。http://www.cnblogs.com/fengbohello/p/4046686.html

  注:代碼在CentOS下編譯並運行通過,其它機器沒有測試。

在本頁復制或者到我的百度網盤進行下載:dlr2rtr2dlr.rar  http://pan.baidu.com/s/1pJIICpt

comm.h

//comm.h
#ifndef _ZMQCOMM_H_
#define _ZMQCOMM_H_
#include <zmq.h>

#define NAME_LEN    256
#define MSG_LEN        1024

typedef struct {
    char szSrc[NAME_LEN];
    char szDst[NAME_LEN];
    char szMsg[MSG_LEN];
}Zmqmsg;

typedef struct {
    void * sock;
    int iType;
}ZmqSock;

void lockSocket();
void unlockSocket();

int  s_recv(ZmqSock * sock, Zmqmsg * zMsg);
int  s_send(ZmqSock * sock, Zmqmsg * zMsg);

#endif
View Code

comm.c

//comm.c
#include <string.h>
#include "comm.h"

void lockSocket()
{
    // lock
}
void unlockSocket()
{
    // unlock
}

int  s_recv(ZmqSock * zmqsock, Zmqmsg * zMsg)
{
    if(NULL == zmqsock || NULL == zMsg)
    {
        return -1;
    }
    int iRet = -1;
    lockSocket();
    int iType = 0;
    int len = sizeof(iType);
    void * sock = zmqsock->sock;
    do{
        iType = zmqsock->iType;
        if(ZMQ_ROUTER == iType)
        {
            printf("router:\n");
            errno = 0;
            if(zmq_recv(sock, zMsg->szSrc, sizeof(zMsg->szSrc), 0) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("recv : [%s]\n", zMsg->szSrc);
            if(zmq_recv(sock, NULL, 0, 0) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("recv : []\n");
        }
        else if (ZMQ_DEALER == iType)
        {
            printf("dealer:\n");
            if(zmq_recv(sock, NULL, 0, 0) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("recv : []\n");
        }
        else if (ZMQ_REQ == iType)
        {
            printf("req:\n");
        }
        else if (ZMQ_REP == iType)
        {
            printf("rep:\n");
        }

        if(zmq_recv(sock, zMsg->szDst, sizeof(zMsg->szDst), 0) < 0)
        {
            printf("send msg faild : [%s]\n", zmq_strerror(errno));
            break;
        }
        printf("recv : [%s]\n", zMsg->szDst);
        if(zmq_recv(sock, NULL, 0, 0) < 0)
        {
            printf("send msg faild : [%s]\n", zmq_strerror(errno));
            break;
        }
        printf("recv : []\n");
        if(zmq_recv(sock, zMsg->szMsg, sizeof(zMsg->szMsg), 0) < 0)
        {
            printf("send msg faild : [%s]\n", zmq_strerror(errno));
            break;
        }
        printf("recv : [%s]\n", zMsg->szMsg);
        iRet = 0;
    }while(0);
    unlockSocket();

    return iRet;
}

int  s_send(ZmqSock * zmqsock, Zmqmsg * zMsg)
{
    if(NULL == zmqsock || NULL == zMsg)
    {
        return -1;
    }
    int iRet = -1;
    lockSocket();
    int iType = zmqsock->iType;
    int len = sizeof(iType);
    void * sock = zmqsock->sock;
    do{
        if(ZMQ_ROUTER == iType)
        {
            printf("router:\n");
            if(zmq_send(sock, zMsg->szDst, strlen(zMsg->szDst), ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("send : [%s]\n", zMsg->szDst);
            if(zmq_send(sock, "", 0, ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("send : []\n");
            if(zmq_send(sock, zMsg->szSrc, strlen(zMsg->szSrc), ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("send : [%s]\n", zMsg->szSrc);
        }
        else if (ZMQ_DEALER == iType)
        {
            printf("dealer:\n");
            if(zmq_send(sock, "", 0, ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("send : []\n");
            if(zmq_send(sock, zMsg->szDst, strlen(zMsg->szDst), ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("send : [%s]\n", zMsg->szDst);
        }
        else if (ZMQ_REQ == iType || ZMQ_REP == iType)
        {
            printf("rex:\n");
            if(zmq_send(sock, zMsg->szDst, strlen(zMsg->szDst), ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]\n", zmq_strerror(errno));
                break;
            }
            printf("send : [%s]\n", zMsg->szDst);
        }

        if(zmq_send(sock, "", 0, ZMQ_SNDMORE) < 0)
        {
            printf("send msg faild : [%s]\n", zmq_strerror(errno));
            break;
        }
        printf("send : []\n");
        if(zmq_send(sock, zMsg->szMsg, strlen(zMsg->szMsg), 0) < 0)
        {
            printf("send msg faild : [%s]\n", zmq_strerror(errno));
            break;
        }
        printf("send : [%s]\n", zMsg->szMsg);
        iRet = 0;
    }while(0);
    unlockSocket();

    return iRet;
}
View Code

recv.c

//recv.c
//包含zmq的頭文件 
#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include "comm.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    const char * pAddr = "ipc://drd.ipc";

    //創建context,zmq的socket 需要在context上進行創建 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //創建zmq socket ,socket目前有6中屬性 ,這里使用dealer方式
    //具體使用方式請參考zmq官方文檔(zmq手冊) 
    if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iRcvTimeout = 5000;// millsecond
    //設置zmq的接收超時時間為5秒 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    char * pName = "recv";
    if(zmq_setsockopt(pSock, ZMQ_IDENTITY, pName, strlen(pName)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //綁定地址 ipc://drd.ipc 
    //也就是使用ipc協議進行通信,地址為當前目錄下的drd.ipc
    if(zmq_connect(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    printf("bind at : %s\n", pAddr);
    ZmqSock zmqsock;
    zmqsock.iType = ZMQ_DEALER;
    zmqsock.sock = pSock;
    while(1)
    {
        printf("waitting...\n");
        errno = 0;
        //循環等待接收到來的消息,當超過5秒沒有接到消息時,
        //recv函數返回錯誤信息 ,並使用zmq_strerror函數進行錯誤定位 
        Zmqmsg zmsg;
        memset(&zmsg, 0, sizeof(zmsg));
        if(s_recv(&zmqsock, &zmsg) < 0)
        {
            printf("error = %s\n", zmq_strerror(errno));
            continue;
        }
        printf("------------------------\n");
    }

    zmq_close(pSock);
    zmq_ctx_destroy(pCtx);
    return 0;
}
View Code

router.c

//router.c
//包含zmq的頭文件 
#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include "comm.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    const char * pAddr = "ipc://drd.ipc";

    //創建context,zmq的socket 需要在context上進行創建 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //創建zmq socket ,socket目前有6中屬性 ,這里使用dealer方式
    //具體使用方式請參考zmq官方文檔(zmq手冊) 
    if((pSock = zmq_socket(pCtx, ZMQ_ROUTER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iRcvTimeout = -1;// millsecond
    //設置zmq的接收超時時間為5秒 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    char * pName = "router";
    if(zmq_setsockopt(pSock, ZMQ_IDENTITY, pName, strlen(pName)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //綁定地址 ipc://drd.ipc 
    //也就是使用ipc協議進行通信,地址為當前目錄下的drd.ipc
    if(zmq_bind(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    printf("bind at : %s\n", pAddr);
    ZmqSock zmqsock;
    zmqsock.iType = ZMQ_ROUTER;
    zmqsock.sock = pSock;
    while(1)
    {
        printf("waitting...\n");
        errno = 0;
        //循環等待接收到來的消息,當超過5秒沒有接到消息時,
        //recv函數返回錯誤信息 ,並使用zmq_strerror函數進行錯誤定位 
        Zmqmsg zmsg;
        memset(&zmsg, 0, sizeof(zmsg));
        if(s_recv(&zmqsock, &zmsg) < 0)
        {
            printf("error = %s\n", zmq_strerror(errno));
            continue;
        }
        if(s_send(&zmqsock, &zmsg) < 0)
        {
            printf("error = %s\n", zmq_strerror(errno));
            continue;
        }
    }

    zmq_close(pSock);
    zmq_ctx_destroy(pCtx);
    return 0;
}
View Code

send.c

//send.c
//包含zmq的頭文件 
#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include "comm.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    //使用ipc協議進行通信,需要連接的目標機器IP地址為drd.ipc
    const char * pAddr = "ipc://drd.ipc";

    //創建context 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //創建socket 
    if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iSndTimeout = 5000;// millsecond
    //設置接收超時 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iSndTimeout, sizeof(iSndTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    char * pName = "send";
    if(zmq_setsockopt(pSock, ZMQ_IDENTITY, pName, strlen(pName)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    if(zmq_connect(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    printf("connect to [%s]\n", pAddr);
    ZmqSock zmqsock;
    zmqsock.iType = ZMQ_DEALER;
    zmqsock.sock = pSock;
    //循環發送消息 
    while(1)
    {
        static int i = 0;
        Zmqmsg zmsg;
        memset(&zmsg, 0, sizeof(zmsg));
        snprintf(zmsg.szDst, sizeof(zmsg.szDst), "recv");
        snprintf(zmsg.szMsg, sizeof(zmsg.szMsg), "hello world : %3d", i++);
        printf("Enter to send...\n");
        if(s_send(&zmqsock, &zmsg) < 0)
        {
            fprintf(stderr, "send message faild\n");
        }
        printf("------------------------\n");
        getchar();
    }

    zmq_close(pSock);
    zmq_ctx_destroy(pCtx);
    return 0;
}
View Code

Makefile

.PHONY : dummy clean

CFLAGS    = -Wall
LDFLAGS    = -lzmq -lpthread

CC        = gcc -g
CXX        = g++ -g
MAKEF    = make -f Makefile
CPA        = cp -a
MAKE    = $(CC)

subdir-list         = $(patsubst %,_subdir_%,$(SUB_DIRS))
subdir-clean-list     = $(patsubst %,_subdir_clean_%,$(SUB_DIRS))

%.o: %.c
    $(MAKE) -o $@ -c $< $(CFLAGS)

%.o: %.cpp
    $(MAKE) -o $@ -c $< $(CFLAGS)

%.os: %.c
    $(MAKE) -fPIC -c $< -o $@ $(CFLAGS) 

%.os: %.cpp
    $(MAKE) -fPIC -c $< -o $@ $(CFLAGS) 

ALL_FILES    = recv send router

all : $(ALL_FILES)

recv : recv.o comm.o
    $(MAKE) -o $@ $(LDFLAGS) $^

send : send.o comm.o
    $(MAKE) -o $@ $(LDFLAGS) $^

router : router.o comm.o
    $(MAKE) -o $@ $(LDFLAGS) $^

clean : 
    rm -rf *.o
    rm -rf $(ALL_FILES)
View Code

 

作者 :風波

mail : fengbohello@qq.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM