ZeroMQ:19---模式之(請求-響應模式:ZMQ_REP、ZMQ_REQ、ZMQ_DEALER、ZMQ_ROUTER)


轉自:https://blog.csdn.net/qq_41453285/article/details/106878960

一、ØMQ模式總覽

    ØMQ支持多種模式,具體可以參閱:https://blog.csdn.net/qq_41453285/article/details/106865539
    本文介紹ØMQ的“請求-響應”模式

二、請求-響應模式

    請求-響應模式由http://rfc.zeromq.org/spec:28正式定義
    請求-應答模式應該是最常見的交互模式,如果連接之后,服務器終止,那么客戶端也終止,從崩潰的過程中恢復不太容易
    因此,做一個可靠的請求-應答模式很復雜,在很后面我們會有一部分系列文章介紹“可靠的請求-應答模式”
    “請求-響應模型”支持的套接字類型有4種:
        ZMQ_REP
        ZMQ_REQ
        ZMQ_DEALER
        ZMQ_ROUTER

三、“REQ-REP”套接字類型

    請求-響應模式用於將請求從ZMQ_REQ客戶端發送到一個或多個ZMQ_REP服務,並接收對每個發送的請求的后續答復
    REQ-REP套接字對是步調一致的。它們兩者的次序必須有規則,不能同時發送或接收,否則無效果

    ZMQ_REQ

        客戶端使用ZMQ_REQ類型的套接字向服務發送請求並從服務接收答復
        此套接字類型僅允許zmq_send(request)和后續zmq_recv(reply)調用交替序列。發送的每個請求都在所有服務中輪流輪詢,並且收到的每個答復都與最后發出的請求匹配
        如果沒有可用的服務,則套接字上的任何發送操作都應阻塞,直到至少有一項服務可用為止。REQ套接字不會丟棄消息

                                                                                 ZMQ_REQ特性摘要
    兼容的對等套接字    ZMQ_REP、ZMQ_ROUTER
    方向    雙向
    發送/接收模式    發送、接收、發送、接收......
    入網路由策略    最后一位(Last peer)

    外發路由策略
        輪詢
    靜音狀態下的操作    阻塞
    ZMQ_REP

        服務使用ZMQ_REP類型的套接字來接收來自客戶端的請求並向客戶端發送回復
        此套接字類型僅允許zmq_recv(request)和后續zmq_send(reply)調用的交替序列。接收到的每個請求都從所有客戶端中公平排隊,並且發送的每個回復都路由到發出最后一個請求的客戶端
        如果原始請求者不再存在,則答復將被靜默丟棄

                                                                               ZMQ_REP特性摘要
    兼容的對等套接字    ZMQ_REQ、ZMQ_DEALER
    方向    雙向
    發送/接收模式    接收、發送、接收、發送......
    入網路由策略    公平排隊

    外發路由策略
        最后一位(Last peer)

    演示案例

        本演示案例如下:
            服務端創建REP套接字,阻塞等待客戶端消息的到達,當客戶端有消息達到時給客戶端回送“World”字符串
            客戶端創建REP套接字,向服務端發送字符串“Hello”,然后等待服務端回送消息

        服務端代碼如下:

        // https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwserver.c
        // hwserver.c
        #include <stdio.h>
        #include <string.h>
        #include <assert.h>
        #include <stdlib.h>
        #include <unistd.h>
        #include <zmq.h>
         
        // 向socket發送數據, 數據為string
        static int s_send(void *socket, char *string);
        // 從socket接收數據, 並將數據以字符串的形式返回
        static char *s_recv(void *socket);
         
        int main()
        {
            // 1.創建上下文
            void *context = zmq_ctx_new();
         
            // 2.創建、綁定套接字
            void *responder = zmq_socket(context, ZMQ_REP);
            zmq_bind(responder, "tcp://*:5555");
         
            int rc;
            // 3.循環接收數據、發送數據
            while(1)
            {
                // 4.接收數據
                char *request = s_recv(responder);
                assert(request != NULL);
                printf("Request: %s\n", request);
                free(request);
         
                // 休眠1秒再繼續回復
                sleep(1);
         
                // 5.回送數據
                char *reply = "World";
                rc = s_send(responder, reply);
                assert(rc > 0);
            }
         
            // 6.關閉套接字、銷毀上下文
            zmq_close(responder);
            zmq_ctx_destroy(context);
         
            return 0;
        }
         
        static int s_send(void *socket, char *string)
        {
            int rc;
            
            zmq_msg_t msg;
            zmq_msg_init_size(&msg, 5);
            memcpy(zmq_msg_data(&msg), string, strlen(string));
            
            rc = zmq_msg_send(&msg, socket, 0);
            zmq_msg_close(&msg);
         
            return rc;
        }
         
        static char *s_recv(void *socket)
        {
            int rc;
            zmq_msg_t msg;
            zmq_msg_init(&msg);
            
            rc = zmq_msg_recv(&msg, socket, 0);
            if(rc == -1)
                return NULL;
            
            char *string = (char*)malloc(rc + 1);
            memcpy(string, zmq_msg_data(&msg), rc);
            zmq_msg_close(&msg);
            
            string[rc] = 0;
            return string;
        }

        還有一個使用C++ API編寫的服務端,可參閱:https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwserver.cpp
        客戶端代碼如下:

        // https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwclient.c
        // hwclient.c
        #include <stdio.h>
        #include <stdlib.h>
        #include <assert.h>
        #include <string.h>
        #include <zmq.h>
         
        // 向socket發送數據, 數據為string
        static int s_send(void *socket, char *string);
        // 從socket接收數據, 並將數據以字符串的形式返回
        static char *s_recv(void *socket);
         
        int main()
        {
            // 1.創建上下文
            void *context = zmq_ctx_new();
         
            // 2.創建套接字、連接服務器
            void *requester = zmq_socket(context, ZMQ_REQ);
            zmq_connect(requester, "tcp://localhost:5555");
         
            int rc;
            // 3.循環發送數據、接收數據
            while(1)
            {
                // 4.發送數據
                char *request = "Hello";
                rc = s_send(requester, request);
                assert(rc > 0);
         
                // 5.接收回復數據
                char *reply = s_recv(requester);
                assert(reply != NULL);
                printf("Reply: %s\n", reply);
                free(reply);
            }
         
            // 6.關閉套接字、銷毀上下文
            zmq_close(requester);
            zmq_ctx_destroy(context);
         
            return 0;
        }
         
        static int s_send(void *socket, char *string)
        {
            int rc;
            
            zmq_msg_t msg;
            zmq_msg_init_size(&msg, 5);
            memcpy(zmq_msg_data(&msg), string, strlen(string));
            
            rc = zmq_msg_send(&msg, socket, 0);
            zmq_msg_close(&msg);
         
            return rc;
        }
         
        static char *s_recv(void *socket)
        {
            int rc;
            zmq_msg_t msg;
            zmq_msg_init(&msg);
            
            rc = zmq_msg_recv(&msg, socket, 0);
            if(rc == -1)
                return NULL;
            
            char *string = (char*)malloc(rc + 1);
            memcpy(string, zmq_msg_data(&msg), rc);
            zmq_msg_close(&msg);
            
            string[rc] = 0;
            return string;
        }

        編譯並運行如下,左側為服務端,右側為客戶端:

        gcc -o hwserver hwserver.c -lzmq
        gcc -o hwclient hwclient.c -lzmq

四、“DEALER-ROUTER”套接字類型

    本文介紹“DEALER-ROUTER”的語法和代理演示案例,在后面的一個專題中我們將介紹如何使用“DEALER-ROUTER”來構建各種異步請求-應答流

    ZMQ_DEALER

        ZMQ_DEALER類型的套接字是用於擴展“請求/應答”套接字的高級模式
        發送消息時:當ZMQ_DEALER套接字由於已達到所有對等點的最高水位而進入靜音狀態時,或者如果根本沒有任何對等點,則套接字上的任何zmq_send()操作都應阻塞,直到靜音狀態結束或至少一個對等方變得可以發送;消息不會被丟棄
        接收消息時:發送的每條消息都是在所有連接的對等方之間進行輪詢,並且收到的每條消息都是從所有連接的對等方進行公平排隊的
        將ZMQ_DEALER套接字連接到ZMQ_REP套接字時,發送的每個消息都必須包含一個空的消息部分,定界符以及一個或多個主體部分

                                                                                 ZMQ_DEALER特性摘要
    兼容的對等套接字    ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER
    方向    雙向
    發送/接收模式    無限制
    入網路由策略    公平排隊

    外發路由策略
        輪詢
    靜音狀態下的操作    阻塞
    ZMQ_ROUTER

        ZMQ_ROUTER類型的套接字是用於擴展請求/答復套接字的高級套接字類型
        當收到消息時:ZMQ_ROUTER套接字在將消息傳遞給應用程序之前,應在消息部分之前包含消息的始發對等方的路由ID。接收到的消息從所有連接的同級之間公平排隊
        發送消息時:
            ZMQ_ROUTER套接字應刪除消息的第一部分,並使用它來確定消息應路由到的對等方的_routing id _。如果該對等點不再存在或從未存在,則該消息將被靜默丟棄
            但是,如果ZMQ_ROUTER_MANDATORY套接字選項設置為1,這兩種情況下套接字都將失敗並顯示EHOSTUNREACH
        高水位標記影響:
            當ZMQ_ROUTER套接字由於達到所有同位體的高水位線而進入靜音狀態時,發送到該套接字的任何消息都將被丟棄,直到靜音狀態結束為止。同樣,任何路由到已達到單個高水位標記的對等方的消息也將被丟棄
            如果ZMQ_ROUTER_MANDATORY套接字選項設置為1,則在兩種情況下套接字都應阻塞或返回EAGAIN
        ZMQ_ROUTER_MANDATORY套接字選項:
            當ZMQ_ROUTER套接字的ZMQ_ROUTER_MANDATORY標志設置為1時,套接字應在接收到來自一個或多個對等方的消息后生成ZMQ_POLLIN事件
            同樣,當至少一個消息可以發送給一個或多個對等方時,套接字將生成ZMQ_POLLOUT事件
        當ZMQ_REQ套接字連接到ZMQ_ROUTER套接字時,除了始發對等方的路由ID外,每個收到的消息都應包含一個空的定界符消息部分。因此,由應用程序看到的每個接收到的消息的整個結構變為:一個或多個路由ID部分,定界符部分,一個或多個主體部分。將回復發送到ZMQ_REQ套接字時,應用程序必須包括定界符部分

                                                                                 ZMQ_ROUTER特性摘要
    兼容的對等套接字    ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER
    方向    雙向
    發送/接收模式    無限制
    入網路由策略    公平排隊

    外發路由策略
        看上面介紹
    靜音狀態下的操作    丟棄(見上面介紹)

    共享隊列/代理

        在“REP-REQ”的演示案例中,我們只有一個客戶端和一個服務端進行交流。但是實際中,我們通常允許多個客戶端與多個服務端之間相互交流
        將多個客戶端連接到多個服務器的方法有兩種:
            方法①:將每個客戶端都連接到多個服務端點
            方法②:使用代理
        方法①:
            原理:一種是將每個客戶端套接字連接到多個服務端點。REQ套接字隨后會將請求發送到服務端上。比如說一個客戶端連接了三個服務端點:A、B、C,之后發送請求R1、R4到服務A上,發送請求R2到服務B上、發送請求R3到服務C上(如下圖所示)
            對於這種設計來說,服務器屬於靜態部分,客戶端屬於動態部分,客戶端的增減無所謂,但是服務器的增減確實致命的。假設現在有100個客戶端都連接了服務器,如果此時新增了三台服務器,為了讓客戶端識別這新增的三台服務器,那么就需要將所有的客戶端都停止重新配置然后再重新啟動

        方法②:
            我們可以編寫一個小型消息排隊代理,使我們具備靈活性
            原理:該代理綁定到了兩個端點,一個用於客戶端的前端(ZMQ_ROUTER),另一個用於服務的后端(ZMQ_DEALER)。然后帶來使用zmq_poll()來輪詢這兩個套接字的活動,當有消息時,代理會將消息在兩個套接字之間頻繁地輸送
            該代理其實並不顯式管理任何隊列,其只負責消息的傳送,ØMQ會自動將消息在每個套接字上進行排隊
            使用zmq_poll()配合DEALER-ROUTER:
                在上面我們使用REQ-REP套接字時,會有一個嚴格同步的請求-響應對話,就是必須客戶端先發送一個請求,然后服務端讀取請求並發送應答,最后客戶端讀取應答,如此反復。如果客戶端或服務端嘗試破壞這種約束(例如,連續發送兩個請求,而沒有等待響應),那么將返回一個錯誤
                我們的代理必須是非阻塞的,可以使用zmq_poll()來輪詢任何一個套接字上的活動,但我們不能使用REQ-REQ。幸運地是,有兩個稱為DEALER和ROUTER的套接字,它們能我們可以執行無阻塞的請求-響應

    代理演示案例

        我們擴展了上面的“REQ-REP”演示案例:
            REQ和ROUTER交流,DEALER與REP交流。代理節點從一個套接字讀取消息,並將消息轉發到其他套接字

        客戶端的代碼如下:將REQ套接字連接到代理的ROUTER節點上,向ROUTER節點發送“Hello”,接收到“World”的回復

        // rrclient.c
        // https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/rrclient.c
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>
        #include <zmq.h>
         
        // 向socket發送數據, 數據為string
        static int s_send(void *socket, char *string);
        // 從socket接收數據, 並將數據以字符串的形式返回
        static char *s_recv(void *socket);
         
        int main()
        {
            int rc;
            // 1.初始化上下文
            void *context = zmq_ctx_new();
         
            // 2.創建套接字、連接代理的ROUTER端
            void *requester = zmq_socket(context, ZMQ_REQ);
            rc = zmq_connect(requester, "tcp://localhost:5559");
            if(rc == -1)
            {
                perror("zmq_connect");
                zmq_close(requester);
                zmq_ctx_destroy(context);
                return -1;
            }
         
            // 3.循環發送、接收數據(10次)
            int request_nbr;
            for(request_nbr = 0; request_nbr < 10; request_nbr++)
            {
                // 4.先發送數據
                rc = s_send(requester, "Hello");
                if(rc < 0)
                {
                    perror("s_send");
                    zmq_close(requester);
                    zmq_ctx_destroy(context);
                    return -1;
                }
                
                // 5.等待響應
                char *reply = s_recv(requester);
                if(reply == NULL)
                {
                    perror("s_recv");
                    free(reply);
                    zmq_close(requester);
                    zmq_ctx_destroy(context);
                    return -1;
                }
                printf("Reply[%d]: %s\n", request_nbr + 1, reply);
                free(reply);
            }
         
            // 6.關閉套接字、銷毀上下文
            zmq_close(requester);
            zmq_ctx_destroy(context);
         
            return 0;
        }
         
        static int s_send(void *socket, char *string)
        {
            int rc;
            
            zmq_msg_t msg;
            zmq_msg_init_size(&msg, 5);
            memcpy(zmq_msg_data(&msg), string, strlen(string));
            
            rc = zmq_msg_send(&msg, socket, 0);
            zmq_msg_close(&msg);
         
            return rc;
        }
         
        static char *s_recv(void *socket)
        {
            int rc;
            zmq_msg_t msg;
            zmq_msg_init(&msg);
            
            rc = zmq_msg_recv(&msg, socket, 0);
            if(rc == -1)
                return NULL;
            
            char *string = (char*)malloc(rc + 1);
            memcpy(string, zmq_msg_data(&msg), rc);
            zmq_msg_close(&msg);
            
            string[rc] = 0;
            return string;
        }

        服務端的代碼如下:將REP套接字連接到代理的DEALER節點上

        // rrworker.c
        // https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/rrworker.c
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>
        #include <unistd.h>
        #include <zmq.h>
         
        // 向socket發送數據, 數據為string
        static int s_send(void *socket, char *string);
        // 從socket接收數據, 並將數據以字符串的形式返回
        static char *s_recv(void *socket);
         
        int main()
        {
            int rc;
            // 1.初始化上下文
            void *context = zmq_ctx_new();
         
            // 2.創建套接字、連接代理的DEALER端
            void *responder = zmq_socket(context, ZMQ_REP);
            rc = zmq_connect(responder, "tcp://localhost:5560");
            if(rc == -1)
            {
                perror("zmq_connect");
                zmq_close(responder);
                zmq_ctx_destroy(context);
                return -1;
            }
         
            // 3.循環接收、響應
            while(1)
            {
                // 4.先等待接收數據
                char *request = s_recv(responder);
                if(request == NULL)
                {
                    perror("s_recv");
                    free(request);
                    zmq_close(responder);
                    zmq_ctx_destroy(context);
                    return -1;
                }
                printf("Request: %s\n", request);
                free(request);
         
                // 休眠1秒再進行響應
                sleep(1);
                
                // 5.響應
                rc = s_send(responder, "World");
                if(rc < 0)
                {
                    perror("s_send");
                    zmq_close(responder);
                    zmq_ctx_destroy(context);
                    return -1;
                }
            }
         
            // 6.關閉套接字、銷毀上下文
            zmq_close(responder);
            zmq_ctx_destroy(context);
         
            return 0;
        }
         
        static int s_send(void *socket, char *string)
        {
            int rc;
            
            zmq_msg_t msg;
            zmq_msg_init_size(&msg, 5);
            memcpy(zmq_msg_data(&msg), string, strlen(string));
            
            rc = zmq_msg_send(&msg, socket, 0);
            zmq_msg_close(&msg);
         
            return rc;
        }
         
        static char *s_recv(void *socket)
        {
            int rc;
            zmq_msg_t msg;
            zmq_msg_init(&msg);
            
            rc = zmq_msg_recv(&msg, socket, 0);
            if(rc == -1)
                return NULL;
            
            char *string = (char*)malloc(rc + 1);
            memcpy(string, zmq_msg_data(&msg), rc);
            zmq_msg_close(&msg);
            
            string[rc] = 0;
            return string;
        }

        代理端的代碼如下:
            創建一個ROUTER套接字與客戶端相連接,創建一個DEALER套接字與服務端相連接
            ROUTER套接字從客戶端接收請求數據,並把請求數據發送給服務端
            DEALER套接字從服務端接收響應數據,並把響應數據發送給客戶端

        // rrbroker.c
        // https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/rrbroker.c
        #include <stdio.h>
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>
        #include <zmq.h>
         
        int main()
        {
            int rc;
            // 1.初始化上下文
            void *context = zmq_ctx_new();
         
            // 2.創建、綁定套接字
            void *frontend = zmq_socket(context, ZMQ_ROUTER);
            void *backend = zmq_socket(context, ZMQ_DEALER);
            // ZMQ_ROUTER綁定到5559, 接收客戶端的請求
            rc = zmq_bind(frontend, "tcp://*:5559");
            if(rc == -1)
            {
                perror("zmq_bind");
                zmq_close(frontend);
                zmq_close(backend);
                zmq_ctx_destroy(context);
                return -1;
            }
            // ZMQ_DEALER綁定到5560, 接收服務端的回復
            rc = zmq_bind(backend, "tcp://*:5560");
            if(rc == -1)
            {
                perror("zmq_bind");
                zmq_close(frontend);
                zmq_close(backend);
                zmq_ctx_destroy(context);
                return -1;
            }
            // 3.初始化輪詢集合
            zmq_pollitem_t items[] = {
                { frontend, 0, ZMQ_POLLIN, 0 },
                { backend, 0, ZMQ_POLLIN, 0 }
            };
         
            // 4.在套接字上切換消息
            while(1)
            {
                zmq_msg_t msg;
                //多部分消息檢測
                int more;     
         
                // 5.調用zmq_poll輪詢消息
                rc = zmq_poll(items, 2, -1);
                //zmq_poll出錯
                if(rc == -1)     
                {
                     perror("zmq_poll");
                    zmq_close(frontend);
                    zmq_close(backend);
                    zmq_ctx_destroy(context);
                    return -1;
                }
                //zmq_poll超時
                else if(rc == 0)
                    continue;
                else
                {
                    // 6.如果ROUTER套接字有數據來
                    if(items[0].revents & ZMQ_POLLIN)
                    {
                        while(1)
                        {
                            // 從ROUTER上接收數據, 這么數據是客戶端發送過來的"Hello"
                            zmq_msg_init(&msg);
                            zmq_msg_recv(&msg, frontend, 0);
         
                            // 查看是否是接收多部分消息, 如果后面還有數據要接收, 那么more會被置為1
                            size_t more_size = sizeof(more);
                            zmq_getsockopt(frontend, ZMQ_RCVMORE, &more, &more_size);
         
                            // 接收"Hello"之后, 將數據發送到DEALER上, DEALER會將"Hello"發送給服務端
                            zmq_msg_send(&msg, backend, more ? ZMQ_SNDMORE : 0);
                            zmq_msg_close(&msg);
         
                            // 如果沒有多部分數據可以接收了, 那么退出循環
                            if(!more)
                                break;
                        }
                    }
                    // 7.如果DEALER套接字有數據來
                    if(items[1].revents & ZMQ_POLLIN)
                    {
                        
                        while(1)
                        {
                            // 接收服務端的響應"World"
                            zmq_msg_init(&msg);
                            zmq_msg_recv(&msg, backend, 0);
         
                            // 查看是否是接收多部分消息, 如果后面還有數據要接收, 那么more會被置為1
                            size_t more_size = sizeof(more);
                            zmq_getsockopt(backend, ZMQ_RCVMORE, &more, &more_size);
         
                            // 接收"World"之后, 將數據發送到ROUTER上, ROUTER會將"World"發送給客戶端
                            zmq_msg_send(&msg, frontend, more ? ZMQ_SNDMORE : 0);
                            zmq_msg_close(&msg);
         
                            // 如果沒有多部分數據可以接收了, 那么退出循環
                            if(!more)
                                break;
                        }
                    }
                }
            }
         
            // 8.關閉套接字、銷毀上下文
            zmq_close(frontend);
            zmq_close(backend);
            zmq_ctx_destroy(context);
            
            return 0;
        }

        編譯如下:

        gcc -o rrclient rrclient.c -lzmq
        gcc -o rrworker rrworker.c -lzmq
        gcc -o rrbroker rrbroker.c -lzmq

        一次運行如下,左側為客戶端,中間為代理,右側為服務端

        下面運行兩個客戶端,0為代理,1、2為客戶端,3位服務端。可以看到客戶端的消息是有順序到達客戶端的,消息會自動進行排隊

    ØMQ自己提供了代理的接口zmq_proxy(),可以省略上面代碼的書寫,詳情可參閱:https://blog.csdn.net/qq_41453285/article/details/106887035
————————————————
版權聲明:本文為CSDN博主「江南、董少」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_41453285/article/details/106878960


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM