ZMQ 模式學習


發布訂閱模式:

PUB發送,send。SUB接收,recv。和PUSH-PULL模式不同,PUB將消息同時發給和他建立的鏈接,類似於廣播。另外發布訂閱模式也可以使用訂閱過濾來實現只接收特定的消息。訂閱過濾是在服務器上進行過濾的,如果一個訂閱者設定了過濾,那么發布者將只發布滿足他訂閱條件的消息。
這個就是廣播和收聽的關系。PUB-SUB模式雖然沒有使用網絡的廣播功能,但是它內部是異步的。也就是一次發送沒有結束立刻開始下一次發送。
廣播所有client,沒有隊列緩存,斷開連接數據將永遠丟失。client可以進行數據過濾。
943117-20160705143357999-601098470.pngserver:

 1 #include <zmq.h>
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include "zmq_helper.h"
 5 
 6 int main(void)
 7 {
 8     void * context = zmq_ctx_new();
 9     void * socket = zmq_socket(context, ZMQ_PUB);
10     zmq_bind(socket, "tcp://*:5556");
11 
12     srandom((unsigned)time(NULL));
13 
14     while(1)
15     {
16         int zipcode = randof(100000);   // 郵編: 0 ~ 99999
17         int temp = randof(84) - 42;     // 溫度: -42 ~ 41
18         int relhumidity = randof(50) + 10;  // 相對濕度: 10 ~ 59
19 
20         char msg[20];
21         snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity);
22         s_send(socket, msg);
23     }
24 
25     zmq_close(socket);
26     zmq_ctx_destroy(context);
27 
28     return 0;
29 
30 }

client:

#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_SUB);
    zmq_connect(socket, "tcp://localhost:5556");

    char * zipcode = "10001";
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode));

    for(int i = 0; i < 50; ++i)
    {
        char * string = s_recv(socket);
        printf("[Subscriber] Received weather report msg: %s\n", string);
        free(string);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);
    
    return 0;
}
  1. ZMQ_PUB類型的socket, 如果沒有任何client與其相連, 其所有消息都將被簡單就地拋棄
  2. ZMQ_SUB類型的socket, 即是client, 可以與多個ZMQ_PUB類型的socket相連, 即村民可以同時收聽多個msg 但必須為每個msg都設置過濾器. 否則默認情況下, zmq認為client不關心msg里的所有內容.
  3. 當一個cline收聽多個時, 接收消息采用公平隊列策略
  4. 如果存在至少一個clint在收聽, 那么這個消息就不會被隨意拋棄: 這句話的意思是, 當消息過多, 而client的消化能力比較低的話, 未發送的消息會緩存在msg里.
  5. 在ZMQ大版本號在3以上的版本里, 當msg與client的速度不匹配時. 若使用的傳輸層協議是tcpipc這種面向連接的協議, 則堆積的消息緩存在里, 當使用epgm這種協議時, 堆積的消息緩存了client里. 在ZMQ 大版本號為2的版本中, 所有情況下, 消息都將堆積在clinet里

 

Parallel Pipeline模式:

由三部分組成,push進行數據推送,work進行數據緩存,pull進行數據競爭獲取處理。區別於Publish-Subscribe存在一個數據緩存和處理負載。

當連接被斷開,數據不會丟失,重連后數據繼續發送到對端。
943117-20160705143418983-31884127.png

分治套路里有三個角色:

  1. Ventilator. 包工頭, 向手下各個工程隊分派任務. 一個.
  2. Worker. 工程隊, 從包工頭里接收任務, 干活. 多個.
  3. Sink. 甲方監理, 工程隊干完活后, 向甲方監理報告. 所以工程隊的活干完之后, 監理統一收集所有工程隊的成果. 一個.

包工頭代碼:

 1 #include <zmq.h>
 2 #include <stdio.h>
 3 #include <time.h>
 4 #include "zmq_helper.h"
 5 
 6 int main(void)
 7 {
 8     void * context = zmq_ctx_new();
 9     void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
10     void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
11     zmq_connect(socket_to_sink, "tcp://localhost:5558");
12     zmq_bind(socket_to_worker, "tcp://*:5557");
13 
14     printf("Press Enter when all workers get ready:");
15     getchar();
16     printf("Sending tasks to workers...\n");
17 
18     s_send(socket_to_sink, "Get ur ass up");    // 通知監理, 干活了
19 
20     srandom((unsigned)time(NULL));
21 
22     int total_ms = 0;
23     for(int i = 0; i < 100; ++i)
24     {
25         int workload = randof(100) + 1;     // 工作需要的耗時, 單位ms
26         total_ms += workload;
27         char string[10];
28         snprintf(string, sizeof(string), "%d", workload);
29         s_send(socket_to_worker, string);   // 將工作分派給工程隊
30     }
31 
32     printf("Total expected cost: %d ms\n", total_ms);
33 
34     zmq_close(socket_to_sink);
35     zmq_close(socket_to_worker);
36     zmq_ctx_destroy(context);
37 
38     return 0;
39 }

工程隊代碼:

 1 #include <zmq.h>
 2 #include <stdio.h>
 3 #include "zmq_helper.h"
 4 
 5 int main(void)
 6 {
 7     void * context = zmq_ctx_new();
 8     void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
 9     void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
10     zmq_connect(socket_to_ventilator, "tcp://localhost:5557");
11     zmq_connect(socket_to_sink, "tcp://localhost:5558");
12 
13     while(1)
14     {
15         char * msg = s_recv(socket_to_ventilator);
16         printf("Received msg: %s\n", msg);
17         fflush(stdout);
18         s_sleep(atoi(msg));     // 干活, 即睡眠指定毫秒
19         free(msg);
20         s_send(socket_to_sink, "DONE"); // 活干完了通知監理
21     }
22 
23     zmq_close(socket_to_ventilator);
24     zmq_close(socket_to_sink);
25     zmq_ctx_destroy(context);
26 
27     return 0;
28 }

監理代碼:

 1 #include <zmq.h>
 2 #include <stdio.h>
 3 #include "zmq_helper.h"
 4 
 5 int main(void)
 6 {
 7     void * context = zmq_ctx_new();
 8     void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL);
 9     zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558");
10 
11     char * msg = s_recv(socket_to_worker_and_ventilator);
12     printf("Received msg: %s", msg);    // 接收來自包工頭的開始干活的消息
13     free(msg);
14 
15     int64_t start_time = s_clock();
16 
17     for(int i = 0; i < 100; ++i)
18     {
19         // 接收100個worker干完活的消息
20         char * msg = s_recv(socket_to_worker_and_ventilator);
21         free(msg);
22 
23         if(i / 10 * 10 == i)
24             printf(":");
25         else
26             printf(".");
27         fflush(stdout);
28     }
29 
30     printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time));
31 
32     zmq_close(socket_to_worker_and_ventilator);
33     zmq_ctx_destroy(context);
34 
35     return 0;
36 }

這個示例程序的邏輯流程是這樣的:

  1. 包工頭向兩個角色發送消息: 向工程隊發送共計100個任務, 向監理發送消息, 通知監理開始干活
  2. 工程隊接收來自包工頭的消息, 並按消息里的數值, 睡眠指定毫秒. 每個任務結束后都通知監理.
  3. 監理先是接收來自包工頭的消息, 開始計時. 然后統計來自工程隊的消息, 當收集到100個任務完成的消息后, 計算實際耗時.

包工頭里輸出的預計耗時是100個任務的共計耗時, 在監理那里統計的實際耗時則是由多個工程隊並行處理100個任務實際的耗時.

這里個例子中需要注意的點有:

  1. 這個例子中使用了ZMQ_PULLZMQ_PUSH兩種socket. 分別供消息分發方與消息接收方使用. 看起來略微有點類似於發布-訂閱套路, 具體之間的區別后續章節會講到.
  2. 工程隊上接包工頭, 下接監理. 在任務執行過程中, 你可以隨意的增加工程隊的數量.
  3. 我們通過讓包工頭通知監理, 以及手動輸入enter來啟動任務分發的方式, 手動同步了工程隊/包工頭/監理. PUSH/PULL模式雖然和PUB/SUB不一樣, 不會丟失消息. 但如果不手動同步的話, 最先建立連接的工程隊將幾乎把所有任務都接收到手, 導致后續完成連接的工程隊拿不到任務, 任務分配不平衡.
  4. 包工頭分派任務使用的是輪流/平均分配的方式.這是一種簡單的負載均衡
  5. 監理接收多個工程隊的消息, 使用的是公平隊列策略.

正確的處理context

你大致注意到了, 在上面的所有示例代碼中, 每次都以zmq_ctx_new()函數創建出一個名為context的變量, 目前你不需要了解它的細節, 這只是ZMQ庫的標准套路. 甚至於你將來都不需要了解這個context里面到底是什么. 但你必須要遵循zmq中關於這個context的一些編程規定:

  1. 在一個進程起始時調用zmq_ctx_new()創建context
  2. 在進程結束之前調用zmq_ctx_destroy()銷毀掉它

每個進程, 應該持有, 且應該只持有, 一個context. 當然, 目前來說, 你這樣理解就行了, 后續章節或許我們會深入探索一下context, 但目前, 請謹記, one context per process.

如果你在代碼中調用了fork系統調用, 那么請在子進程代碼區的開始處調用zmq_ctx_new(), 為子進程創建自己的context


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM