轉自:https://blog.csdn.net/qq_41453285/article/details/106878960
一、ØMQ模式總覽
ØMQ支持多種模式,具體可以參閱:https://blog.csdn.net/qq_41453285/article/details/106865539
本文介紹ØMQ的“請求-響應”模式
二、請求-響應模式
請求-響應模式由http://rfc.zeromq.org/spec:28正式定義
請求-應答模式應該是最常見的交互模式,如果連接之后,服務器終止,那么客戶端也終止,從崩潰的過程中恢復不太容易
因此,做一個可靠的請求-應答模式很復雜,在很后面我們會有一部分系列文章介紹“可靠的請求-應答模式”
“請求-響應模型”支持的套接字類型有4種:
ZMQ_REP
ZMQ_REQ
ZMQ_DEALER
ZMQ_ROUTER
三、“REQ-REP”套接字類型
請求-響應模式用於將請求從ZMQ_REQ客戶端發送到一個或多個ZMQ_REP服務,並接收對每個發送的請求的后續答復
REQ-REP套接字對是步調一致的。它們兩者的次序必須有規則,不能同時發送或接收,否則無效果
ZMQ_REQ
客戶端使用ZMQ_REQ類型的套接字向服務發送請求並從服務接收答復
此套接字類型僅允許zmq_send(request)和后續zmq_recv(reply)調用交替序列。發送的每個請求都在所有服務中輪流輪詢,並且收到的每個答復都與最后發出的請求匹配
如果沒有可用的服務,則套接字上的任何發送操作都應阻塞,直到至少有一項服務可用為止。REQ套接字不會丟棄消息
ZMQ_REQ特性摘要
兼容的對等套接字 ZMQ_REP、ZMQ_ROUTER
方向 雙向
發送/接收模式 發送、接收、發送、接收......
入網路由策略 最后一位(Last peer)
外發路由策略
輪詢
靜音狀態下的操作 阻塞
ZMQ_REP
服務使用ZMQ_REP類型的套接字來接收來自客戶端的請求並向客戶端發送回復
此套接字類型僅允許zmq_recv(request)和后續zmq_send(reply)調用的交替序列。接收到的每個請求都從所有客戶端中公平排隊,並且發送的每個回復都路由到發出最后一個請求的客戶端
如果原始請求者不再存在,則答復將被靜默丟棄
ZMQ_REP特性摘要
兼容的對等套接字 ZMQ_REQ、ZMQ_DEALER
方向 雙向
發送/接收模式 接收、發送、接收、發送......
入網路由策略 公平排隊
外發路由策略
最后一位(Last peer)
演示案例
本演示案例如下:
服務端創建REP套接字,阻塞等待客戶端消息的到達,當客戶端有消息達到時給客戶端回送“World”字符串
客戶端創建REP套接字,向服務端發送字符串“Hello”,然后等待服務端回送消息
服務端代碼如下:
// https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwserver.c
// hwserver.c
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <zmq.h>
// 向socket發送數據, 數據為string
static int s_send(void *socket, char *string);
// 從socket接收數據, 並將數據以字符串的形式返回
static char *s_recv(void *socket);
int main()
{
// 1.創建上下文
void *context = zmq_ctx_new();
// 2.創建、綁定套接字
void *responder = zmq_socket(context, ZMQ_REP);
zmq_bind(responder, "tcp://*:5555");
int rc;
// 3.循環接收數據、發送數據
while(1)
{
// 4.接收數據
char *request = s_recv(responder);
assert(request != NULL);
printf("Request: %s\n", request);
free(request);
// 休眠1秒再繼續回復
sleep(1);
// 5.回送數據
char *reply = "World";
rc = s_send(responder, reply);
assert(rc > 0);
}
// 6.關閉套接字、銷毀上下文
zmq_close(responder);
zmq_ctx_destroy(context);
return 0;
}
static int s_send(void *socket, char *string)
{
int rc;
zmq_msg_t msg;
zmq_msg_init_size(&msg, 5);
memcpy(zmq_msg_data(&msg), string, strlen(string));
rc = zmq_msg_send(&msg, socket, 0);
zmq_msg_close(&msg);
return rc;
}
static char *s_recv(void *socket)
{
int rc;
zmq_msg_t msg;
zmq_msg_init(&msg);
rc = zmq_msg_recv(&msg, socket, 0);
if(rc == -1)
return NULL;
char *string = (char*)malloc(rc + 1);
memcpy(string, zmq_msg_data(&msg), rc);
zmq_msg_close(&msg);
string[rc] = 0;
return string;
}
還有一個使用C++ API編寫的服務端,可參閱:https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwserver.cpp
客戶端代碼如下:
// https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwclient.c
// hwclient.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>
// 向socket發送數據, 數據為string
static int s_send(void *socket, char *string);
// 從socket接收數據, 並將數據以字符串的形式返回
static char *s_recv(void *socket);
int main()
{
// 1.創建上下文
void *context = zmq_ctx_new();
// 2.創建套接字、連接服務器
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:5555");
int rc;
// 3.循環發送數據、接收數據
while(1)
{
// 4.發送數據
char *request = "Hello";
rc = s_send(requester, request);
assert(rc > 0);
// 5.接收回復數據
char *reply = s_recv(requester);
assert(reply != NULL);
printf("Reply: %s\n", reply);
free(reply);
}
// 6.關閉套接字、銷毀上下文
zmq_close(requester);
zmq_ctx_destroy(context);
return 0;
}
static int s_send(void *socket, char *string)
{
int rc;
zmq_msg_t msg;
zmq_msg_init_size(&msg, 5);
memcpy(zmq_msg_data(&msg), string, strlen(string));
rc = zmq_msg_send(&msg, socket, 0);
zmq_msg_close(&msg);
return rc;
}
static char *s_recv(void *socket)
{
int rc;
zmq_msg_t msg;
zmq_msg_init(&msg);
rc = zmq_msg_recv(&msg, socket, 0);
if(rc == -1)
return NULL;
char *string = (char*)malloc(rc + 1);
memcpy(string, zmq_msg_data(&msg), rc);
zmq_msg_close(&msg);
string[rc] = 0;
return string;
}
編譯並運行如下,左側為服務端,右側為客戶端:
gcc -o hwserver hwserver.c -lzmq
gcc -o hwclient hwclient.c -lzmq
四、“DEALER-ROUTER”套接字類型
本文介紹“DEALER-ROUTER”的語法和代理演示案例,在后面的一個專題中我們將介紹如何使用“DEALER-ROUTER”來構建各種異步請求-應答流
ZMQ_DEALER
ZMQ_DEALER類型的套接字是用於擴展“請求/應答”套接字的高級模式
發送消息時:當ZMQ_DEALER套接字由於已達到所有對等點的最高水位而進入靜音狀態時,或者如果根本沒有任何對等點,則套接字上的任何zmq_send()操作都應阻塞,直到靜音狀態結束或至少一個對等方變得可以發送;消息不會被丟棄
接收消息時:發送的每條消息都是在所有連接的對等方之間進行輪詢,並且收到的每條消息都是從所有連接的對等方進行公平排隊的
將ZMQ_DEALER套接字連接到ZMQ_REP套接字時,發送的每個消息都必須包含一個空的消息部分,定界符以及一個或多個主體部分
ZMQ_DEALER特性摘要
兼容的對等套接字 ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER
方向 雙向
發送/接收模式 無限制
入網路由策略 公平排隊
外發路由策略
輪詢
靜音狀態下的操作 阻塞
ZMQ_ROUTER
ZMQ_ROUTER類型的套接字是用於擴展請求/答復套接字的高級套接字類型
當收到消息時:ZMQ_ROUTER套接字在將消息傳遞給應用程序之前,應在消息部分之前包含消息的始發對等方的路由ID。接收到的消息從所有連接的同級之間公平排隊
發送消息時:
ZMQ_ROUTER套接字應刪除消息的第一部分,並使用它來確定消息應路由到的對等方的_routing id _。如果該對等點不再存在或從未存在,則該消息將被靜默丟棄
但是,如果ZMQ_ROUTER_MANDATORY套接字選項設置為1,這兩種情況下套接字都將失敗並顯示EHOSTUNREACH
高水位標記影響:
當ZMQ_ROUTER套接字由於達到所有同位體的高水位線而進入靜音狀態時,發送到該套接字的任何消息都將被丟棄,直到靜音狀態結束為止。同樣,任何路由到已達到單個高水位標記的對等方的消息也將被丟棄
如果ZMQ_ROUTER_MANDATORY套接字選項設置為1,則在兩種情況下套接字都應阻塞或返回EAGAIN
ZMQ_ROUTER_MANDATORY套接字選項:
當ZMQ_ROUTER套接字的ZMQ_ROUTER_MANDATORY標志設置為1時,套接字應在接收到來自一個或多個對等方的消息后生成ZMQ_POLLIN事件
同樣,當至少一個消息可以發送給一個或多個對等方時,套接字將生成ZMQ_POLLOUT事件
當ZMQ_REQ套接字連接到ZMQ_ROUTER套接字時,除了始發對等方的路由ID外,每個收到的消息都應包含一個空的定界符消息部分。因此,由應用程序看到的每個接收到的消息的整個結構變為:一個或多個路由ID部分,定界符部分,一個或多個主體部分。將回復發送到ZMQ_REQ套接字時,應用程序必須包括定界符部分
ZMQ_ROUTER特性摘要
兼容的對等套接字 ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER
方向 雙向
發送/接收模式 無限制
入網路由策略 公平排隊
外發路由策略
看上面介紹
靜音狀態下的操作 丟棄(見上面介紹)
共享隊列/代理
在“REP-REQ”的演示案例中,我們只有一個客戶端和一個服務端進行交流。但是實際中,我們通常允許多個客戶端與多個服務端之間相互交流
將多個客戶端連接到多個服務器的方法有兩種:
方法①:將每個客戶端都連接到多個服務端點
方法②:使用代理
方法①:
原理:一種是將每個客戶端套接字連接到多個服務端點。REQ套接字隨后會將請求發送到服務端上。比如說一個客戶端連接了三個服務端點:A、B、C,之后發送請求R1、R4到服務A上,發送請求R2到服務B上、發送請求R3到服務C上(如下圖所示)
對於這種設計來說,服務器屬於靜態部分,客戶端屬於動態部分,客戶端的增減無所謂,但是服務器的增減確實致命的。假設現在有100個客戶端都連接了服務器,如果此時新增了三台服務器,為了讓客戶端識別這新增的三台服務器,那么就需要將所有的客戶端都停止重新配置然后再重新啟動
方法②:
我們可以編寫一個小型消息排隊代理,使我們具備靈活性
原理:該代理綁定到了兩個端點,一個用於客戶端的前端(ZMQ_ROUTER),另一個用於服務的后端(ZMQ_DEALER)。然后帶來使用zmq_poll()來輪詢這兩個套接字的活動,當有消息時,代理會將消息在兩個套接字之間頻繁地輸送
該代理其實並不顯式管理任何隊列,其只負責消息的傳送,ØMQ會自動將消息在每個套接字上進行排隊
使用zmq_poll()配合DEALER-ROUTER:
在上面我們使用REQ-REP套接字時,會有一個嚴格同步的請求-響應對話,就是必須客戶端先發送一個請求,然后服務端讀取請求並發送應答,最后客戶端讀取應答,如此反復。如果客戶端或服務端嘗試破壞這種約束(例如,連續發送兩個請求,而沒有等待響應),那么將返回一個錯誤
我們的代理必須是非阻塞的,可以使用zmq_poll()來輪詢任何一個套接字上的活動,但我們不能使用REQ-REQ。幸運地是,有兩個稱為DEALER和ROUTER的套接字,它們能我們可以執行無阻塞的請求-響應
代理演示案例
我們擴展了上面的“REQ-REP”演示案例:
REQ和ROUTER交流,DEALER與REP交流。代理節點從一個套接字讀取消息,並將消息轉發到其他套接字
客戶端的代碼如下:將REQ套接字連接到代理的ROUTER節點上,向ROUTER節點發送“Hello”,接收到“World”的回復
// rrclient.c
// https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/rrclient.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
// 向socket發送數據, 數據為string
static int s_send(void *socket, char *string);
// 從socket接收數據, 並將數據以字符串的形式返回
static char *s_recv(void *socket);
int main()
{
int rc;
// 1.初始化上下文
void *context = zmq_ctx_new();
// 2.創建套接字、連接代理的ROUTER端
void *requester = zmq_socket(context, ZMQ_REQ);
rc = zmq_connect(requester, "tcp://localhost:5559");
if(rc == -1)
{
perror("zmq_connect");
zmq_close(requester);
zmq_ctx_destroy(context);
return -1;
}
// 3.循環發送、接收數據(10次)
int request_nbr;
for(request_nbr = 0; request_nbr < 10; request_nbr++)
{
// 4.先發送數據
rc = s_send(requester, "Hello");
if(rc < 0)
{
perror("s_send");
zmq_close(requester);
zmq_ctx_destroy(context);
return -1;
}
// 5.等待響應
char *reply = s_recv(requester);
if(reply == NULL)
{
perror("s_recv");
free(reply);
zmq_close(requester);
zmq_ctx_destroy(context);
return -1;
}
printf("Reply[%d]: %s\n", request_nbr + 1, reply);
free(reply);
}
// 6.關閉套接字、銷毀上下文
zmq_close(requester);
zmq_ctx_destroy(context);
return 0;
}
static int s_send(void *socket, char *string)
{
int rc;
zmq_msg_t msg;
zmq_msg_init_size(&msg, 5);
memcpy(zmq_msg_data(&msg), string, strlen(string));
rc = zmq_msg_send(&msg, socket, 0);
zmq_msg_close(&msg);
return rc;
}
static char *s_recv(void *socket)
{
int rc;
zmq_msg_t msg;
zmq_msg_init(&msg);
rc = zmq_msg_recv(&msg, socket, 0);
if(rc == -1)
return NULL;
char *string = (char*)malloc(rc + 1);
memcpy(string, zmq_msg_data(&msg), rc);
zmq_msg_close(&msg);
string[rc] = 0;
return string;
}
服務端的代碼如下:將REP套接字連接到代理的DEALER節點上
// rrworker.c
// https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/rrworker.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
// 向socket發送數據, 數據為string
static int s_send(void *socket, char *string);
// 從socket接收數據, 並將數據以字符串的形式返回
static char *s_recv(void *socket);
int main()
{
int rc;
// 1.初始化上下文
void *context = zmq_ctx_new();
// 2.創建套接字、連接代理的DEALER端
void *responder = zmq_socket(context, ZMQ_REP);
rc = zmq_connect(responder, "tcp://localhost:5560");
if(rc == -1)
{
perror("zmq_connect");
zmq_close(responder);
zmq_ctx_destroy(context);
return -1;
}
// 3.循環接收、響應
while(1)
{
// 4.先等待接收數據
char *request = s_recv(responder);
if(request == NULL)
{
perror("s_recv");
free(request);
zmq_close(responder);
zmq_ctx_destroy(context);
return -1;
}
printf("Request: %s\n", request);
free(request);
// 休眠1秒再進行響應
sleep(1);
// 5.響應
rc = s_send(responder, "World");
if(rc < 0)
{
perror("s_send");
zmq_close(responder);
zmq_ctx_destroy(context);
return -1;
}
}
// 6.關閉套接字、銷毀上下文
zmq_close(responder);
zmq_ctx_destroy(context);
return 0;
}
static int s_send(void *socket, char *string)
{
int rc;
zmq_msg_t msg;
zmq_msg_init_size(&msg, 5);
memcpy(zmq_msg_data(&msg), string, strlen(string));
rc = zmq_msg_send(&msg, socket, 0);
zmq_msg_close(&msg);
return rc;
}
static char *s_recv(void *socket)
{
int rc;
zmq_msg_t msg;
zmq_msg_init(&msg);
rc = zmq_msg_recv(&msg, socket, 0);
if(rc == -1)
return NULL;
char *string = (char*)malloc(rc + 1);
memcpy(string, zmq_msg_data(&msg), rc);
zmq_msg_close(&msg);
string[rc] = 0;
return string;
}
代理端的代碼如下:
創建一個ROUTER套接字與客戶端相連接,創建一個DEALER套接字與服務端相連接
ROUTER套接字從客戶端接收請求數據,並把請求數據發送給服務端
DEALER套接字從服務端接收響應數據,並把響應數據發送給客戶端
// rrbroker.c
// https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/rrbroker.c
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
int main()
{
int rc;
// 1.初始化上下文
void *context = zmq_ctx_new();
// 2.創建、綁定套接字
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_DEALER);
// ZMQ_ROUTER綁定到5559, 接收客戶端的請求
rc = zmq_bind(frontend, "tcp://*:5559");
if(rc == -1)
{
perror("zmq_bind");
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return -1;
}
// ZMQ_DEALER綁定到5560, 接收服務端的回復
rc = zmq_bind(backend, "tcp://*:5560");
if(rc == -1)
{
perror("zmq_bind");
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return -1;
}
// 3.初始化輪詢集合
zmq_pollitem_t items[] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
// 4.在套接字上切換消息
while(1)
{
zmq_msg_t msg;
//多部分消息檢測
int more;
// 5.調用zmq_poll輪詢消息
rc = zmq_poll(items, 2, -1);
//zmq_poll出錯
if(rc == -1)
{
perror("zmq_poll");
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return -1;
}
//zmq_poll超時
else if(rc == 0)
continue;
else
{
// 6.如果ROUTER套接字有數據來
if(items[0].revents & ZMQ_POLLIN)
{
while(1)
{
// 從ROUTER上接收數據, 這么數據是客戶端發送過來的"Hello"
zmq_msg_init(&msg);
zmq_msg_recv(&msg, frontend, 0);
// 查看是否是接收多部分消息, 如果后面還有數據要接收, 那么more會被置為1
size_t more_size = sizeof(more);
zmq_getsockopt(frontend, ZMQ_RCVMORE, &more, &more_size);
// 接收"Hello"之后, 將數據發送到DEALER上, DEALER會將"Hello"發送給服務端
zmq_msg_send(&msg, backend, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&msg);
// 如果沒有多部分數據可以接收了, 那么退出循環
if(!more)
break;
}
}
// 7.如果DEALER套接字有數據來
if(items[1].revents & ZMQ_POLLIN)
{
while(1)
{
// 接收服務端的響應"World"
zmq_msg_init(&msg);
zmq_msg_recv(&msg, backend, 0);
// 查看是否是接收多部分消息, 如果后面還有數據要接收, 那么more會被置為1
size_t more_size = sizeof(more);
zmq_getsockopt(backend, ZMQ_RCVMORE, &more, &more_size);
// 接收"World"之后, 將數據發送到ROUTER上, ROUTER會將"World"發送給客戶端
zmq_msg_send(&msg, frontend, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&msg);
// 如果沒有多部分數據可以接收了, 那么退出循環
if(!more)
break;
}
}
}
}
// 8.關閉套接字、銷毀上下文
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(context);
return 0;
}
編譯如下:
gcc -o rrclient rrclient.c -lzmq
gcc -o rrworker rrworker.c -lzmq
gcc -o rrbroker rrbroker.c -lzmq
一次運行如下,左側為客戶端,中間為代理,右側為服務端
下面運行兩個客戶端,0為代理,1、2為客戶端,3位服務端。可以看到客戶端的消息是有順序到達客戶端的,消息會自動進行排隊
ØMQ自己提供了代理的接口zmq_proxy(),可以省略上面代碼的書寫,詳情可參閱:https://blog.csdn.net/qq_41453285/article/details/106887035
————————————————
版權聲明:本文為CSDN博主「江南、董少」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_41453285/article/details/106878960