ZMQ指南(第三章:高級請求-應答模式)


# ZMQ 第三章 高級請求-應答模式

     在第二章中我們通過開發一系列的小應用來熟悉ØMQ的基本使用方法,每個應用會引入一些新的特性。本章會沿用這種方式,來探索更多建立在ØMQ請求-應答模式之上的高級工作模式。

本章涉及的內容有:

* 在請求-應答模式中創建和使用消息信封
* 使用REQ、REP、DEALER和ROUTER套接字
* 使用標識來手工指定應答目標
* 使用自定義離散路由模式
* 使用自定義最近最少使用路由模式
* 構建高層消息封裝類
* 構建基本的請求應答代理
* 合理命名套接字
* 模擬client-worker集群
* 構建可擴展的請求-應答集群雲
* 使用管道套接字監控線程

### Request-Reply Envelopes

     在請求-應答模式中,信封里保存了應答目標的位置。這就是為什么ØMQ網絡雖然是無狀態的,但仍能完成請求-應答的過程。

     在一般使用過程中,你並不需要知道請求-應答信封的工作原理。使用REQ、REP時,ØMQ會自動處理消息信封。下一章講到的裝置(device),使用時也只需保證讀取和寫入所有的信息即可。       ØMQ使用多段消息的方式來存儲信封,所以在復制消息時也會復制信封。

     然而,在使用高級請求-應答模式之前是需要了解信封這一機制的,以下是信封機制在ROUTER中的工作原理:

    * 從ROUTER中讀取一條消息時,ØMQ會包上一層信封,上面注明了消息的來源。
     * 向ROUTER寫入一條消息時(包含信封),ØMQ會將信封拆開,並將消息遞送給相應的對象。

     如果將從ROUTER A中獲取的消息(包含信封)寫入ROUTER B(即將消息發送給一個DEALER,該DEALER連接到了ROUTER),那么在從ROUTER B中獲取該消息時就會包含兩層信封。

     信封機制的根本作用是讓ROUTER知道如何將消息遞送給正確的應答目標,你需要做的就是在程序中保留好該信封。回顧一下REP套接字,它會將收到消息的信封逐個拆開,將消息本身傳送給應用程序。而在發送時,又會在消息外層包裹該信封,發送給ROUTER,從而傳遞給正確的應答目標。

我們可以使用上述原理建立起一個ROUTER-DEALER裝置:

```
[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
...etc.
```

當你用REQ套接字去連接ROUTER套接字,並發送一條請求消息,你會從ROUTER中獲得一條如下所示的消息:


* 第三幀是應用程序發送給REQ套接字的消息;
* 第二幀的空信息是REQ套接字在發送消息給ROUTER之前添加的;
* 第一幀即信封,是由ROUTER套接字添加的,記錄了消息的來源。

     如果我們在一條裝置鏈路上傳遞該消息,最終會得到包含多層信封的消息。最新的信封會在消息的頂部


以下將詳述我們在請求-應答模式中使用到的四種套接字類型:

     * DEALER是一種負載均衡,它會將消息分發給已連接的節點,並使用公平隊列的機制處理接受到的消息。DEALER的作用就像是PUSH和PULL的結合。

     * REQ發送消息時會在消息頂部插入一個空幀,接受時會將空幀移去。其實REQ是建立在DEALER之上的,但REQ只有當消息發送並接受到回應后才能繼續運行。

     * ROUTER在收到消息時會在頂部添加一個信封,標記消息來源。發送時會通過該信封決定哪個節點可以獲取到該條消息。

     * REP在收到消息時會將第一個空幀之前的所有信息保存起來,將原始信息傳送給應用程序。在發送消息時,REP會用剛才保存的信息包裹應答消息。REP其實是建立在ROUTER之上的,但和REQ一樣,必須完成接受和發送這兩個動作后才能繼續。

      REP要求消息中的信封由一個空幀結束,所以如果你沒有用REQ發送消息,則需要自己在消息中添加這個空幀。

     你肯定會問,ROUTER是怎么標識消息的來源的?答案當然是套接字的標識。我們之前講過,一個套接字可能是瞬時的,它所連接的套接字(如 ROUTER)則會給它生成一個標識,與之相關聯。一個套接字也可以顯式地給自己定義一個標識,這樣其他套接字就可以直接使用了。

     這是一個瞬時的套接字,ROUTER會自動生成一個UUID來標識消息的來源。



這是一個 持久的套接字,標識由消息來源自己指定。




     下面讓我們在實例中觀察上述兩種操作。下列程序會打印出ROUTER從兩個REP套接字中獲得的消息,其中一個沒有指定標識,另一個指定了“Hello”作為標識。

**identity.c**

```c
//
// 以下程序演示了如何在請求-應答模式中使用套接字標識。
// 需要注意的是s_開頭的函數是由zhelpers.h提供的。
// 我們沒有必要重復編寫那些代碼。
//
#include "zhelpers.h"
 
int main (void)
{
    void *context = zmq_init (1);
 
    void *sink = zmq_socket (context,  ZMQ_ROUTER);
    zmq_bind (sink, " inproc://example");
 
    // 第一個套接字由0MQ自動設置標識
    void *anonymous = zmq_socket (context,  ZMQ_REQ);
    zmq_connect (anonymous, " inproc://example");
    s_send (anonymous, "ROUTER uses a generated UUID");
    s_dump (sink);
 
    // 第二個由自己設置
    void *identified = zmq_socket (context,  ZMQ_REQ);
     zmq_setsockopt (identified, ZMQ_IDENTITY, "Hello", 5);
    zmq_connect (identified, " inproc://example");
    s_send (identified, "ROUTER socket uses REQ's socket identity");
    s_dump (sink);
 
    zmq_close (sink);
    zmq_close (anonymous);
    zmq_close (identified);
    zmq_term (context);
    return 0;
}
```

運行結果:

```
----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER uses a generated UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER socket uses REQ's socket identity
```

### 自定義“請求-應答”路由

     我們已經看到ROUTER套接字是如何使用信封將消息發送給正確的應答目標的,下面我們從一個角度來定義ROUTER:在發送消息時使用一定格式的信封提供正確的路由目標,ROUTER就能夠將該條消息異步地發送給對應的節點

     所以說ROUTER的行為是完全可控的。在深入理解這一特性之前,讓我們先近距離觀察一下REQ和REP套接字,賦予他們一些鮮活的角色:

     * REQ是一個“媽媽”套接字,不會耐心聽別人說話,但會不斷地拋出問題尋求解答。REQ是嚴格同步的,它永遠位於消息鏈路的請求端;
     * REP則是一個“爸爸”套接字,只會回答問題,不會主動和別人對話。REP也是嚴格同步的,並一直位於應答端。

     關於“媽媽”套接字,正如我們小時候所經歷的,只能等她向你開口時你們才能對話。媽媽不像爸爸那么開明,也不會像DEALER套接字一樣接受模棱兩可的回答。所以,想和REQ套接字對話只有等它主動發出請求后才行,之后它就會一直等待你的回答,不管有多久。

     “爸爸”套接字則給人一種強硬、冷漠的感覺,他只做一件事:無論你提出什么問題,都會給出一個精確的回答。不要期望一個REP套接字會主動和你對話或是將你倆的交談傳達給別人,它不會這么做的。

     我們通常認為請求-應答模式一定是有來有往、有去有回的過程,但實際上這個過程是可以異步進行的。我們只需獲得相應節點的地址,即可通過ROUTER套接字來異步地發送消息。ROUTER是ZMQ中唯一一個可以定位消息來源的套接字。

我們對請求-應答模式下的路由做一個小結:

     * 對於瞬時的套接字,ROUTER會動態生成一個UUID來標識它,因此從ROUTER中獲取到的消息里會包含這個標識;
     * 對於持久的套接字,可以自定義標識,ROUTER會如直接將該標識放入消息之中;
     * 具有顯式聲明標識的節點可以連接到其他類型的套接字;
     * 節點可以通過配置文件等機制提前獲知對方節點的標識,作出相應的處理。

我們至少有三種模式來實現和ROUTER的連接:

* ROUTER-DEALER
* ROUTER-REQ
* ROUTER-REP

     每種模式下我們都可以完全掌控消息的路由方式,但不同的模式會有不一樣的應用場景和消息流,下一節開始我們會逐一解釋。

自定義路由也有一些注意事項:

     * 自定義路由讓節點能夠控制消息的去向,這一點有悖ØMQ的規則。使用自定義路由的唯一理由是ØMQ缺乏更多的路由算法供我們選擇;
     * 未來的ØMQ版本可能包含一些我們自定義的路由方式,這意味着我們現在設計的代碼可能無法在新版本的ØMQ中運行,或者成為一種多余;
     * 內置的路由機制是可擴展的,且對裝置友好,但自定義路由就需要自己解決這些問題。

     所以說自定義路由的成本是比較高的,更多情況下應當交由ØMQ來完成。不過既然我們已經講到這兒了,就繼續深入下去吧!

### ROUTER-DEALER路由

      ROUTER-DEALDER是一種最簡單的路由方式。將ROUTER和多個DEALER相連接,用一種合適的算法來決定如何分發消息給DEALER。DEALER可以是一個黑洞(只負責處理消息,不給任何返回)、代理(將消息轉發給其他節點)或是服務(會發送返回信息)。

     如果你要求DEALER能夠進行回復,那就要保證只有一個ROUTER連接到DEALER,因為DEALER並不知道哪個特定的節點在聯系它,如果有多個節點,它會做負載均衡,將消息分發出去。但如果DEALER是一個黑洞,那就可以連接任何數量的節點。

      ROUTER-DEALER路由可以用來做什么呢?如果DEALER會將它完成任務的時間回復給ROUTER,那ROUTER就可以知道這個 DEALER的處理速度有多快了。因為ROUTER和DEALER都是異步的套接字,所以我們要用zmq_poll()來處理這種情況。

     下面例子中的兩個DEALER不會返回消息給ROUTER,我們的路由采用加權隨機算法:發送兩倍多的信息給其中的一個DEALER。



**rtdealer.c**

```c
//
// 自定義ROUTER-DEALER路由
//
// 這個實例是單個進程,這樣方便啟動。
// 每個線程都有自己的ZMQ上下文,所以可以認為是多個進程在運行。
//
#include "zhelpers.h"
#include <pthread.h>
 
// 這里定義了兩個worker,其代碼是一樣的。
//
static void *worker_task_a (void *args)
{
    void *context = zmq_init (1);
    void *worker = zmq_socket (context,  ZMQ_DEALER);
    zmq_setsockopt (worker,  ZMQ_IDENTITY, "A", 1);
    zmq_connect (worker, "ipc://routing.ipc");
 
    int total = 0;
    while (1) {
        // 我們只接受到消息的第二部分
        char *request = s_recv (worker);
        int finished = (strcmp (request, "END") == 0);
        free (request);
        if (finished) {
            printf ("A received: %d\n", total);
            break;
        }
        total++;
    }
    zmq_close (worker);
    zmq_term (context);
    return NULL;
}
 
static void *worker_task_b (void *args)
{
    void *context = zmq_init (1);
    void *worker = zmq_socket (context, ZMQ_DEALER);
    zmq_setsockopt (worker,  ZMQ_IDENTITY, "B", 1);
    zmq_connect (worker, "ipc://routing.ipc");
 
    int total = 0;
    while (1) {
        // 我們只接受到消息的第二部分
        char *request = s_recv (worker);
        int finished = (strcmp (request, "END") == 0);
        free (request);
        if (finished) {
            printf ("B received: %d\n", total);
            break;
        }
        total++;
    }
    zmq_close (worker);
    zmq_term (context);
    return NULL;
}
 
int main (void)
{
    void *context = zmq_init (1);
    void *client = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (client, "ipc://routing.ipc");
 
    pthread_t worker;
    pthread_create (&worker, NULL, worker_task_a, NULL);
    pthread_create (&worker, NULL, worker_task_b, NULL);
 
     // 等待線程連接至套接字,否則我們發送的消息將不能被正確路由
    sleep (1);
 
    // 發送10個任務,給A兩倍多的量
    int task_nbr;
    srandom ((unsigned) time (NULL));
    for (task_nbr = 0; task_nbr < 10; task_nbr++) {
        // 發送消息的兩個部分:第一部分是目標地址
        if (randof (3) > 0)
            s_sendmore (client, "A");s_sendmore        else
            s_sendmore (client, "B");
 
        // 然后是任務
        s_send (client, "This is the workload");
    }
    s_sendmore (client, "A");
    s_send (client, "END");
 
    s_sendmore (client, "B");
    s_send (client, "END");
 
    zmq_close (client);
    zmq_term (context);
    return 0;
}
```

對上述代碼的兩點說明:

     *  ROUTER並不知道DEALER何時會准備好,我們可以用信號機制來解決 ,但為了不讓這個例子太過復雜,我們就用sleep(1)的方式來處理。如果沒有這句話,那ROUTER一開始發出的消息將無法被路由,ØMQ會丟棄這些消息。
     *  需要注意的是,除了ROUTER會丟棄無法路由的消息外,PUB套接字當沒有SUB連接它時也會丟棄發送出去的消息 。其他套接字則會將無法發送的消息存儲起來,直到有節點來處理它們。

在將消息路由給DEALER時,我們手工建立了這樣一個信封:




      ROUTER套接字會移除第一幀,只將第二幀的內容傳遞給相應的DEALER。當DEALER發送消息給ROUTER時,只會發送一幀,ROUTER會在外層包裹一個信封(添加第一幀),返回給我們。

     如果你定義了一個非法的信封地址,ROUTER會直接丟棄該消息,不作任何提示。對於這一點我們也無能為力,因為出現這種情況只有兩種可能,一是要送達的目標節點不復存在了,或是程序中錯誤地指定了目標地址。如何才能知道消息會被正確地路由?唯一的方法是讓路由目標發送一些反饋消息給我們。后面幾章會講述這一點。

      DEALER的工作方式就像是PUSH和PULL的結合。但是,我們不能用PULL或PUSH去構建請求-應答模式。

### 最近最少使用算法路由(LRU模式)

     我們之前講過REQ套接字永遠是對話的發起方,然后等待對方回答。這一特性可以讓我們能夠保持多個REQ套接字等待調配。換句話說,REQ套接字會告訴我們它已經准備好了。

     你可以將ROUTER和多個REQ相連,請求-應答的過程如下:

* REQ發送消息給ROUTER
* ROUTER返回消息給REQ
* REQ發送消息給ROUTER
* ROUTER返回消息給REQ
* ...

     和DEALER相同,REQ只能和一個ROUTER連接,除非你想做類似多路冗余路由這樣的事(我甚至不想在這里解釋),其復雜度會超過你的想象並迫使你放棄的。



      ROUTER-REQ模式可以用來做什么?最常用的做法久是最近最少使用算法(LRU)路由了,ROUTER發出的請求會讓等待最久的REQ來處理。請看示例:


自定義ROUTER-REQ路由:
//
#include "zhelpers.h"
#include <pthread.h>
 
#define NBR_WORKERS 10
 
static void *worker_task(void *args) {
    void *context = zmq_init(1);
    void *worker = zmq_socket(context,  ZMQ_REQ);
 
    // s_set_id()函數會根據套接字生成一個可打印的字符串,
    // 並以此作為該套接字的標識。
    s_set_id(worker);
    zmq_connect(worker, "ipc://routing.ipc");
 
    int total = 0;
    while (1)
    {
        // 告訴ROUTER我已經准備好了
        s_send(worker, " ready");
 
        // 從ROUTER中獲取工作,直到收到結束的信息
        char *workload = s_recv(worker);
        int finished = (strcmp(workload, "END") == 0);
        free(workload);
        if (finished) {
            printf("Processed: %d tasks\n", total);
            break;
        }
        total++;
 
        // 隨機等待一段時間
        s_sleep(randof(1000) + 1);
    }
    zmq_close(worker);
    zmq_term(context);
    return NULL;
}
 
int main(void) {
    void *context = zmq_init(1);
    void *client = zmq_socket(context,  ZMQ_ROUTER);
    zmq_bind(client, "ipc://routing.ipc");
    srandom((unsigned) time(NULL));
 
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_task, NULL);
    }
    int task_nbr;
    for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
        // 最近最少使用的worker就在消息隊列中
        char *address = s_recv(client);
        char *empty = s_recv(client);
        free(empty);
        char *ready = s_recv(client);
        free(ready);
 
        s_sendmore(client, address);
        s_sendmore(client, "");
        s_send(client, "This is the workload");
        free(address);
    }
    // 通知所有REQ套接字結束工作
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        char *address = s_recv(client);
        char *empty = s_recv(client);
        free(empty);
        char *ready = s_recv(client);
        free(ready);
 
        s_sendmore(client, address);
        s_sendmore(client, "");
        s_send(client, "END");
        free(address);
    }
    zmq_close(client);
    zmq_term(context);
    return 0;
}
```
     在這個示例中,實現LRU算法並沒有用到特別的數據結構,因為ØMQ的消息隊列機制已經提供了等價的實現。一個更為實際的LRU算法應該將已准備好的worker收集起來,保存在一個隊列中進行分配。以后我們會講到這個例子。

     程序的運行結果會將每個worker的執行次數打印出來。由於REQ套接字會隨機等待一段時間,而我們也沒有做負載均衡,所以我們希望看到的是每個worker執行相近的工作量。這也是程序執行的結果。

```
Processed: 8 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 7 tasks
Processed: 9 tasks
Processed: 11 tasks
Processed: 14 tasks
Processed: 11 tasks
Processed: 11 tasks
Processed: 10 tasks
```

關於以上代碼的幾點說明:

     * 我們不需要像前一個例子一樣等待一段時間,因為REQ套接字會明確告訴ROUTER它已經准備好了。

     * 我們使用了zhelpers.h提供的s_set_id()函數來為套接字生成一個可打印的字符串標識,這是為了讓例子簡單一些。在現實環境中,REQ套 接字都是匿名的,你需要直接調用zmq_recv()和zmq_send()來處理消息,因為s_recv()和s_send()只能處理字符串標識的套 接字。

     * 更糟的是,我們使用了隨機的標識,不要在現實環境中使用隨機標識的持久套接字,這樣做會將節點消耗殆盡。

     * 如果你只是將上面的代碼拷貝過來,沒有充分理解,那你就像是看到蜘蛛人從屋頂上飛下來,你也照着做了,后果自負吧。

在將消息路由給REQ套接字時,需要注意一定的格式,即地址-空幀-消息:



### 使用地址進行路由

     在經典的請求-應答模式中,ROUTER一般不會和REP套接字通信,而是由DEALER去和REP通信。DEALER會將消息隨機分發給多個REP,並獲得結果。ROUTER更適合和REQ套接字通信

     我們應該記住,ØMQ的經典模型往往是運行得最好的,畢竟人走得多的路往往是條好路,如果不按常理出牌,那很有可能會跌入無敵深潭。下面我們就將ROUTER和REP進行連接,看看會發生什么。

REP套接字有兩個特點:

* 它 需要完成完整的請求-應答周期
* 它 可以接受任意大小的信封,並能完整地返回該信封

     在一般的請求-應答模式中,REP是匿名的,可以隨時替換。因為我們這里在將自定義路由,就要做到將一條消息發送給REP A,而不是REP B。這樣才能保證網絡的一端是你,另一端是特定的REP。

      ØMQ的核心理念之一是周邊的節點應該盡可能的智能,且數量眾多,而中間件則是固定和簡單的。這就意味着周邊節點可以向其他特定的節點發送消息,比如 可以連接到一個特定的REP。這里我們先不討論如何在多個節點之間進行路由,只看最后一步中ROUTER如何和特定的REP通信的。


這張圖描述了以下事件:

* client有一條消息,將來會通過另一個ROUTER將該消息發送回去。這條信息包含了兩個地址、一個空幀、以及消息內容;
* client將該條消息發送給了ROUTER,並指定了REP的地址;
* ROUTER將該地址移去,並以此決定其下哪個REP可以獲得該消息;
* REP收到該條包含地址、空幀、以及內容的消息;
* REP將空幀之前的所有內容移去,交給worker去處理消息;
* worker處理完成后將回復交給REP;
REP將之前保存好的信封包裹住該條回復,並發送給ROUTER;
* ROUTER在該條回復上又添加了一個注明REP的地址的幀。

這個過程看起來很復雜,但還是有必要取了解清楚的。 只要記住,REP套接字會原封不動地將信封返回回去

**rtpapa.c**


自定義ROUTER-REP路由:
//
#include "zhelpers.h"
 
// 這里使用一個進程來強調事件發生的順序性
int main (void)
{
    void *context = zmq_init (1);
 
    void *client = zmq_socket (context,  ZMQ_ROUTER);
    zmq_bind (client, "ipc://routing.ipc");
 
    void *worker = zmq_socket (context,  ZMQ_REP);
    zmq_setsockopt (worker,  ZMQ_IDENTITY, " A", 1);
    zmq_connect (worker, "ipc://routing.ipc");
 
    // 等待worker連接
    sleep (1);
 
    // 發送REP的標識、地址、空幀、以及消息內容
    s_sendmore (client, "A");      // REP的標識
    s_sendmore (client, "address 3");
    s_sendmore (client, "address 2");
    s_sendmore (client, "address 1");
    s_sendmore (client, "");      // 空幀
    s_send (client, "This is the workload");           // 消息內容
 
     // worker只會得到消息內容
    s_dump (worker);
 
    // worker不需要處理信封
    s_send (worker, "This is the reply");
 
    // 看看ROUTER里收到了什么
    s_dump (client);
 
    zmq_close (client);
    zmq_close (worker);
    zmq_term (context);
    return 0;
}
```

運行結果

```
----------------------------------------
[020] This is the workload
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] This is the reply
```

關於以上代碼的幾點說明:

     * 在現實環境中,ROUTER和REP套接字處於不同的節點。本例沒有啟用多進程,為的是讓事件的發生順序更為清楚。

     * zmq_connect()並不是瞬間完成的,REP和ROUTER連接的時候是會花費一些時間的。在現實環境中,ROUTER無從得知REP是否已經連接成功了,除非得到REP的某些回應。本例中使用sleep(1)來處理這一問題,如果不這樣做,那REP將無法獲得消息(自己嘗試一下吧)。

     * 我們使用REP的套接字標識來進行路由,如果你不信,可以將消息發送給B,看看A能不能收到。

     * 本例中的s_dump()等函數來自於zhelpers.h文件,可以看到在進行套接字連接時代碼都是一樣的,所以我們才能在ØMQ API的基礎上搭建上層的API。等今后我們討論到復雜應用程序的時候再詳細說明。

要將消息路由給REP,我們需要創建它能辨別的信封:




### 請求-應答模式下的消息代理

     這一節我們將對如何使用ØMQ消息信封做一個回顧,並嘗試編寫一個通用的消息代理裝置。我們會建立一個隊列裝置來連接多個client和worker,裝置的路由算法可以由我們自己決定。這里我們選擇最近最少使用算法,因為這和負載均衡一樣比較實用。

     首先讓我們回顧一下經典的請求-應答模型,嘗試用它建立一個不斷增長的巨型服務網絡。最基本的請求-應答模型是:




     這個模型支持多個REP套接字,但如果我們想支持多個REQ套接字,就需要增加一個中間件,它通常是ROUTER和DEALER的結合體,簡單將兩個套接字之間的信息進行搬運,因此可以用現成的ZMQ_QUEUE裝置來實現:

```
+--------+ +--------+ +--------+
| Client | | Client | | Client |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
    | | |
    +-----------+-----------+
                |
            +---+----+
            |  ROUTER |
            +--------+
            | Device |
            +--------+
            |  DEALER  |       DEADER自帶的負載均衡算法
            +---+----+
                |
    +-----------+-----------+
    | | |
+---+----+ +---+----+ +---+----+
| REP | | REP | | REP |
+--------+ +--------+ +--------+
| Worker | | Worker | | Worker |
+--------+ +--------+ +--------+


Figure # - Stretched request-reply
```

     這種結構的關鍵在於,ROUTER會將消息來自哪個REQ記錄下來,生成一個信封。DEALER和REP套接字在傳輸消息的過程中不會丟棄或更改信封的內容,這樣當消息返回給ROUTER時,它就知道應該發送給哪個REQ了。這個模型中的REP套接字是匿名的,並沒有特定的地址,所以只能提供同一種服務。

     上述結構中,對REP的路由我們使用了DEADER自帶的負載均衡算法。但是,我們想用LRU算法來進行路由,這就要用到ROUTER-REP模式:


     這個ROUTER-ROUTER的LRU隊列不能簡單地在兩個套接字間搬運消息,以下代碼會比較復雜,不過在請求-應答模式中復用性很高。

**lruqueue.c**

```c
//
// 使用LRU算法的裝置
// client和worker處於不同的線程中
//
#include "zhelpers.h"
#include <pthread.h>
 
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
 
// 出隊操作,使用一個可存儲任何類型的數組實現
#define DEQUEUE(q) memmove (&(q)[0], &(q)[1], sizeof (q) - sizeof (q [0]))
 
// 使用REQ套接字實現基本的請求-應答模式
// 由於s_send()和s_recv()不能處理0MQ的二進制套接字標識,
// 所以這里會生成一個可打印的字符串標識。
//
static void *client_task (void *args)
{
    void *context = zmq_init (1);
    void *client = zmq_socket (context,  ZMQ_REQ);
    s_set_id (client); // 設置可打印的標識
    zmq_connect (client, "ipc:// frontend.ipc");
 
    // 發送請求並獲取應答信息
    s_send (client, "HELLO");
    char *reply =  s_recv (client);
    printf ("Client: %s\n", reply);
    free (reply);
    zmq_close (client);
    zmq_term (context);
    return NULL;
}
 
// worker使用REQ套接字實現LRU算法
//
static void *worker_task (void *args)
{
    void *context = zmq_init (1);
    void *worker = zmq_socket (context,  ZMQ_REQ);
    s_set_id (worker); // 設置可打印的標識
    zmq_connect (worker, "ipc:// backend.ipc");
 
    // 告訴代理worker已經准備好
    s_send (worker, "READY");
 
    while (1) {
        //  將消息中空幀之前的所有內容(信封)保存起來
        // 本例中空幀之前只有一幀,但可以有更多。
        char *address = s_recv (worker);      //1、獲取信封地址
        char *empty = s_recv (worker);         //2、獲取 空幀
        assert (*empty == 0);
        free (empty);
 
        // 獲取請求,並發送回應
        char *request = s_recv (worker);       //3、獲取 信息
        printf ("Worker: %s\n", request);
        free (request);
 
        //封裝消息
        s_sendmore (worker, address);      //地址信封
        s_sendmore (worker, "");              // 空幀
        s_send (worker, "OK");                //真實消息
        free (address);
    }
    zmq_close (worker);
    zmq_term (context);
    return NULL;
}
 
int main (void)
{
    // 准備0MQ上下文和套接字
    void *context = zmq_init (1);
    void * frontend = zmq_socket (context,  ZMQ_ROUTER);
    void * backend = zmq_socket (context,  ZMQ_ROUTER);
    zmq_bind (frontend, "ipc://frontend.ipc");
    zmq_bind (backend, "ipc://backend.ipc");
 
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
        pthread_t client;
        pthread_create (&client, NULL, client_task, NULL);
    }
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_task, NULL);
    }

    // LRU邏輯
    // - 一直從backend中獲取消息;當有超過一個worker空閑時才從frontend獲取消息。
    // - 當woker回應時,會將該worker標記為已准備好,並轉發woker的回應給client
    // - 如果client發送了請求,就將該請求轉發給下一個worker
 
    // 存放可用worker的隊列
    int available_workers = 0;
    char *worker_queue [10];
 
    while (1) {
         zmq_pollitem_t items [] = {
            { backend, 0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        zmq_poll (items, available_workers? 2: 1, -1);
 
        // 處理backend中worker的隊列
         if (items [0].revents & ZMQ_POLLIN) {
            // 將worker的地址入隊
            char *worker_addr = s_recv (backend);
            assert (available_workers < NBR_WORKERS);
            worker_queue [available_workers++] = worker_addr;
 
            // 跳過空幀
            char *empty = s_recv (backend);
            assert (empty [0] == 0);
            free (empty);
 
            // 第三幀是“READY”或是一個client的地址
            char *client_addr = s_recv (backend);
 
            // 如果是一個應答消息,則轉發給client
            if (strcmp (client_addr, "READY") != 0) {
                empty = s_recv (backend);
                assert (empty [0] == 0);
                free (empty);
                char *reply = s_recv (backend);
                s_sendmore (frontend, client_addr);
                s_sendmore (frontend, "");
                s_send (frontend, reply);      //真實發送
                free (reply);
                if (--client_nbr == 0)
                    break; // 處理N條消息后退出
            }
            free (client_addr);
        }

         if (items [1].revents & ZMQ_POLLIN) {
            // 獲取下一個client的請求,交給空閑的worker處理
             // client請求的消息格式是:[client地址][空幀][請求內容]
            char *client_addr = s_recv (frontend);      
            char *empty = s_recv (frontend);
            assert (empty [0] == 0);
            free (empty);
            char *request = s_recv (frontend);
 
            //下面會是一個消息
            s_sendmore (backend, worker_queue [0]);
            s_sendmore (backend, "");
            s_sendmore (backend, client_addr);
            s_sendmore (backend, "");
            s_send (backend, request);      //真實發送
 
            free (client_addr);
            free (request);
 
            // 將該worker的地址出隊
            free (worker_queue [0]);
            DEQUEUE (worker_queue);
            available_workers--;
        }
    }
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}
```

這段程序有兩個關鍵點:1、各個套接字是如何處理信封的;2、LRU算法。我們先來看信封的格式。

     我們知道REQ套接字在發送消息時會向頭部添加一個空幀,接收時又會自動移除。我們要做的就是在傳輸消息時滿足REQ的要求,處理好空幀。另外還要注意,ROUTER會在所有收到的消息前添加消息來源的地址

     現在我們就將完整的請求-應答流程走一遍,我們將client套接字的標識設為“CLIENT”,worker的設為“WORKER”。以下是client發送的消息:


代理從ROUTER中獲取到的消息格式如下:



     代理會從LRU隊列中獲取一個空閑woker的地址,作為信封附加在消息之上,傳送給ROUTER。注意要添加一個空幀(因為接受方: REQ(worker)收到消息時,會將信封和空幀移去



REQ(worker)收到消息時,會將信封和空幀移去




     可以看到,worker收到的消息和client端ROUTER收到的消息是一致的。worker需要將該消息中的信封保存起來,只對消息內容做操作

在返回的過程中:

     * worker通過REQ傳輸給device消息:       \[client地址\]\[空幀\]\[應答內容\];
     * device從worker端的ROUTER中獲取到:\[worker地址\]\[空幀\]\[client地址\]\[空幀\]\[應答內容\];
     * device將worker地址保存起來,並發送:\[client地址\]\[空幀\]\[應答內容\]給client端的ROUTER;
     * client從REQ中獲得到:       \[應答內容\]。

然后再看看 LRU算法,它要求client和worker都使用REQ套接字,並正確的存儲和返回消息信封,具體如下:

     * 創建一組poll,不斷地從backend(worker端的ROUTER)獲取消息;只有當有空閑的worker時才從frontend(client端的ROUTER)獲取消息;
 
     * 循環執行poll
 
     * 如果backend有消息,只有兩種情況:1)READY消息(該worker已准備好,等待分配);2)應答消息(需要轉發給client)。兩種情況下我們都會保存worker的地址,放入LRU隊列中,如果有應答內容,則轉發給相應的client。

     * 如果frontend有消息,我們從LRU隊列中取出下一個worker,將該請求發送給它。這就需要發送[worker地址][空幀][client地址][空幀][請求內容]到worker端的ROUTER。

     我們可以對該算法進行擴展,如在worker啟動時做一個自我測試,計算出自身的處理速度,並隨READY消息發送給代理,這樣代理在分配工作時就可以做相應的安排。

### ØMQ上層API的封裝

使用ØMQ提供的API操作多段消息時是很麻煩的,如以下代碼:

```c
while (1) 
{
    // 將消息中空幀之前的所有內容(信封)保存起來,
    // 本例中空幀之前只有一幀,但可以有更多。
    char *address = s_recv (worker);
    char *empty = s_recv (worker);
    assert (*empty == 0);
    free (empty);
 
    // 獲取請求,並發送回應
    char *request = s_recv (worker);
    printf ("Worker: %s\n", request);
    free (request);
    s_sendmore (worker, address);
    s_sendmore (worker, "");
    s_send (worker, "OK");
    free (address);
}
```

     這段代碼不滿足重用的需求,因為它只能處理一個幀的信封。事實上,以上代碼已經做了一些封裝了,如果調用ØMQ底層的API的話,代碼就會更加冗長:

```c
while (1)
 {
    // 將消息中空幀之前的所有內容(信封)保存起來,
    // 本例中空幀之前只有一幀,但可以有更多。
    zmq_msg_t address;
    zmq_msg_init (&address);
    zmq_recv (worker, &address, 0);
 
    zmq_msg_t empty;
    zmq_msg_init (&empty);
    zmq_recv (worker, &empty, 0);
 
    // 獲取請求,並發送回應
    zmq_msg_t payload;
    zmq_msg_init (&payload);
    zmq_recv (worker, &payload, 0);
 
    int char_nbr;
    printf ("Worker: ");
    for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
        printf ("%c", *(char *) (zmq_msg_data (&payload) + char_nbr));
    printf ("\n");
 
    zmq_msg_init_size (&payload, 2);
    memcpy (zmq_msg_data (&payload), "OK", 2);
 
    zmq_send (worker, &address, ZMQ_SNDMORE);
    zmq_close (&address);
    zmq_send (worker, &empty, ZMQ_SNDMORE);
    zmq_close (&empty);
    zmq_send (worker, &payload, 0);
    zmq_close (&payload);
}
```

     我們理想中的API是可以一步接收和處理完整的消息,包括信封。ØMQ底層的API並不是為此而涉及的,但我們可以在它上層做進一步的封裝,這也是學習ØMQ的過程中很重要的內容。

     想要編寫這樣一個API還是很有難度的,因為我們要避免過於頻繁地復制數據。此外,ØMQ用“消息”來定義多段消息和多段消息中的一部分,同時,消息又可以是字符串消息或者二進制消息,這也給編寫API增加的難度。

     解決方法之一是使用新的命名方式:字符串(s_send()和s_recv()中已經在用了)、幀(消息的一部分)、消息(一個或多個幀)。以下是用新的API重寫的worker:

```c
while (1) {
    zmsg_t *zmsg = zmsg_recv (worker);
    zframe_print (zmsg_last (zmsg), "Worker: ");
    zframe_reset (zmsg_last (zmsg), "OK", 2);
    zmsg_send (&zmsg, worker);
}
```

     用4行代碼代替22行代碼是個不錯的選擇,而且更容易讀懂。我們可以用這種理念繼續編寫其他的API,希望可以實現以下功能:

     * 自動處理套接字。每次都要手動關閉套接字是很麻煩的事,手動定義過期時間也不是太有必要,所以,如果能在關閉上下文時自動關閉套接字就太好了。

     * 便捷的線程管理。基本上所有的ØMQ應用都會用到多線程,但POSIX的多線程接口用起來並不是太方便,所以也可以封裝一下。

     * 便捷的時鍾管理。想要獲取毫秒數、或是暫停運行幾毫秒都不太方便,我們的API應該提供這個接口。

     * 一個能夠替代zmq_poll()的反應器。poll循環很簡單,但比較笨拙,會造成重復代碼:計算時間、處理套接字中的信息等。若有一個簡單的反應器來處理套接字的讀寫以及時間的控制,將會很方便。

     * 恰當地處理Ctrl-C按鍵。我么已經看到如何處理中斷了,最好這一機制可以用到所有的程序里。

     我們可以用czmq來實現以上的需求。這個擴展很早就有了,提供了很多ØMQ的上層封裝,甚至是數據結構(哈希、鏈表等)。

以下是用czmq重寫的LRU代理:

**lruqueue2.c**

```c
//
// LRU消息隊列裝置,使用czmq庫實現
//
#include "czmq.h"
 
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // worker准備就緒的信息
 
// 使用REQ套接字實現基本的請求-應答模式
//
static void *client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx,  ZMQ_REQ);
    zsocket_connect (client, "ipc://frontend.ipc");
 
    // 發送請求並接收應答
    while (1) {
        zstr_send (client, "HELLO");
        char *reply = zstr_recv (client);
        if (!reply)
            break;
        printf ("Client: %s\n", reply);
        free (reply);
        sleep (1);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// worker使用REQ套接字,實現LRU路由
//
static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://backend.ipc");
 
    // 告知代理worker已准備就緒
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    // 接收消息並處理
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break; // 終止
        //zframe_print (zmsg_last (msg), "Worker: ");
        zframe_reset (zmsg_last (msg), "OK", 2);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "ipc://frontend.ipc");
    zsocket_bind (backend, "ipc://backend.ipc");
 
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (ctx, client_task, NULL);
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (ctx, worker_task, NULL);
 
    // LRU邏輯
    // - 一直從backend中獲取消息;當有超過一個worker空閑時才從frontend獲取消息。
    // - 當woker回應時,會將該worker標記為已准備好,並轉發woker的回應給client
    // - 如果client發送了請求,就將該請求轉發給下一個worker
 
    // 存放可用worker的隊列
    zlist_t *workers = zlist_new ();
 
    while (1) {
        // 初始化poll
        zmq_pollitem_t items [] = {
            { backend, 0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        // 當有可用的worker時,從frontend獲取消息
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
        if (rc == -1)
            break; // 中斷
 
        // 對backend發來的消息進行處理
        if (items [0].revents & ZMQ_POLLIN) {
            // 使用worker的地址進行LRU路由
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break; // 中斷
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
 
            // 如果不是READY消息,則轉發給client
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            // 獲取client發來的請求,轉發給worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (msg) {
                zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
                zmsg_send (&msg, backend);
            }
        }
    }
    // 如果完成了,則進行一些清理工作
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}
```

      czmq提供了一個簡單的中斷機制,當按下Ctrl-C時程序會終止ØMQ的運行,並返回-1,errno設置為EINTR。程序中斷時,czmq的recv方法會返回NULL,所以你可以用下面的代碼來作判斷:

```c
while (1) {
    zstr_send (client, "HELLO");
     char *reply = zstr_recv (client);
    if (!reply)
        break; // 中斷
    printf ("Client: %s\n", reply);
    free (reply);
    sleep (1);
}
```

如果使用zmq_poll()函數,則可以這樣判斷:

int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
    break; // 中斷

上例中還是使用了原生的zmq_poll()方法,也可以使用czmq提供的zloop反應器來實現,zloop反應器可以做到:

     * 從任意套接字上獲取消息,也就是說只要套接字有消息就可以觸發函數;
     * 停止讀取套接字上的消息;
     * 設置一個時鍾,定時地讀取消息。

      zloop內部當然是使用zmq_poll()實現的,但它可以做到動態地增減套接字上的監聽器,重構poll池,並根據poll的超時時間來計算下一個時鍾觸發事件

使用這種反應器模式后,我們的代碼就更簡潔了:

zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);

     對消息的實際處理放在了程序的其他部分,並不是所有人都會喜歡這種風格,但zloop的確是將定時器和套接字的行為融合在了一起。在以后的例子中,我們會用zmq_poll()來處理簡單的示例,使用zloop來處理復雜的。

下面我們用zloop來重寫LRU隊列裝置

**lruqueue3.c**

```c
//
// LRU隊列裝置,使用czmq及其反應器模式實現
//
#include "czmq.h"
 
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // woker已准備就緒的消息
 
// 使用REQ實現基本的請求-應答模式
//
static void *client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx,  ZMQ_REQ);
    zsocket_connect (client, "ipc://frontend.ipc");
 
    // 發送請求並接收應答
    while (1) {
        zstr_send (client, "HELLO");
        char *reply = zstr_recv (client);
        if (!reply)
            break;
        printf ("Client: %s\n", reply);
        free (reply);
        sleep (1);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// worker使用REQ套接字來實現路由
//
static void *worker_task (void *arg_ptr)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx,  ZMQ_REQ);
    zsocket_connect (worker, "ipc://backend.ipc");
 
    // 告訴代理worker已經准備就緒
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    // 獲取消息並處理
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break; // 中斷
        //zframe_print (zmsg_last (msg), "Worker: ");
        zframe_reset (zmsg_last (msg), "OK", 2);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// LRU隊列處理器結構,將要傳給反應器
typedef struct {
    void *frontend; // 監聽client
    void *backend; // 監聽worker
    zlist_t *workers; // 可用的worker列表
} lruqueue_t;
 
 
// 處理frontend端的消息
int s_handle_frontend (zloop_t *loop, void *socket, void *arg)
{
    lruqueue_t *self = (lruqueue_t *) arg;
    zmsg_t *msg = zmsg_recv (self->frontend);
    if (msg) {
        zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
        zmsg_send (&msg, self->backend);
 
        // 如果沒有可用的worker,則停止監聽frontend
        if (zlist_size (self->workers) == 0)
            zloop_cancel (loop, self->frontend);
    }
    return 0;
}
 
// 處理backend端的消息
int s_handle_backend (zloop_t *loop, void *socket, void *arg)
{
    // 使用worker的地址進行LRU路由
    lruqueue_t *self = (lruqueue_t *) arg;
    zmsg_t *msg = zmsg_recv (self->backend);
    if (msg) {
        zframe_t *address = zmsg_unwrap (msg);
        zlist_append (self->workers, address);
 
        // 當有可用worker時增加frontend端的監聽
        if (zlist_size (self->workers) == 1)
            zloop_reader (loop, self->frontend, s_handle_frontend, self);
 
        // 如果是worker發送來的應答,則轉發給client
        zframe_t *frame = zmsg_first (msg);
        if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
            zmsg_destroy (&msg);
        else
            zmsg_send (&msg, self->frontend);
    }
    return 0;
}
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    lruqueue_t *self = (lruqueue_t *) zmalloc (sizeof (lruqueue_t));
    self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
    self->backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (self->frontend, "ipc://frontend.ipc");
    zsocket_bind (self->backend, "ipc://backend.ipc");
 
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
         zthread_new (ctx, client_task, NULL);
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
         zthread_new (ctx, worker_task, NULL);
 
    // 可用worker的列表
    self->workers = zlist_new ();
 
     // 准備並啟動反應器
    zloop_t *reactor = zloop_new ();
    zloop_reader (reactor, self->backend, s_handle_backend, self);
    zloop_start (reactor);
    zloop_destroy (&reactor);
 
    // 結束之后的清理工作
    while (zlist_size (self->workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (self->workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&self->workers);
    zctx_destroy (&ctx);
    free (self);
    return 0;
}


     要正確處理Ctrl-C還是有點困難的,如果你使用zctx類,那它會自動進行處理,不過也需要代碼的配合。若zmq_poll()返回了-1,或者 recv方法(zstr_recv, zframe_recv, zmsg_recv)返回了NULL,就必須退出所有的循環。另外,在最外層循環中增加!zctx_interrupted的判斷也很有用。

### 異步C/S結構

     在之前的ROUTER-DEALER模型中,我們看到了client是如何異步地和多個worker進行通信的。我們可以將這個結構倒置過來,實現多個client異步地和單個server進行通信:


* client連接至server並發送請求;
* 每一次收到請求,server會發送0至N個應答;
* client可以同時發送多個請求而不需要等待應答;
* server可以同時發送多個應答二不需要新的請求。

**asyncsrd.c**

異步C/S模型(DEALER-ROUTER):
 
#include "czmq.h"
 
// ---------------------------------------------------------------------
// 這是client端任務,它會連接至server,每秒發送一次請求,同時收集和打印應答消息。
// 我們會運行多個client端任務,使用隨機的標識。

static void *client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_DEALER);
 
    // 設置隨機標識,方便跟蹤
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zsockopt_set_identity (client, identity);
    zsocket_connect (client, "tcp://localhost:5570");
 
     zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    int request_nbr = 0;
    while (1) {
        // 從poll中獲取消息,每秒一次
        int centitick;
        for (centitick = 0; centitick < 100; centitick++) {
            zmq_poll (items, 1, 10 * ZMQ_POLL_MSEC);
             if (items [0].revents & ZMQ_POLLIN) 
             {
                zmsg_t *msg = zmsg_recv (client);
                zframe_print (zmsg_last (msg), identity);
                zmsg_destroy (&msg);
            }
        }
        zstr_sendf (client, "request #%d", ++request_nbr);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// ---------------------------------------------------------------------
// 這是server端任務,它使用多線程機制將請求分發給多個worker,並正確返回應答信息。
// 一個worker只能處理一次請求,但client可以同時發送多個請求。
 
static void server_worker (void *args, zctx_t *ctx, void *pipe);
 
void *server_task (void *args)
{
    zctx_t *ctx = zctx_new ();
 
    // frontend套接字使用TCP和client通信
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5570");
 
    // backend套接字使用inproc和worker通信
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "inproc://backend");
 
    // 啟動一個worker線程池,數量任意
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++)
    {
         zthread_fork (ctx, server_worker, NULL);
    }
 
    // 使用隊列裝置連接backend和frontend,我們本來可以這樣做:
    // zmq_device (ZMQ_QUEUE, frontend, backend);
    // 但這里我們會自己完成這個任務,這樣可以方便調試。
 
    // 在frontend和backend間搬運消息
    while (1) {
         zmq_pollitem_t items [] = {
            { frontend, 0, ZMQ_POLLIN, 0 },
            { backend, 0, ZMQ_POLLIN, 0 }
        };
        zmq_poll (items, 2, -1);
         if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (frontend);
            //puts ("Request from client:");
            //zmsg_dump (msg);
            zmsg_send (&msg, backend);
        }
         if (items [1].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (backend);
            //puts ("Reply from worker:");
            //zmsg_dump (msg);
            zmsg_send (&msg, frontend);
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// 接收一個請求,隨機返回多條相同的文字,並在應答之間做隨機的延遲。
//
static void server_worker (void *args, zctx_t *ctx, void *pipe)
{
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "inproc://backend");
 
    while (1) {
        // DEALER套接字將信封和消息內容一起返回給我們
        zmsg_t *msg = zmsg_recv (worker);
        zframe_t *address = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        zmsg_destroy (&msg);
 
        // 隨機返回0至4條應答
        int reply, replies = randof (5);
        for (reply = 0; reply < replies; reply++) {
            // 暫停一段時間
            zclock_sleep (randof (1000) + 1);
            zframe_send (&address, worker, ZFRAME_REUSE + ZFRAME_MORE);
            zframe_send (&content, worker, ZFRAME_REUSE);
        }
        zframe_destroy (&address);
        zframe_destroy (&content);
    }
}
 
 
// 主程序用來啟動多個client和一個server
//
int main (void)
{
    zctx_t *ctx = zctx_new ();
    zthread_new (ctx, client_task, NULL);
    zthread_new (ctx, client_task, NULL);
    zthread_new (ctx, client_task, NULL);
    zthread_new (ctx, server_task, NULL);
 
    // 運行5秒后退出
    zclock_sleep (5 * 1000);
    zctx_destroy (&ctx);
    return 0;
}
```

運行上面的代碼,可以看到三個客戶端有各自的隨機標識,每次請求會獲得零到多條回復。

     * client每秒會發送一次請求,並獲得零到多條應答。這要通過zmq_poll()來實現,但我們不能只每秒poll一次,這樣將不能及時處理應答。程 序中我們每秒取100次,這樣一來server端也可以以此作為一種心跳(heartbeat),用來檢測client是否還在線。

     * server使用了一個worker池,每一個worker同步處理一條請求。我們可以使用內置的隊列來搬運消息,但為了方便調試,在程序中我們自己實現了這一過程。你可以將注釋的幾行去掉,看看輸出結果。

這段代碼的整體架構如下圖所示:




     可以看到,client和server之間的連接我們使用的是DEALER-ROUTER,而server和worker的連接則用了DEALER- DEALER。如果worker是一個同步的線程,我們可以用REP。但是本例中worker需要能夠發送多個應答,所以就需要使用DEALER這樣的異 步套接字。這里我們不需要對應答進行路由,因為所有的worker都是連接到一個server上的。

     讓我們看看路由用的信封,client發送了一條信息,server獲取的信息中包含了client的地址,這樣一來我們有兩種可行的server-worker通信方案:

     * worker收到未經標識的信息。我們使用顯式聲明的標識,配合ROUTER套接字來連接worker和server。這種設計需要worker提前告知ROUTER它的存在,這種LRU算法正是我們之前所講述的。

     * worker收到含有標識的信息,並返回含有標識的應答。這就要求worker能夠處理好信封。

第二種涉及較為簡單:

```
     client server frontend worker
   [ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
             1 part 2 parts 2 parts
```

     當我們需要在client和server之間維持一個對話時,就會碰到一個經典的問題:client是不固定的,如果給每個client都保存一些消 息,那系統資源很快就會耗盡。即使是和同一個client保持連接,因為使用的是瞬時的套接字(沒有顯式聲明標識),那每次連接也相當於是一個新的連接。

想要在異步的請求中保存好client的信息,有以下幾點需要注意:

     * client需要發送心跳給server。本例中client每秒都會發送一個請求給server,這就是一種很可靠的心跳機制。
     * 使用client的套接字標識來存儲信息,這對瞬時和持久的套接字都有效;
     * 檢測停止心跳的client,如兩秒內沒有收到某個client的心跳,就將保存的狀態丟棄。

### 實戰:跨代理路由

     讓我們把目前所學到的知識綜合起來,應用到實戰中去。我們的大客戶今天打來一個緊急電話,說是要構建一個大型的雲計算設施。它要求這個雲架構可以跨越多個數據中心,每個數據中心包含一組client和worker,且能共同協作。

     我們堅信實踐高於理論,所以就提議使用ZMQ搭建這樣一個系統。我們的客戶同意了,可能是因為他的確也想降低開發的成本,或是在推特上看到了太多ZMQ的好處。

#### 細節詳述

     喝完幾杯特濃咖啡,我們准備着手干了,但腦中有個理智的聲音提醒我們應該在事前將問題分析清楚,然后再開始思考解決方案。雲到底要做什么?我們如是問,客戶這樣回答:

     * worker在不同的硬件上運作,但可以處理所有類型的任務。每個集群都有成百個worker,再乘以集群的個數,因此數量眾多。

     * client向worker指派任務,每個任務都是獨立的,每個client都希望能找到對應的worker來處理任務,越快越好。client是不固定的,來去頻繁。

     * 真正的難點在於,這個架構需要能夠自如地添加和刪除集群,附帶着集群中的client和worker。

     * 如果集群中沒有可用的worker,它便會將任務分派給其他集群中可以用的worker。

     * client每次發送一個請求,並等待應答。如果X秒后他們沒有獲得應答,他們會重新發送請求。這一點我們不需要多做考慮,client端的API已經寫好了。

     * worker每次處理一個請求,他們的行為非常簡單。如果worker崩潰了,會有另外的腳本啟動他們。

聽了以上的回答,我們又進一步追問:

     * 集群之間會有一個更上層的網絡來連接他們對嗎?客戶說是的。

     * 我們需要處理多大的吞吐量?客戶說,每個集群約有一千個client,單個client每秒會發送10次請求。請求包含的內容很少,應答也很少,每個不超過1KB。

     我們進行了簡單的計算,2500個client x 10次/秒 x 1000字節 x 2雙向 = 50MB/秒,或400Mb/秒,這對1Gb網絡來說不成問題,可以使用TCP協議

     這樣需求就很清晰了,不需要額外的硬件或協議來完成這件事,只要提供一個高效的路由算法,設計得縝密一些。我們首先從一個集群(數據中心)開始,然后思考如何來連接他們。

#### 單個集群的架構

      worker和client是同步的,我們使用LRU算法來給worker分配任務。每個worker都是等價的,所以我們不需要考慮服務的問題。 worker是匿名的,client不會和某個特定的worker進行通信,因而我們不需要保證消息的送達以及失敗后的重試等。

     鑒於上文提過的原因,client和worker是不會直接通信的,這樣一來就無法動態地添加和刪除節點了。所以,我們的基礎模型會使用一個請求-應答模式中使用過的代理結構。




#### 多個集群的架構

下面我們將集群擴充到多個,每個集群有自己的一組client和worker,並使用代理相連接:




問題在於:我們如何讓一個集群的client和另一個集群的worker進行通信呢?有這樣幾種解決方案,我們來看看他們的優劣:

     * client直接和多個代理相連接。優點在於我們可以不對代理和worker做改動,但client會變得復雜,並需要知悉整個架構的情況。如果我們想要 添加第三或第四個集群,所有的client都會需要修改。我們相當於是將路由和容錯功能寫進client了,這並不是個好主意。

     * worker直接和多個代理相連接。可是REQ類型的worker不能做到這一點,它只能應答給某一個代理。如果改用REP套接字,這樣就不能使用LRU 算法的隊列代理了。這點肯定不行,在我們的結構中必須用LRU算法來管理worker。還有個方法是使用ROUTER套接字,讓我們暫且稱之為方案1。

     * 代理之間可以互相連接,這看上去不錯,因為不需要增加過多的額外連接。雖然我們不能隨意地添加代理,但這個問題可以暫不考慮。這種情況下,集群中的worker和client不必理會整體架構,當代理有剩余的工作能力時便會和其他代理通信。這是方案2。

我們首先看看方案1,worker同時和多個代理進行通信:


     這看上去很靈活,但卻沒有提供我們所需要的特性:client只有當集群中的worker不可用時才會去請求異地的worker。此外,worker的 “已就緒”信號會同時發送給兩個代理,這樣就有可能同時獲得兩份任務。這個方案的失敗還有一個原因:我們又將路由邏輯放在了邊緣地帶。

那來看看方案2,我們為各個代理建立連接,不修改worker和client:




     這種設計的優勢在於,我們只需要在一個地方解決問題就可以了,其他地方不需要修改。這就好像代理之間會秘密通信:伙計,我這兒有一些剩余的勞動力,如果你那兒忙不過來就跟我說,價錢好商量。

     事實上,我們只不過是需要設計一種更為復雜的路由算法罷了:代理成為了其他代理的分包商。這種設計還有其他一些好處:

* 在普通情況下(如只存在一個集群),這種設計的處理方式和原來沒有區別,當有多個集群時再進行其他動作。

* 對於不同的工作我們可以使用不同的消息流模式,如使用不同的網絡鏈接。

* 架構的擴充看起來也比較容易,如有必要我們還可以添加一個超級代理來完成調度工作。

     現在我們就開始編寫代碼。我們會將完整的集群寫入一個進程,這樣便於演示,而且稍作修改就能投入實際使用。這也是ZMQ的優美之處,你可以使用最小的開 發模塊來進行實驗,最后方便地遷移到實際工程中。線程變成進程,消息模式和邏輯不需要改變。我們每個“集群”進程都包含client線程、worker線 程、以及代理線程。

我們對基礎模型應該已經很熟悉了:

     * client線程使用REQ套接字,將請求發送給代理線程(ROUTER套接字);
     * worker線程使用REQ套接字,處理並應答從代理線程(ROUTER套接字)收到的請求;
     * 代理會使用LRU隊列和路由機制來管理請求。

#### 聯邦模式和同伴模式

     連接代理的方式有很多,我們需要斟酌一番。我們需要的功能是告訴其他代理“我這里還有空閑的worker”,然后開始接收並處理一些任務;我們還需要能 夠告訴其他代理“夠了夠了,我這邊的工作量也滿了”。這個過程不一定要十分完美,有時我們確實會接收超過承受能力的工作量,但仍能逐步地完成。

     最簡單的方式稱為聯邦,即代理充當其他代理的client和worker。我們可以將代理的前端套接字連接至其他代理的后端套接字,反之亦然。提示一下,ZMQ中是可以將一個套接字綁定到一個端點,同時又連接至另一個端點的。



     這種架構的邏輯會比較簡單:當代理沒有client時,它會告訴其他代理自己准備好了,並接收一個任務進行處理。但問題在於這種機制太簡單了,聯邦模式 下的代理一次只能處理一個請求。如果client和worker是嚴格同步的,那么代理中的其他空閑worker將分配不到任務。我們需要的代理應該具備 完全異步的特性。

     但是,聯邦模式對某些應用來說是非常好的,比如面向服務架構(SOA)。所以,先不要急着否定聯邦模式,它只是不適用於LRU算法和集群負載均衡而已。

     我們還有一種方式來連接代理:同伴模式。代理之間知道彼此的存在,並使用一個特殊的信道進行通信。我們逐步進行分析,假設有N個代理需要連接,每個代理則有N-1個同伴,所有代理都使用相同格式的消息進行通信。關於消息在代理之間的流通有兩點需要注意:

     * 每個代理需要告知所有同伴自己有多少空閑的worker,這是一則簡單的消息,只是一個不斷更新的數字,很顯然我們會使用PUB-SUB套接字。這樣一來,每個代理都會打開一個PUB套接字,不斷告知外界自身的信息;同時又會打開一個SUB套接字,獲取其他代理的信息。

     * 每個代理需要以某種方式將工作任務交給其他代理,並能獲取應答,這個過程需要是異步的。我們會使用ROUTER-ROUTER套接字來實現,沒有其他選擇。每個代理會使用兩個這樣的ROUTER套接字,一個用於接收任務,另一個用於分發任務。如果不使用兩個套接字,那就需要額外的邏輯來判別收到的是請求還是應答,這就需要在消息中加入更多的信息。

另外還需要考慮的是代理和本地client和worker之間的通信。

#### The Naming Ceremony

     代理中有三個消息流,每個消息流使用兩個套接字,因此一共需要使用六個套接字。為這些套接字取一組好名字很重要,這樣我們就不會在來回切換的時候找不着 北。套接字是有一定任務的,他們的所完成的工作可以是命名的一部分。這樣,當我們日后再重新閱讀這些代碼時,就不會顯得太過陌生了。

以下是我們使用的三個消息流:

  * 本地(local)的請求-應答消息流,實現代理和client、代理和worker之間的通信;
  * 雲端(cloud)的請求-應答消息流,實現代理和其同伴的通信;
  * 狀態(state)流,由代理和其同伴互相傳遞。

     能夠找到一些有意義的、且長度相同的名字,會讓我們的代碼對得比較整齊。可能他們並沒有太多關聯,但久了自然會習慣。

     每個消息流會有兩個套接字,我們之前一直稱為“前端(frontend)”和“后端(backend)”。這兩個名字我們已經使用很多次了:前端會負責接受信息或任務;后端會發送信息或任務給同伴。從概念上說,消息流都是從前往后的,應答則是從后往前。

因此,我們決定使用以下的命名方式:

  * localfe / localbe
  * cloudfe / cloudbe
  * statefe / statebe

     通信協議方面,我們全部使用ipc。使用這種協議的好處是,它能像tcp協議那樣作為一種脫機通信協議來工作,而又不需要使用IP地址或DNS服務。對 於ipc協議的端點,我們會命名為xxx-localfe/be、xxx-cloud、xxx-state,其中xxx代表集群的名稱。

     也許你會覺得這種命名方式太長了,還不如簡單的叫s1、s2、s3……事實上,你的大腦並不是機器,閱讀代碼的時候不能立刻反應出變量的含義。而用上面這種“三個消息流,兩個方向”的方式記憶,要比純粹記憶“六個不同的套接字”來的方便。

以下是代理程序的套接字分布圖:



請注意,我們會將cloudbe連接至其他代理的cloudfe,也會將statebe連接至其他代理的statefe。

#### 狀態流原型

     由於每個消息流都有其巧妙之處,所以我們不會直接把所有的代碼都寫出來,而是分段編寫和測試。當每個消息流都能正常工作了,我們再將其拼裝成一個完整的應用程序。我們首先從狀態流開始:



代碼如下:

**peering1: Prototype state flow in C**


代理同伴模擬(第一部分):
// 狀態流原型
//
#include "czmq.h"
 
int main (int argc, char *argv [])
{
    // 第一個參數是代理的名稱
    // 其他參數是各個同伴的名稱
    //
    if (argc < 2) {
        printf ("syntax: peering1 me {you}...\n");
        exit (EXIT_FAILURE);
    }
    char *self = argv [1];
    printf ("I: 正在准備代理程序 %s...\n", self);
    srandom ((unsigned) time (NULL));
 
    // 准備上下文和套接字
    zctx_t *ctx = zctx_new ();
    void * statebe = zsocket_new (ctx,  ZMQ_PUB);
    zsocket_bind (statebe, "ipc://%s-state.ipc", self);
 
    // 連接statefe套接字至所有同伴
    void * statefe = zsocket_new (ctx,  ZMQ_SUB);
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: 正在連接至同伴代理 '%s' 的狀態流后端\n", peer);
        zsocket_connect (statefe, "ipc:// %s-state.ipc", peer);
    }
    // 發送並接受狀態消息
    // zmq_poll()函數使用的超時時間即心跳時間
    //
    while (1) {
        // 初始化poll對象列表
         zmq_pollitem_t items [] = {
            { statefe, 0, ZMQ_POLLIN, 0 }
         };

         // 輪詢套接字活動,超時時間為1秒
          int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
        if (rc == -1)
            break; // 中斷
 
        // 處理接收到的狀態消息
         if (items [0].revents & ZMQ_POLLIN) {
            char *peer_name = zstr_recv (statefe);
            char *available = zstr_recv (statefe);
            printf ("同伴代理 %s 有 %s 個worker空閑\n", peer_name, available);
            free (peer_name);
            free (available);
        }
        else {
             // 發送隨機數表示空閑的worker數
            zstr_sendm (statebe, self);
            zstr_sendf (statebe, "%d", randof (10));
        }
    }
    zctx_destroy (&ctx);
    return EXIT_SUCCESS;
}
```

幾點說明:

     * 每個代理都需要有各自的標識,用以生成相應的ipc端點名稱。真實環境中,代理需要使用TCP協議連接,這就需要一個更為完備的配置機制,我們會在以后的章節中談到。

     * 程序的核心是一個zmq_poll()循環,它會處理接收到消息,並發送自身的狀態。只有當zmq_poll()因無法獲得同伴消息而超時時我們才會發送自身狀態,如果我們每次收到消息都去發送自身狀態,那消息就會過量了。
     
     * 發送的狀態消息包含兩幀,第一幀是代理自身的地址,第二幀是空閑的worker數。我們必須要告知同伴代理自身的地址,這樣才能接收到請求,唯一的方法就是在消息中顯示注明。

     * 我們沒有在SUB套接字上設置標識,否則就會在連接到同伴代理時獲得過期的狀態信息。

     * 我們沒有在PUB套接字上設置閾值(HWM),因為訂閱者是瞬時的。我們也可以將閾值設置為1,但其實是沒有必要的。

讓我們編譯這段程序,用它模擬三個集群,DC1、DC2、DC3。我們在不同的窗口中運行以下命令:

```
peering1 DC1 DC2 DC3 # Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3 # Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2 # Start DC3 and connect to DC1 and DC2
```

     每個集群都會報告同伴代理的狀態,之后每隔一秒都會打印出自己的狀態。

     在現實編程中,我們不會通過定時的方式來發送自身狀態,而是在狀態發生改變時就發送。這看起來會很占用帶寬,但其實狀態消息的內容很少,而且集群間的連接是非常快速的。

     如果我們想要以較為精確的周期來發送狀態信息,可以新建一個線程,將statebe套接字打開,然后由主線程將不規則的狀態信息發送給子線程,再由子線程定時發布這些消息。不過這種機制就需要額外的編程了。

#### 本地流和雲端流原型

     下面讓我們建立本地流和雲端流的原型。這段代碼會從client獲取請求,並隨機地分派給集群內的worker或其他集群。


     在編寫代碼之前,讓我們先描繪一下核心的路由邏輯,整理出一份簡單而健壯的設計。

     我們需要兩個隊列,一個隊列用於存放從本地集群client收到的請求,另一個存放其他集群發送來的請求。一種方法是從本地和雲端的前端套接字中獲取消 息,分別存入兩個隊列。但是這么做似乎是沒有必要的,因為ZMQ套接字本身就是隊列。所以,我們直接使用ZMQ套接字提供的緩存來作為隊列使用。

     這項技術我們在LRU隊列裝置中使用過,且工作得很好。做法是,當代理下有空閑的worker或能接收請求的其他集群時,才從套接字中獲取請求。我們可以不斷地從后端獲取應答,然后路由回去。如果后端沒有任何響應,那也就沒有必要去接收前端的請求了。

所以,我們的主循環會做以下幾件事:

     * 輪詢后端套接字,會從worker處獲得“已就緒”的消息或是一個應答。如果是應答消息,則將其路由回集群client,或是其他集群。

     * worker應答后即可標記為可用,放入隊列並計數;

     * 如果有可用的worker,就獲取一個請求,該請求可能來自集群內的client,也可能是其他集群。隨后將請求轉發給集群內的worker,或是隨機轉發給其他集群。

這里我們只是隨機地將請求發送給其他集群,而不是在代理中模擬出一個worker,進行集群間的任務分發。這看起來挺愚蠢的,不過目前尚可使用。

我們使用代理的標識來進行代理之前的消息路由。每個代理都有自己的名字,是在命令行中指定的。只要這些指定的名字和ZMQ為client自動生成的UUID不重復,那么我們就可以知道應答是要返回給client,還是返回給另一個集群。

下面是代碼,有趣的部分已在程序中標注:

**peering2: Prototype local and cloud flow in C**

```c
//
// 代理同伴模擬(第二部分)
// 請求-應答消息流原型
//
// 示例程序使用了一個進程,這樣可以讓程序變得簡單,
// 每個線程都有自己的上下文對象,所以可以認為他們是多個進程。
//
#include "czmq.h"
 
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // 消息:worker已就緒
 
// 代理名稱;現實中,這個名稱應該由某種配置完成
static char *self;
 
// 請求-應答客戶端使用REQ套接字
//
static void *client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, "ipc://%s-localfe.ipc", self);
 
    while (1) {
        // 發送請求,接收應答
        zstr_send (client, "HELLO");
        char *reply = zstr_recv (client);
         if (!reply)
            break; // 中斷
        printf ("Client: %s\n", reply);
        free (reply);
        sleep (1);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// worker使用REQ套接字,並進行LRU路由
static void *worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
 
    // 告知代理worker已就緒
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    // 處理消息
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break; // 中斷
 
        zframe_print (zmsg_last (msg), "Worker: ");
        zframe_reset (zmsg_last (msg), "OK", 2);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
 
int main (int argc, char *argv [])
{
    // 第一個參數是代理的名稱
    // 其他參數是同伴代理的名稱
    //
    if (argc < 2) {
        printf ("syntax: peering2 me {you}...\n");
        exit (EXIT_FAILURE);
    }
    self = argv [1];
    printf ("I: 正在准備代理程序 %s...\n", self);
    srandom ((unsigned) time (NULL));
 
    // 准備上下文和套接字
    zctx_t *ctx = zctx_new ();
    char endpoint [256];
 
    // 將cloudfe綁定至端點
    void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsockopt_set_identity (cloudfe, self);
    zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);
 
    // 將cloudbe連接至同伴代理的端點
    void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsockopt_set_identity (cloudfe, self);
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: 正在連接至同伴代理 '%s' 的cloudfe端點\n", peer);
        zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
    }
    // 准備本地前端和后端
    void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
    void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);
 
    // 讓用戶告訴我們何時開始
    printf ("請確認所有代理已經啟動,按任意鍵繼續: ");
    getchar ();
 
    // 啟動本地worker
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (ctx, worker_task, NULL);
 
    // 啟動本地client
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (ctx, client_task, NULL);
 
    // 有趣的部分
    // -------------------------------------------------------------
    // 請求-應答消息流
    // - 若本地有可用worker,則輪詢獲取本地或雲端的請求;
    // - 將請求路由給本地worker或其他集群。
 
    // 可用worker隊列
    int capacity = 0;
    zlist_t *workers = zlist_new ();
 
    while (1) {
        zmq_pollitem_t backends [] = {
            { localbe, 0, ZMQ_POLLIN, 0 },
            { cloudbe, 0, ZMQ_POLLIN, 0 }
        };
        // 如果沒有可用worker,則繼續等待
        int rc = zmq_poll (backends, 2,
            capacity? 1000 * ZMQ_POLL_MSEC: -1);
        if (rc == -1)
            break; // 中斷
 
        // 處理本地worker的應答
        zmsg_t *msg = NULL;
        if (backends [0].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (localbe);
            if (!msg)
                break; // 中斷
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
            capacity++;
 
            // 如果是“已就緒”的信號,則不再進行路由
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
        }
        // 處理來自同伴代理的應答
        else
        if (backends [1].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (cloudbe);
            if (!msg)
                break; // 中斷
            // 我們不需要使用同伴代理的地址
            zframe_t *address = zmsg_unwrap (msg);
            zframe_destroy (&address);
        }
        // 如果應答消息中的地址是同伴代理的,則發送給它
        for (argn = 2; msg && argn < argc; argn++) {
            char *data = (char *) zframe_data (zmsg_first (msg));
            size_t size = zframe_size (zmsg_first (msg));
            if (size == strlen (argv [argn])
            && memcmp (data, argv [argn], size) == 0)
                zmsg_send (&msg, cloudfe);
        }
        // 將應答路由給本地client
        if (msg)
            zmsg_send (&msg, localfe);
 
        // 開始處理客戶端請求
        //
        while (capacity) {
            zmq_pollitem_t frontends [] = {
                { localfe, 0, ZMQ_POLLIN, 0 },
                { cloudfe, 0, ZMQ_POLLIN, 0 }
            };
            rc = zmq_poll (frontends, 2, 0);
            assert (rc >= 0);
            int reroutable = 0;
            // 優先處理同伴代理的請求,避免資源耗盡
            if (frontends [1].revents & ZMQ_POLLIN) {
                msg = zmsg_recv (cloudfe);
                reroutable = 0;
            }
            else
            if (frontends [0].revents & ZMQ_POLLIN) {
                msg = zmsg_recv (localfe);
                reroutable = 1;
            }
            else
                break; // 沒有請求
 
            // 將20%的請求發送給其他集群
            //
            if (reroutable && argc > 2 && randof (5) == 0) {
                // 隨地地路由給同伴代理
                int random_peer = randof (argc - 2) + 2;
                zmsg_pushmem (msg, argv [random_peer], strlen (argv [random_peer]));
                zmsg_send (&msg, cloudbe);
            }
            else {
                zframe_t *frame = (zframe_t *) zlist_pop (workers);
                zmsg_wrap (msg, frame);
                zmsg_send (&msg, localbe);
                capacity--;
            }
        }
    }
    // 程序結束后的清理工作
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return EXIT_SUCCESS;
}
```

在兩個窗口中運行以上代碼:

```
peering2 me you
peering2 you me
```

幾點說明:

     * zmsg類庫讓程序變得簡單多了,這類程序顯然應該成為我們ZMQ程序員必備的工具;
由於我們沒有在程序中實現獲取同伴代理狀態的功能,所以先暫且認為他們都是有空閑worker的。現實中,我們不會將請求發送個一個不存在的同伴代理。

     * 你可以讓這段程序長時間地運行下去,看看會不會出現路由錯誤的消息,因為一旦錯誤,client就會阻塞。你可以試着將一個代理關閉,就能看到代理無法將請求路由給雲端中的其他代理,client逐個阻塞,程序也停止打印跟蹤信息。

#### 組裝

     讓我們將所有這些放到一段代碼里。和之前一樣,我們會在一個進程中完成所有工作。我們會將上文中的兩個示例程序結合起來,編寫出一個可以模擬任意多個集群的程序。

代碼共有270行,非常適合用來模擬一組完整的集群程序,包括client、worker、代理、以及雲端任務分發機制。

**peering3: Full cluster simulation in C**

```c
//
// 同伴代理模擬(第三部分)
// 狀態和任務消息流原型
//
// 示例程序使用了一個進程,這樣可以讓程序變得簡單,
// 每個線程都有自己的上下文對象,所以可以認為他們是多個進程。
//
#include "czmq.h"
 
#define NBR_CLIENTS 10
#define NBR_WORKERS 5
#define LRU_READY "\001" // 消息:worker已就緒
 
// 代理名稱;現實中,這個名稱應該由某種配置完成
static char *self;
 
// 請求-應答客戶端使用REQ套接字
// 為模擬壓力測試,客戶端會一次性發送大量請求
static void *client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx,  ZMQ_REQ);
    zsocket_connect (client, "ipc://%s-localfe.ipc", self);
    void *monitor = zsocket_new (ctx,  ZMQ_PUSH);
    zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);
 
    while (1) {
        sleep (randof (5));
        int burst = randof (15);
        while (burst--) {
            char task_id [5];
            sprintf (task_id, "%04X", randof (0x10000));
 
            // 使用隨機的十六進制ID來標注任務
            zstr_send (client, task_id);
 
            // 最多等待10秒
             zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
                 int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
            if (rc == -1)
                break; // 中斷
 
             if (pollset [0].revents & ZMQ_POLLIN) {
                char *reply = zstr_recv (client);
                if (!reply)
                    break; // 中斷
                // worker的應答中應包含任務ID
                puts (reply);
                assert (streq (reply, task_id));
                free (reply);
            }
            else {
                zstr_sendf (monitor,  "E: 客戶端退出,丟失的任務為: %s", task_id);
                return NULL;
            }
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
// worker使用REQ套接字,並進行LRU路由
static void *worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
 
    // 告知代理worker已就緒
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    while (1) {
        // worker會隨機延遲幾秒
        zmsg_t *msg = zmsg_recv (worker);
        sleep (randof (2));
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
int main (int argc, char *argv [])
{
    // 第一個參數是代理的名稱
    // 其他參數是同伴代理的名稱
    if (argc < 2) {
        printf ("syntax: peering3 me {you}...\n");
        exit (EXIT_FAILURE);
    }
    self = argv [1];
    printf ("I: 正在准備代理程序 %s...\n", self);
    srandom ((unsigned) time (NULL));
 
    // 准備上下文和套接字
    zctx_t *ctx = zctx_new ();
    char endpoint [256];
 
    // 將cloudfe綁定至端點
    void *cloudfe = zsocket_new (ctx,  ZMQ_ROUTER);
    zsockopt_set_identity (cloudfe, self);
    zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);
 
    // 將statebe綁定至端點
    void *statebe = zsocket_new (ctx,  ZMQ_PUB);
    zsocket_bind (statebe, "ipc://%s-state.ipc", self);
 
     // 將cloudbe連接至同伴代理的端點
    void *cloudbe = zsocket_new (ctx,  ZMQ_ROUTER);
    zsockopt_set_identity (cloudbe, self);
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: 正在連接至同伴代理 '%s' 的cloudfe端點\n", peer);
        zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
    }
 
     // 將statefe連接至同伴代理的端點
    void *statefe = zsocket_new (ctx,  ZMQ_SUB);
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: 正在連接至同伴代理 '%s' 的statebe端點\n", peer);
        zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
    }
    // 准備本地前端和后端
    void *localfe = zsocket_new (ctx,  ZMQ_ROUTER);
    zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
 
    void *localbe = zsocket_new (ctx,  ZMQ_ROUTER);
    zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);
 
    // 准備監控套接字
    void *monitor = zsocket_new (ctx,  ZMQ_PULL);
    zsocket_bind (monitor, "ipc://%s-monitor.ipc", self);
 
    // 啟動本地worker
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
         zthread_new (ctx,  worker_task, NULL);
 
    // 啟動本地client
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
         zthread_new (ctx,  client_task, NULL);
 
    // 有趣的部分
    // -------------------------------------------------------------
    // 發布-訂閱消息流
    // - 輪詢同伴代理的狀態信息;
    // - 當自身狀態改變時,對外廣播消息。
    // 請求-應答消息流
    // - 若本地有可用worker,則輪詢獲取本地或雲端的請求;
    // - 將請求路由給本地worker或其他集群。
 
    // 可用worker隊列
    int local_capacity = 0;
    int cloud_capacity = 0;
    zlist_t *workers = zlist_new ();
 
    while (1) {
         zmq_pollitem_t primary [] = {
            { localbe, 0, ZMQ_POLLIN, 0 },
            { cloudbe, 0, ZMQ_POLLIN, 0 },
            { statefe, 0, ZMQ_POLLIN, 0 },
            { monitor, 0, ZMQ_POLLIN, 0 }
          };
        // 如果沒有可用的worker,則一直等待
        int rc = zmq_poll (primary, 4,
            local_capacity? 1000 * ZMQ_POLL_MSEC: -1);
        if (rc == -1)
            break; // 中斷
 
        // 跟蹤自身狀態信息是否改變
        int previous = local_capacity;
 
        // 處理本地worker的應答
        zmsg_t *msg = NULL;
 
         if (primary [0].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (localbe);
            if (!msg)
                break; // 中斷
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
            local_capacity++;
 
            // 如果是“已就緒”的信號,則不再進行路由
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
        }
        // 處理來自同伴代理的應答
        else
         if (primary [1].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (cloudbe);
            if (!msg)
                break; // Interrupted
            // 我們不需要使用同伴代理的地址
            zframe_t *address = zmsg_unwrap (msg);
            zframe_destroy (&address);
        }
        // 如果應答消息中的地址是同伴代理的,則發送給它
         for (argn = 2; msg && argn < argc; argn++) {
            char *data = (char *) zframe_data (zmsg_first (msg));
            size_t size = zframe_size (zmsg_first (msg));
            if (size == strlen (argv [argn])
            && memcmp (data, argv [argn], size) == 0)
                zmsg_send (&msg, cloudfe);
        }
        // 將應答路由給本地client
        if (msg)
            zmsg_send (&msg, localfe);
 
        // 處理同伴代理的狀態更新
        if (primary [2].revents & ZMQ_POLLIN) {
            char *status = zstr_recv (statefe);
            cloud_capacity = atoi (status);
            free (status);
        }
        // 處理監控消息
        if (primary [3].revents & ZMQ_POLLIN) {
            char *status = zstr_recv (monitor);
            printf ("%s\n", status);
            free (status);
        }
 
        // 開始處理客戶端請求
        // - 如果本地有空閑worker,則總本地client和雲端接收請求;
        // - 如果我們只有空閑的同伴代理,則只輪詢本地client的請求;
        // - 將請求路由給本地worker,或者同伴代理。
        while (local_capacity + cloud_capacity) {
            zmq_pollitem_t secondary [] = {
                { localfe, 0, ZMQ_POLLIN, 0 },
                { cloudfe, 0, ZMQ_POLLIN, 0 }
            };
            if (local_capacity)
                rc = zmq_poll (secondary, 2, 0);
            else
                rc = zmq_poll (secondary, 1, 0);
            assert (rc >= 0);
 
            if (secondary [0].revents & ZMQ_POLLIN)
                msg = zmsg_recv (localfe);
            else
            if (secondary [1].revents & ZMQ_POLLIN)
                msg = zmsg_recv (cloudfe);
            else
                break; // 沒有任務
 
            if (local_capacity) {
                zframe_t *frame = (zframe_t *) zlist_pop (workers);
                zmsg_wrap (msg, frame);
                zmsg_send (&msg, localbe);
                local_capacity--;
            }
            else {
                // 隨機路由給同伴代理
                int random_peer = randof (argc - 2) + 2;
                zmsg_pushmem (msg, argv [random_peer], strlen (argv [random_peer]));
                zmsg_send (&msg, cloudbe);
            }
        }
        if (local_capacity != previous) {
            // 將自身代理的地址附加到消息中
            zstr_sendm (statebe, self);
            // 廣播新的狀態信息
            zstr_sendf (statebe, "%d", local_capacity);
        }
    }
    // 程序結束后的清理工作
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return EXIT_SUCCESS;
}
```

這段代碼並不長,但花費了大約一天的時間去調通。以下是一些說明:

* client線程會檢測並報告失敗的請求,它們會輪詢代理套接字,查看是否有應答,超時時間為10秒。

* client線程不會自己打印信息,而是將消息PUSH給一個監控線程,由它打印消息。這是我們第一次使用ZMQ進行監控和記錄日志,我們以后會見得更多。

* clinet會模擬多種負載情況,讓集群在不同的壓力下工作,因此請求可能會在本地處理,也有可能會發送至雲端。集群中的client和worker數量、其他集群的數量,以及延遲時間,會左右這個結果。你可以設置不同的參數來測試它們。

* 主循環中有兩組輪詢集合,事實上我們可以使用三個:信息流、后端、前端。因為在前面的例子中,如果后端沒有空閑的worker,就沒有必要輪詢前端請求了。

以下是幾個在編寫過程中遇到的問題:

* 如果請求或應答在某處丟失,client會因此阻塞。回憶以下,ROUTER-ROUTER套接字會在消息如法路由的情況下直接丟棄。這里的一個策略就是 改變client線程,檢測並報告這種錯誤。此外,我還在每次recv()之后以及send()之前使用zmsg_dump()來打印套接字內容,用來更 快地定位消息。

* 主循環會錯誤地從多個已就緒的套接字中獲取消息,造成第一條消息的丟失。解決方法是只從第一個已就緒的套接字中獲取消息。

* zmsg類庫沒有很好地將UUID編碼為C語言字符串,導致包含字節0的UUID會崩潰。解決方法是將UUID轉換成可打印的十六進制字符串。

這段模擬程序沒有檢測同伴代理是否存在。如果你開啟了 某個代理,它已向其他代理發送過狀態信息,然后關閉了,那其他代理仍會向它發送請求。這樣一來,其他代理的client就會報告很多錯誤。解決時有兩點: 一、為狀態信息設置有效期,當同伴代理消失一段時間后就不再發送請求;二、提高請求-應答的可靠性,這在下一章中會講到。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM