ZeroMQ 教程 002 : 高級技巧


本文主要譯自 zguide - chapter two. 但並不是照本翻譯.

上一章我們簡單的介紹了一個ZMQ, 並給出了三個套路的例子: 請求-回應, 訂閱-發布, 流水線(分治). 這一章, 我們將深入的探索一下ZMQ中的socket, 以及"套路"

socket API

如果熟悉linux socket編程的同學閱讀完了第一章, 一定有一種說不上來的別扭感覺.因為通常情況下, 當我們討論socket的時候, 我們一般指的是操作系統提供的網絡編程接口里的那個socket概念. 而在ZMQ中, 只是借用了這個概念的名字, 在ZMQ中, 我們討論到socket的時候, 一般指代的是調用zmq_socket()接口返回的那個socket, 具體一點: zmq socket.

zmq socket比起linux socket來說, 邏輯理解起來比較類似, 雖然兩者內部完全就不是同一種東西.

  1. socket需要被創建, 以及關閉. zmq_socket(), zmq_close()
  2. socket有配置項. zmq_setsockopt(), zmq_getsockopt()
  3. socket有綁定和連接兩種操作. zmq_bind(), zmq_connect()
  4. 收發socket上的數據. zmq_msg_send(), zmq_msg_recv(), zmq_send(), zmq_recv()

但與linux socket不同的是, zmq socket沒有listen這個邏輯概念.

需要注意的是, zmq socket是void指針, 而消息則是結構實例. 這就意味着, 在C語言的API中, 需要zmq socket的地方, 傳遞的一定是值, 而需要傳遞消息的時候, 比如使用zmq_msg_send()zmq_msg_recv()這樣的接口, 消息參數則傳遞其地址. 其設計哲學是: 在zmq中, socket不歸程序員掌控, 所以你可能拿到一個句柄(地址), 但不能看到它長什么樣(不能看到socket實例), 但消息是程序員創建的, 是受程序員掌控的.

將socket接入網絡拓撲中

在兩個結點上用ZMQ實現通訊, 你需要分別為兩個結點創建socket, 並在其中一個結點上調用zmq_bind(), 在另一個結點上創建對應的zmq_connect(). 在ZMQ中, 請不要再以死板的"客戶端", "服務端"來區分網絡結點. 而要這樣理解: zmq_bind()調用應該發生在網絡拓撲中那些不易變的結點上, 而zmq_connect()應該發生在網絡拓撲中那些易變的結點上.

ZMQ建立起的數據連接和常見的TCP連接有一些不同, 但也有一些共通之處, 如下:

  1. TCP是TCP/IP協議棧的四層協議, 當建立一個TCP連接的時候, 雙方都必須使用TCP/IP協議棧. 從這個角度看, ZMQ是四層之上的4.5層, ZMQ下面統一了很多連接協議, 對於TCP/IP協議棧來說, ZMQ下面有TCP, 除了TCP/IP, ZMQ還能通過共享內存在線程間建立連接, 在進程間建立連接(具體連接手段對上層是透明的), 使用TCP/IP協議棧的PGM協議(建立在IP層上的一種多播協議)建立連接.
  2. 在linux socket中, 一個連接就是一個socket, 但在ZMQ中, 一個socket上可以承載多個數據連接. 這里socket和connection不再是同個層次上的等價詞匯, 要把socket理解為程序員訪問數據連接的一個入口, 一個大門, 門推開, 可能有多個連接, 而不止一個. 有多個數據流等待吞吐.
  3. 上面說了, 用ZMQ在結點間建立連接, 程序員操作ZMQ相關API的時候, 實際上位於的是類似於TCP/IP里的第4.5層, 反過來看, 即具體的連接是如何建立, 如何保持, 如何維護的, 這ZMQ庫的工作, 不應該由使用ZMQ庫的人去關心. 也就是說, 自從使用了ZMQ庫, 你再也不需要關心TCP是如何握手了. 並且, 對於合適的協議, 對端結點上線下線時, ZMQ庫將負責優雅的處理接連中斷, 重試連接等臟活.
  4. 再次重申, 如何連接, 是ZMQ庫的工作, 你不應該插手. 你只需要關心數據, 套路, 拓撲.

在請求-回應套路中, 我們把比較不易變的邏輯結點稱為服務端, 把易變, 也就是會經常性的退出, 或重新加入網絡拓撲的結點稱為客戶端. 服務端向外提供服務, 必須提供一個"地址"供客戶端去上門, 換句話說, 在這個套路拓撲中, 那些經常來來去去的客戶端應該知道去哪找服務端. 但反過來, 服務端完全不關心去哪找客戶端, 你愛來不來, 不來就滾, 不要打擾我飛升. 對於不易變的結點, 應該使用zmq_bind()函數, 對於易變的結點, 應該采用zmq_connect

在傳統的linux socket編程中, 如果服務端還沒有上線工作, 這個時候去啟動客戶端程序, 客戶端程序的connect()調用會返回錯誤. 但在ZMQ中, 它妥善處理了這種情況. 客戶端調用zmq_connect(), 不會報錯, 僅會導致消息被阻塞而發不出去.

不要小看這一點設計, 它反映出ZMQ的設計思想: 在請求-應答套路中, 它不光允許客戶端可以隨時退出, 再回來. 甚至允許服務端去上個廁所.

另外, 一個服務端可以多次調用zmq_bind()以將自己關聯到多個endpoint上.(所謂的endpoint, 就是通訊協議+通訊地址的組合, 它一般情況下指代了在這種通訊協議中的一個網絡結點, 但這個結點可以是邏輯性的, 不一定只是一台機器).這就意味着, zmq socket可以同時接受來自多個不同通訊協議的多簇請求消息.

zmq_bind(socket, "tcp://*:5555");
zmq_bind(socket, "tcp://*:999");
zmq_bind(socket, "inproc://suprise_motherfucker");

但是, 對於同一種通訊協議里的同一個endpoint, 你只能對其執行一次zmq_bind()操作. 這里有個例外, 就是ipc進程間通信. 邏輯上允許另外一個進程去使用之前一個進程已經使用過的ipc endpoint, 但不要濫用這特性: 這只是ZMQ提供給程序崩潰后恢復現場的一種手段, 在正常的代碼邏輯中, 不要做這樣的事情.

所以看到這里你大概能理解zmq對bind和connect這兩個概念的態度: ZMQ努力的將這兩個概念之間的差異抹平, 但很遺憾, zmq並沒有將這兩個操作抽象成一個類似於touch的操作. 但還是請謹記, 在你的網絡拓撲中, 讓不易變結點去使用zmq_bind(), 讓易變結點去使用zmq_connect

zmq socket是分類型的, 不同類型的socket提供了差異化的服務, socket的類型與結點在拓撲中的角色有關, 也影響着消息的出入, 以及緩存策略. 不同類型的socket之間, 有些可以互相連接, 但有些並不能, 這些規則, 以及如何在套路中為各個結點安排合適類型的socket, 都是后續我們將要講到的內容.

如果從網絡通訊的角度來講, zmq是一個將傳統傳輸層封裝起來的網絡庫. 但從數據傳輸, 消息傳輸, 以及消息緩存這個角度來講, zmq似乎又稱得上是一個消息隊列庫. 總之, zmq是一個優秀的庫, 優秀不是指它的實現, 它的性能, 而是它能解決的問題, 它的設計思路.

收發消息

在第一章里, 我們接觸到了兩個有關消息收發的函數, zmq_send()zmq_recv(), 現在, 我們需要把術語規范一下.

zmq_send()zmq_recv()是用來傳輸"數據"的接口. 而"消息"這個術語, 在zmq中有指定含義, 傳遞消息的接口是zmq_msg_send()zmq_msg_recv()

當我們說起"數據"的時候, 我們指的是二進制串. 當我們說"消息"的時候, 指提是zmq中的一種特定結構體.

需要額外注意的是, 無論是調用zmq_send()還是zmq_msg_send(), 當調用返回時, 消息並沒有真正被發送出去, 更沒有被對方收到. 調用返回只代表zmq將你要發送的"消息"或"數據"放進了一個叫"發送緩沖區"的地方. 這是zmq實現收發異步且帶緩沖隊列的一個設計.

單播傳輸

ZMQ底層封裝了三種單播通訊協議, 分別是: 共享內存實現的線程間通訊(inproc), 進程間通信(ipc), 以及TCP/IP協議棧里的TCP協議(tcp). 另外ZMQ底層還封裝了兩種廣播協議: PGM, EPGM. 多播我們在非常后面的章節才會介紹到, 在你了解它之前, 請不要使用多播協議, 即便你是在做一些類似於發布-訂閱套路的東西.

對於多數場景來說, 底層協議選用tcp都是沒什么問題的. 需要注意的是, zmq中的tcp, 被稱為 "無連接的tcp協議", 而之所以起這么一個精神分裂的名字, 是因為zmq允許在對端不存在的情況下, 結點去zmq_connect(). 你大致可以想象zmq做了多少額外工作, 但這些對於你來說, 對於上層應用程序來說, 是透明了, 你不必去關心具體實現.

IPC通訊類似於tcp, 也是"無連接"的, 目前, 這種方式不能在windows上使用, 很遺憾. 並且, 按照慣例, 在使用ipc作為通訊方式時, 我們一般給endpoint加上一個.ipc的后綴. 另外, 在Unix操作系統上, 使用ipc連接還請格外的注意不同進程的權限問題, 特別是從屬於兩個不同用戶的進程.

最后來說一下inproc, 也就是線程間通信, 它只能用於同一進程內的不同線程通訊. 比起tcp和ipc, 這種通訊方式快的飛起. 它與tcp和ipc最大的區別是: 在有客戶端調用connect之前, 必須確保已經有一個服務端在對應的endpoint上調用了bind, 這個缺陷可能會在未來的某個版本被修正, 但就目前來講, 請務必小心注意.

ZMQ對底層封裝的通訊協議是有侵入性的

很遺憾的是, ZMQ對於其底層封裝的網絡協議是有侵入性的, 換句話說, 你沒法使用ZMQ去實現一個HTTP服務器. HTTP作為一個五層協議, 使用TCP作為傳輸層協議, 對TCP里的報文格式是有規約限制的, 而ZMQ作為一個封裝了TCP的4.5層協議, 其在數據交互時, 已經侵入了TCP的報文格式. 你無法讓TCP里的報文既滿足HTTP的格式要求, 還滿足ZMQ的格式要求.

關心ZMQ到底是如何侵入它封裝的通訊協議的, 這個在第三章, 當我們接觸到ZMQ_ROUTER_RAW這種socket配置項的時候才會深入討論, 目前你只需要明白, ZMQ對其底層封裝的通訊協議有侵入.

這意味着, 你無法無損的將ZMQ引入到一些現成的項目中. 這很遺憾.

I/O線程

我們先前提到過, ZMQ在后台使用獨立的線程來實現異步I/O處理. 一般情況下吧, 一個I/O線程就應該足以處理當前進程的所有socket的I/O作業, 但是這個凡事總有個極限情況, 所以總會存在一些很荀的場景, 你需要多開幾個I/O線程.

當你創建一個context的時候, ZMQ就在背后創建了一個I/O處理線程. 如果這么一個I/O線程不能滿足你的需求, 那么就需要在創建context的時候加一些料, 讓ZMQ多創建幾個I/O處理線程. 一般有一個簡單估算I/O線程數量的方法: 每秒你的程序有幾個G字節的吞吐量, 你就開幾個I/O線程.

下面是自定義I/O線程數量的方法:

int io_threads = 4;
void * context = zmq_ctx_new();
zmq_ctx_set(context, ZMQ_IO_THREADS, io_threads);
assert(zmq_ctx_get(context, ZMQ_IO_THREADS) == io_threads);

回想一下你用linux socket + epoll編寫服務端應用程序的套路, 一般都是一個tcp連接專門開一個線程. ZMQ不一樣, ZMQ允許你在一個進程里持有上千個連接(不一定是TCP哦), 但處理這上千個連接的I/O作業, 可能只有一個, 或者幾個線程而已, 並且事實也證明這樣做是可行的. 可能你的進程里只有十幾個線程, 但就是能處理超過上千個連接.

當你的程序只使用inproc作為通訊手段的時候, 其實是不需要線程來處理異步I/O的, 因為inproc是通過共享內存實現通訊的. 這個時候你可以手動設置I/O線程的數量為0. 這是一個小小的優化手段, 嗯, 對性能的提升基本為0.

套路, 套路, 套路

ZMQ的設計是親套路的, ZMQ的核心其實在於路由與緩存, 這也是為什么作為一個網絡庫, 它更多的被人從消息隊列這個角度了解到的原因. 要用ZMQ實現套路, 關鍵在於使用正確的socket類型, 然后把拓撲中的socket組裝配對起來. 所以, 要懂套路, 就需要懂zmq里的socket類型.

zmq提供了你構建如下套路的手段:

  1. 請求-應答套路. 多對多的客戶端-服務端模型. 用於遠程調用以及任務分發場景.
  2. 發布-訂閱套路. 多對多的喇叭-村民模型. 用於數據分發場景.
  3. 流水線套路. 用於並行作業處理場景.
  4. 一夫一妻套路. 一對一的連接模型. 這一般用於在進程中兩個線程進行通訊時使用.

我們在第一章中已經大致接觸了套路, 除了一夫一妻沒有接觸到, 這章稍后些部分我們也將接觸這種套路.要了解具體socket的各個類型都是干嘛用的, 可以去閱讀zmq_socket()的manpage, 我建議你去閱讀, 並且仔細閱讀, 反復閱讀.下面列出的是可以互相組合的socket類型. 雙方可以替換bindconnect操作.

  1. PUB SUB. 經典的發布-訂閱套路
  2. REQ REP. 經典的請求-應答套路
  3. REQ ROUTER (注意, REQ發出的數據中, 以一個空幀來區分消息頭與消息體)
  4. DEALER REP(注意, REP假定收到的數據中, 有一個空幀用以區分消息頭與消息體)
  5. DEALER ROUTER
  6. DEALER DEALER
  7. ROUTER ROUTER
  8. PUSH PULL. 經典的流水線套路.
  9. PAIR PAIR. 一夫一妻套路

后續你還會看到有XPUB與XSUB兩種類型的socket. 就目前來說, 只有上面的socket配對連接是有效的, 其它沒列出的組合的行為是未定義的, 但就目前的版本來說, 錯誤的組合socket類型並不會導致連接時出錯, 甚至可能會碰巧按你的預期運行, 但強烈不建議你這個瞎jb搞. 在未來的版本中, 組合非法的socket類型可能會導致API調用出錯.

消息, 消息, 消息

libzmq有兩套收發消息的API接口, 這個之前我們已經講過. 並且在第一章里建議你多使用zmq_send()zmq_recv(), 建議你規避zmq_msg_send()zmq_msg_recv(). 但zmq_recv有一個缺陷, 就是當你提供給zmq_recv()接口的接收buffer不夠長時, zmq_recv()會把數據截斷. 如果你無法預測你要收到的二進制數據的長度, 那么你只能使用zmq_msg_xxx()接口.

從接口名上的msg三個字母就能看出, 這個系列的接口是操縱結構體, 也就是"消息"(其實是幀, 后面會講到), 而不是"數據", 而非緩沖區的接口, 實際上它們操縱的是zmq_msg_t類型的結構. 這個系列的接口功能更為豐富, 但使用起來也請務必萬分小心.

  1. 初始化消息相關的接口: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data()
  2. 消息收發接口: zmq_msg_send(), zmq_msg_recv()
  3. 消息釋放接口: zmq_close()
  4. 訪問消息內容的接口: zmq_msg_data(), zmq_msg_size(), zmq_msg_more()
  5. 訪問消息配置項的接口: zmq_msg_get(), zmq_msg_set()
  6. 復制拷貝操作接口: zmq_msg_copy(), zmq_msg_move()

消息結構中封裝的數據是二進制的, 依然由程序員自己解釋. 關於zmq_msg_t結構類型, 下面是你需要知道的基礎知識:

  1. 去閱讀上面的消息相關接口API的manpage, 你會發現傳遞參數都是以zmq_msg_t *. 也就是說這是一個內部實現不對外開放的類型, 創建, 傳遞, 都應當以指針類型進行操作.
  2. 要從socket中接收一個消息, 你需要先通過zmq_msg_init()創建一個消息對象, 然后將這個消息對象傳遞給zmq_msg_recv()接口
  3. 要向socket中寫入消息, 你需要先通過zmq_msg_init_size()創建一個數據容量指定的消息對象, 然后把你要寫入的二進制數據通過內存拷貝函數, 比如memcpy()寫入消息中, 最后調用zmq_msg_send(), 看到這里你應該明白, zmq_msg_init_size()接口內部進行了內存分配.
  4. 消息的"釋放"和"銷毀"是兩個不同的概念. zmq_msg_t其實是引用計數方式實現的共享對象類型, "釋放"是指當前上下文放棄了對該消息的引用, 內部導致了實例的引用計數-1, 而"銷毀"則是徹底把實例本身給free掉了. 當你"釋放"一個消息的時候, 應當調用zmq_msg_close()接口. 如果消息實例在釋放后引用計數歸0, 那么這個消息實例會被ZMQ自動銷毀掉.
  5. 要訪問消息里包裝的數據, 調用zmq_msg_data()接口, 要獲取消息中數據的長度, 調用zmq_msg_size()
  6. 在你熟讀並理解相關manpage中的內容之前, 不要去調用zmq_msg_move(), zmq_msg_copy(), zmq_msg_init_data()這三個接口
  7. 當你通過zmq_msg_send()調用將消息發送給socket后, 這個消息內部包裝的數據會被清零, 也就是zmq_msg_size() == 0, 所以, 你不應該連續兩次使用同一個zmq_msg_t *值調用zmq_msg_send(). 但需要注意的是, 這里的"清零", 並不代表消息被"釋放", 也不代表消息被"銷毀". 消息還是消息, 只是其中的數據被扔掉了.

如果你想把同一段二進制數據發送多次, 正確的做法是下面這樣:

  1. 調用zmq_msg_init_size(), 創建第一個消息, 再通過memcpy或類似函數將二進制數據寫入消息中
  2. 調用zmq_msg_init()創建第二個消息, 再調用zmq_msg_copy()從第一個消息將數據"復制"過來
  3. 重復上述步驟
  4. 依次調用zmq_msg_send()發送上面的多個消息

ZMQ還支持所謂的"多幀消息", 這種消息允許你把多段二進制數據一次性發送給對端. 這個特性在第三章我們再講. (P.S.: 這是一個很重要的特性, 路由代理等高級套路就嚴重依賴這種多幀消息.). ZMQ中的消息有三層邏輯概念: 消息, 幀, 二進制數據. 用戶自定義的二進制數據被包裝成幀, 然后一個或多個幀組成一個消息. 消息是ZMQ拓撲網絡中兩個結點收發的單位, 但在ZMQ底層的傳輸協議中, 最小單位是幀.

換一個角度來講, ZMQ使用其底層的傳輸協議, 比如tcp, 比如inproc, 比如ipc來傳輸數據, 當ZMQ調用這些傳輸協議傳遞數據的時候, 最小單元是幀. 幀的完整性由傳輸協議來保證, 即是ZMQ本身不關心這個幀會不會破損, 幀的完整傳輸應當由這些傳輸協議去保證. 而在使用ZMQ構建應用程序的程序員眼中, 最小的傳輸單位是消息, 一個消息里可能會有多個幀, 程序員不去關心消息從一端到另一端是否會出現丟幀, 消息的完整性與原子性應當由ZMQ庫去保證.

前面我們講過, ZMQ對其底層的傳輸協議是有侵入性的. 如果要了解ZMQ到底是如何在傳輸協議的基礎上規定幀傳輸格式的, 可以去閱讀這個規范.

在我們到達第三章之前, 我們所討論的消息中都僅包含一個幀. 這就是為什么在這一小節的描述中, 我們幾乎有引導性的讓你覺得, zmq_msg_t類型, 就是"消息", 其實不是, 其實zmq_msg_t消息只是"幀".

  1. 一個消息可以由多個幀組成
  2. 每個幀都是一個zmq_msg_t對象
  3. 使用zmq_msg_send(), zmq_msg_recv(), 你可以一幀一幀的發送數據. 可以用多次調用這些接口的方式來發送一個完整的消息, 或者接收一個完整的消息: 在發送時傳入ZMQ_SNDMORE參數, 或在接收時, 通過zmq_getsockopt()來獲取ZMQ_RCVMORE選項的值. 更多關於如何使用低級API收發多幀消息的信息, 請參見相關接口的manpage
  4. ZMQ也提供了便於收發多幀消息的高級API

關於消息或幀, 還有下面的一些特性:

  1. ZMQ允許你發送數據長度為0的幀. 比如在有些場合, 這只是一個信號, 而沒有任何語義上的數據需要被攜帶
  2. ZMQ在發送多幀消息時, 保證消息的原子性與完整性. 如果丟失, 所有幀都不會到達對端, 如果成功, 那么必須所有幀都被正確送達, 幀在傳輸過程中不會出現破損.
  3. 在調用發送數據的癌后, 消息並不會被立即發出, 而是被放在發送緩沖區中. 這和zmq_send()是一致的.
  4. 你必須在完成消息接收后, 調用zmq_msg_close()接口來釋放這個zmq_msg_t對象

最后再強調一下, 在你不理解zmq_msg_t的原理之前, 不要使用zmq_msg_init_data()接口, 這是一個0拷貝接口, 如果不熟悉zmq_msg_t結構的原理, 瞎jb用, 是會core dump的

ZMQ中的多路I/O復用

在先前的所有例子程序中, 大多程序里干的都是這樣的事情

  1. 等待socket上有數據
  2. 接收數據, 處理
  3. 重復上面的過程

如果你接觸過linux中的select, pselect, epoll等多路IO復用接口, 你一定會好奇, 在使用zmq的時候, 如何實現類似的效果呢? 畢竟ZMQ不光把linux socket的細節給你封裝了, 連文件描述符都給你屏蔽封裝掉了, 顯然你沒法直接調用類似於select, pselect, epoll這種接口了.

答案是, ZMQ自己搞了一個類似的玩意, zmq_poll()了解一下.

我們先看一下, 如果沒有多路IO接口, 如果我們要從兩個socket上接收數據, 我們會怎樣做. 下面是一個沒什么卵用的示例程序, 它試圖從兩個socket上讀取數據, 使用了異步I/O. (如果你有印象的話, 應該記得對應的兩個endpoint實際上是我們在第一章寫的兩個示例程序的數據生產方: 天氣預報程序與村口的大喇叭)

#include <zmq.h>
#include <stdio.h>

int main(void)
{
    void * context = zmq_ctx_new();
    void * receiver = zmq_socket(context, ZMQ_PULL);
    zmq_connect(receiver, "tcp://localhost:5557");

    void * subscriber = zmq_socket(context, ZMQ_SUB);
    zmq_connect(subscriber, "tcp://localhost:5556");
    zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    while(1)
    {
        char msg[256];
        while(1)
        {
            int size = zmq_recv(receiver, msg, 255, ZMQ_DONTWAIT);
            if(size != -1)
            {
                // 接收數據成功
            }
            else
            {
                break;
            }
        }

        while(1)
        {
            int size = zmq_recv(subscriber, msg, 255, ZMQ_DONTWAIT);
            if(size == -1)
            {
                // 接收數據成功
            }
            else
            {
                break;
            }
        }

        sleep(1);   // 休息一下, 避免瘋狂循環
    }

    zmq_close(receiver);
    zmq_close(subscriber);
    zmq_ctx_destroy(context);

    return 0;
}

在沒有多路IO手段之前, 這基本上就是你能做到的最好情形了. 大循環里的sleep()讓人渾身難受. 不加sleep()吧, 在沒有數據的時候, 這個無限空循環能把一個核心的cpu占滿. 加上sleep()吧, 收包又會有最壞情況下1秒的延時.

但有了zmq_poll()接口就不一樣了, 代碼就會變成這樣:

#include <zmq.h>
#include <stdio.h>

int main(void)
{
    void * context = zmq_ctx_new();
    void * receiver = zmq_socket(context, ZMQ_PULL);
    zmq_connect(receiver, "tcp://localhost:5557");

    void * subscriber = zmq_socket(context, ZMQ_SUB);
    zmq_connect(subscriber, "tcp://localhost:5556");
    zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    while(1)
    {
        char msg[256];
        zmq_pollitem_t items[] = {
            {receiver,  0,  ZMQ_POLLIN,     0},
            {subscriber,0,  ZMQ_POLLIN,     0},
        };

        zmq_poll(items, 2, -1);

        if(items[0].revents & ZMQ_POLLIN)
        {
            int size = zmq_recv(receiver, msg, 255, 0);
            if(size != -1)
            {
                // 接收消息成功
            }
        }

        if(items[1].revents & ZMQ_POLLIN)
        {
            int size = zmq_recv(subscriber, msg, 255, 0);
            if(size != -1)
            {
                // 接收消息成功
            }
        }
    }

    zmq_close(receiver);
    zmq_close(subscriber);
    zmq_ctx_destroy(context);

    return 0;
}

zmq_pollitem_t類型定義如下, 這個定義可以從zmq_poll()的manpage里查到

typedef struct{
    void * socket;  // ZMQ的socket
    int fd;         // 是的, zmq_poll()還可以用來讀寫linux file descriptor
    short events;   // 要被監聽的事件, 基礎事件有 ZMQ_POLLIN 和 ZMQ_POLLOUT, 分別是可讀可寫
    short revents;  // 從zmq_poll()調用返回后, 這里存儲着觸發返回的事件
} zmq_pollitem_t;

多幀消息的收發

我們之前提到過, 用戶數據被包裝成zmq_msg_t對象, 也就是幀, 而在幀上, 還有一個邏輯概念叫"消息". 那么在具體編碼中, 如何發送多幀消息呢? 而又如何接收多幀消息呢? 簡單的講, 兩點:

  1. 在發送時, 向zmq_msg_send()傳入ZMQ_SNDMORE選項, 告訴發送接口, "我后面還有其它幀"
  2. 在接收消息時, 每調用一次zmq_msg_recv()接收一個幀, 就調用一次zmq_msg_more()或者zmq_getsockopt() + ZMQ_RCVMORE來判斷是否這是消息的最后一個幀

發送示例:

zmq_msg_send(&msg, socket, ZMQ_SNDMORE);
zmq_msg_send(&msg, socket, ZMQ_SNDMORE);
zmq_msg_send(&msg, socket, 0);          // 消息的最后一個幀

接收示例:

while(1)
{
    zmq_msg_t msg;
    zmq_msg_init(&msg);
    zmq_msg_recv(&msg, socket, 0);
    // 做處理
    zmq_msg_close(&msg);

    if(!zmq_msg_more(&msg)) // 注意, zmq_msg_more可以在zmq_msg_close后被安全的調用
    {
        break;
    }
}

這里有一個需要注意的有趣小細節: 要判斷一個收來的幀是不是消息的最后一個幀, 有兩種途徑, 一種是zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size), 另外一種是zmq_msg_more(&msg). 前一種途徑的入參是socket, 后一種途徑的入參是msg. 這真是很因缺思汀. 目前來說, 兩種方法都可以, 不過我建議你使用zmq_getsockopt(), 至於原因嘛, 因為在zmq_msg_recv()的manpage中, 是這樣建議的.

關於多幀消息, 你需要注意以下幾點:

  1. 多幀消息的傳輸是原子性的, 這是由ZMQ保證的
  2. 原子性也意味着, 當你使用zmq_poll()時, 當socket可讀, 並且用zmq_msg_recv()讀出一個幀時, 代表着不用等待下一次循環, 你直接繼續讀取, 一定能讀取能整個消息中剩余的其它所有幀
  3. 當一個多幀消息開始被接收時, 無論你是否通過zmq_msg_more()zmq_getsockopt() + ZMQ_RCVMORE檢查消息是否接收完整, 你一幀幀的收, 也會把整個消息里的所有幀收集齊. 所以從這個角度看, zmq_msg_more()可以在把所有可讀的幀從socket里統一接收到手之后, 再慢慢判斷這些幀應該怎么拼裝. 所以這樣看, 它和zmq_getsockopt()的功能也不算是完全重復.
  4. 當一個多幀消息正在發送時, 除了把socket關掉(暴力的), 否則你不能取消本次發送, 本次發送將持續至所有幀都被發出.

中介與代理

ZMQ的目標是建立去中心化的消息通信網絡拓撲. 但不要誤解"去中心"這三個字, 這並不意味着你的網絡拓撲在中心圈內空無一物. 實際上, 用ZMQ搭建的網絡拓撲中常常充滿了各種非業務處理的網絡結點, 我們把這些感知消息, 傳遞消息, 分發消息, 但不實際處理消息的結點稱為"中介", 在ZMQ構建的網絡中, 它們按應用場景有多個細化的名字, 比如"代理", "中繼", "裝置", "掮客"等.

這套邏輯在現實世界里也很常見, 中間人, 中介公司, 它們不實際生產社會價值, 表面上看它們的存在是在吸兩頭的血, 這些皮條客在社會中的存在意義在於: 它們減少了溝通的復雜度, 對通信雙方進行了封裝, 提高了社會運行效率.

在發布-訂閱套路中加入中介: XPUB與XSUB

當構建一個稍有規模的頒式系統的時候, 一個避不開的問題就是, 網絡中的結點是如何感知其它結點的存在的? 結點會當機, 會擴容, 在這些變化發生的時候, 網絡中的其它正在工作的結點如何感知這些變化, 並保持系統整體正常運行呢? 這就是經典的"動態探索問題".

動態探索問題有一系列很經典的解決方案, 最簡單的解決方案就是把問題本身解決掉: 把網絡拓撲設計死, 代碼都寫死, 別讓它瞎jb來回變, 問題消滅了, done!. 這種解決方案的缺點就是如果網絡拓撲要有變更, 比如業務規模擴展了, 或者有個結點當機了, 網絡配置管理員會罵娘.

拓撲規模小的時候, 消滅問題的思路沒什么壞處, 但拓撲稍微復雜一點, 顯然這就是一個很可笑的解決方案.比如說, 網絡中有一個發布者, 有100多個訂閱者, 發布者bind到endpoint上, 訂閱者connect到endpoint上. 如果代碼是寫死的, 如果發布者本身出了點什么問題, 或者發布者一台機器搞不住了, 需要橫向擴容, 你就得改代碼, 然后手動部署到100多台訂閱者上. 這樣的運維成本太大了.

這種場景, 你就需要一個"中介", 對發布者而言, 它從此無需關心訂閱者是誰, 在哪, 有多少人, 只需要把消息給中介就行了. 對於訂閱者而言, 它從此無需關注發布者有幾個, 是否使用了多個endpoint, 在哪, 有多少人. 只需要向中介索取消息就行了. 雖然這時發布者身上的問題轉嫁到的中介身上: 即中介是網絡中最易碎的結點, 如果中介掛了整個拓撲就掛了, 但由於中介不處理業務邏輯, 只是一個類似於交換機的存在, 所以同樣的機器性能, 中介在單位時間能轉發的消息數量, 比發布者和訂閱者能處理的消息高一個甚至幾個數量級. 是的, 使用中介引入了新的問題, 但解決了老的問題.

中介並沒有解決所有問題, 當你引入中介的時候, 中介又變成了網絡中最易碎的點, 所以在實際應用中, 要控制中介的權重, 避免整個網絡拓撲嚴重依賴於一個中介這種情況出現: ZMQ提倡去中心化, 不要把中介變成一個壟斷市場的掮客.

對於發布者而言, 中介就是訂閱者, 而對於訂閱者而言, 中介就是發布者. 中介使用兩種額外的socket類型: XPUB與XSUB. XSUB與真實的發布者連接, XPUB與真實的訂閱者連接.

在請求-回應套路中加入掮客: DELEAR與ROUTER

在我們之前寫的請求-回應套路程序中, 我們有一個客戶端, 一個服務端. 這是一個十分簡化的例子, 實際應用場景中的請求-回應套路中, 一般會有多個客戶端與多個服務端.

請求-應答模式有一個隱含的條件: 服務端是無狀態的. 否則就不能稱之為"請求-應答"套路, 而應該稱之為"嘮嗑套路".

要連接多個客戶端與多個服務端, 有兩種思路.

第一種暴力思路就是: 讓N個客戶端與M個服務端建立起N*M的全連接. 這確實是一個辦法, 雖然不是很優雅. 在ZMQ中, 實現起來還輕松不少: 因為ZMQ的socket可以向多個endpoint發起連接, 這對於客戶端來說, 編碼難度降低了. 客戶端應用程序中可以創建一個zmq_socket, 然后connect到多個服務端的endpoint上就行了. 這種思路做的話, 客戶端數量擴張很容易, 直接部署就可以, 代碼不用改. 但是缺陷有兩個:

  1. 服務端擴容時, 所有客戶端的代碼都得跟着改
  2. 客戶端代碼里必須知道所有服務端的endpoint

總的來說, 這是一種很暴力的解決辦法, 不適合用於健壯的生產環境. 但是這確實是一個辦法.

為了解決上面兩個缺陷, 自然而然的我們就會想到: 為什么不能把服務端抽象出來呢? 讓一個掮客來做那個唯一的endpoint, 以供所有客戶端connect, 然后掮客在背后再把請求體分發給各個服務端, 服務端做出回應后掮客再代替服務端把回應返回給客戶端, 這樣就解決了上面的問題:

  1. 對於客戶端來說, 服務端抽象成了一個endpoint, 服務端擴容時, 客戶端是沒有感知的.
  2. 客戶端不需要知道服務端的所有endpoint, 只需要知道掮客的endpoint就可以了.

並且, 掮客還可以做到以下

  1. 如果N個客戶端發送請求的速度時快時慢, 快的時候, M個服務端處理不過來. 掮客可以做一個緩沖地帶.
  2. 掮客可以記錄會話狀態, 可以保證某一個特定的客戶端始終與一個固定的服務端進行數據交互. 某種程度上, 掮客與客戶端分別記錄部分會話信息, 服務端可以在無狀態的情況下實現"嘮嗑套路"

所以, 在請求回應套路中加入掮客, 是一個很明智的選擇, 這就是第二種思路, 這種思路不是沒有缺陷, 有, 而且很明顯: 掮客是整個系統中最脆弱的部分.

但這個缺陷可以在一定程度上克服掉:

  1. 如果單機掮客轉發能力不夠, 那么可以搞多個掮客. 比如N個客戶端,M個服務端, 3個掮客. 客戶端與3個掮客建立全連接, 3個掮客與M個服務端建立全連接. 總是要好過N個客戶端與M個服務端建立全連接的.
  2. 如果單機掮客緩沖能力不夠, 甚至可以加多層掮客. 這種使用方法就把掮客的緩沖特性放在了首位.

ZMQ中, 有兩個特殊的socket類型特別適合掮客使用:

  1. ROUTER 用於掮客與多個客戶端連接. 掮客bind, 客戶端connect.
  2. DEALER 用於掮客和多個服務端連接. 掮客bind, 服務端connect.

關於這兩種特殊的socket的特性, 后續我們會仔細深入, 目前來說, 你只需要了解

  1. 它們實現了消息的緩沖
  2. 它們通過一種特殊的機制記錄了會話

多說無益, 來看代碼. 下面是在客戶端與服務端中插入掮客的代碼實例:

客戶端

#include <zmq.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();

    void * socket = zmq_socket(context, ZMQ_REQ);
    zmq_connect(socket, "tcp://localhost:5559");

    for(int i = 0; i < 10; ++i)
    {
        s_send(socket, "Hello");

        char * strRsp = s_recv(socket);

        printf("Received reply %d [%s]\n", i, strRsp);
        free(strRsp);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

服務端

#include <zmq.h>
#include <unistd.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();

    void * socket = zmq_socket(context, ZMQ_REP);
    zmq_connect(socket, "tcp://localhost:5560");

    while(1)
    {
        char * strReq = s_recv(socket);
        printf("Received request: [%s]\n", strReq);
        free(strReq);

        sleep(1);

        s_send(socket, "World");
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);
    
    return 0;
}

掮客

#include <zmq.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket_for_client = zmq_socket(context, ZMQ_ROUTER);
    void * socket_for_server = zmq_socket(context, ZMQ_DEALER);
    zmq_bind(socket_for_client, "tcp://*:5559");
    zmq_bind(socket_for_server, "tcp://*:5560");

    zmq_pollitem_t items[] = {
        {   socket_for_client,  0,  ZMQ_POLLIN, 0   },
        {   socket_for_server,  0,  ZMQ_POLLIN, 0   },
    };

    while(1)
    {
        zmq_msg_t message;
        zmq_poll(items, 2, -1);

        if(items[0].revents & ZMQ_POLLIN)
        {
            while(1)
            {
                zmq_msg_init(&message);
                zmq_msg_recv(&message, socket_for_client, 0);
                int more = zmq_msg_more(&message);
                zmq_msg_send(&message, socket_for_server, more ? ZMQ_SNDMORE : 0);
                zmq_msg_close(&message);

                if(!more)
                {
                    break;
                }
            }
        }

        if(items[1].revents & ZMQ_POLLIN)
        {
            while(1)
            {
                zmq_msg_init(&message);
                zmq_msg_recv(&message, socket_for_server, 0);
                int more = zmq_msg_more(&message);
                zmq_msg_send(&message, socket_for_client, more ? ZMQ_SNDMORE : 0);
                zmq_msg_close(&message);

                if(!more)
                {
                    break;
                }
            }
        }
    }

    zmq_close(socket_for_client);
    zmq_close(socket_for_server);
    zmq_ctx_destroy(context);

    return 0;
}

客戶端和服務端由於掮客的存在, 代碼都簡單了不少, 對於掮客的代碼, 有以下幾點需要思考:

  1. 為什么客戶端和服務端雙方在代碼中以s_sends_recv互相傳遞字符串, 但在掮客那里就需要用zmq_msg_t進行轉發呢?
  2. 為什么掮客在轉發消息的時候, 還需要判斷是否是多幀消息呢?
  3. 更進一步的, 如果有多個客戶端與多個服務端, 客戶端A向掮客發送請求, 掮客將其轉發到了服務端B, 然后B回包, 發向掮客, 當回包消息到達掮客時, 掮客是如何將回包消息正確投遞給A, 而不是其它客戶端的呢?

上面三點其實是同一個問題: 掮客是如何實現帶會話追蹤的轉發消息的?

另外, 如果你先啟動掮客, 再啟動客戶端, 再啟動服務端. 你會看到在服務端正確啟動后, 客戶端顯示它收到了回包.那么:

  1. 在服務端未啟動時, 顯然在客戶端的角度來講, 客戶端已經將第一個請求投遞給了掮客. 如果此時有1000個客戶端與掮客相連, 1000個首請求消息是如何存儲的? 10000個呢? 什么時候掮客會丟棄請求?

這就是有關掮客的第二個問題: 如何配置緩沖區.

本章目前暫時不會對這三個問題做出解答, 大家先思考一下. 我們將在下一章深入掮客的細節進行進一步探索.

ZMQ內置的掮客函數

在上面的掮客代碼示例中, 核心代碼就是zmq_poll對兩個socket的監聽, 以及while(1)循環. ZMQ將這兩坨操作統一封裝到了一個函數中, 省得大家每次都要寫boring code.

int zmq_proxy (const void *frontend, const void *backend, const void *capture);

參數frontendbackend分別是與客戶端相連的socket與服務端相連的socket. 在使用zmq_proxy函數之前, 這兩個socket必須被正確配置好, 該調用connect就調用connect, 該調用bind就調用bind. 簡單來講, zmq_proxy負責把frontendbackend之間的數據互相遞送給對方. 而如果僅僅是單純的遞送的話, 第三個參數capture就應當被置為NULL, 而如果還想監聽一下數據, 那么就再創建一個socket, 並將其值傳遞給capture, 這樣, frontendbackend之間的數據都會有一份拷貝被送到capture上的socket.

當我們用zmq_proxy重寫上面的掮客代碼的話, 代碼會非常簡潔, 會變成這樣:

#include <zmq.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket_for_client = zmq_socket(context, ZMQ_ROUTER);
    void * socket_for_server = zmq_socket(context, ZMQ_DEALER);
    zmq_bind(socket_for_client, "tcp://*:5559");
    zmq_bind(socket_for_server, "tcp://*:5560");

    zmq_proxy(socket_for_client, socket_for_server, NULL);

    zmq_close(socket_for_client);
    zmq_close(socket_for_server);
    zmq_ctx_destroy(context);

    return 0;
}

橋接技巧

橋接是服務器后端的一種常用技巧. 所謂的橋接有點類似於掮客, 但是解決問題的側重點不一樣. 掮客主要解決了三個問題:

  1. 降低網絡連接數量. 從N*M降低到 (N+M)*X
  2. 向客戶端與服務端屏蔽彼此的具體實現, 隱藏彼此的具體細節.
  3. 緩沖

而橋接解決的問題的側重點主要在:

  1. 向通信的一方, 屏蔽另一方的具體實現.

這種設計思路常用於后台服務的接口層. 接口層一方面連接着后端內部局域網, 另外一方面對公提供服務. 這種服務可以是請求-回應式的服務, 也可以是發布-訂閱式的服務(顯然發布方在后端內部的局域網里). 這個時候接口層其實就完成了橋接的工作.

其實這種應用場景里, 把這種技巧稱為橋接並不是很合適. 因為橋接是一個計算機網絡中硬件層的術語, 最初是用於線纜過長信號衰減時, 在線纜末端再加一個信號放大器之類的設備, 為通信續命用的.

原版ZMQ文檔在這里提出bridging這個術語, 也只是為了說明一下, zmq_proxy的適用場景不僅局限於做掮客, 而是應該在理解上更寬泛一點, zmq_proxy函數就是互相傳遞兩個socket之間數據函數, 僅此而已, 而具體這個函數能應用在什么樣的場景下, 掮客與橋接場景均可以使用, 但絕不局限於此. 寫代碼思維要活.

妥善的處理錯誤

ZMQ庫對待錯誤, 或者叫異常, 的設計哲學是: 見光死. 前文中寫的多數示例代碼, 都沒有認真的檢查ZMQ庫函數調用的返回值, 也沒有關心它們執行失敗后會發生什么. 一般情況下, 這些函數都能正常工作, 但凡事總有個萬一, 萬一創建socket失敗了, 萬一bind或connect調用失敗了, 會發生什么?

按照見光死的字面意思: 按我們上面寫代碼的風格, 一旦出錯, 程序就掛掉退出了.

所以正確使用ZMQ庫的姿勢是: 生產環境運行的代碼, 務必為每一個ZMQ庫函數的調用檢查返回值, 考慮調用失敗的情況. ZMQ庫函數的設計也繼續了POSIX接口風格里的一些設計, 這些設計包括:

  1. 創建對象的接口, 在失敗時一般返回NULL
  2. 處理數據的接口, 正常情況下將返回處理的數據的字節數. 失敗情況下將返回-1
  3. 其它一般性的函數, 成功時返回0, 失敗時返回-1
  4. 當調用失敗發生時, 具體的錯誤碼存放在errno中, 或zmq_errno()
  5. 有關錯誤的詳情描述信息, 通過zmq_strerror()可能獲得

真正健壯的代碼, 應該像下面這樣寫, 是的, 它很啰嗦, 但它很健壯:

// ...
void * context = zmq_ctx_new();
assert(context);
void * socket = zmq_socket(context, ZMQ_REP);
assert(socket);
int rc = zmq_bind(socket, "tcp://*:5555");
if(rc == -1)
{
    printf("E: bind failed: %s\n", strerror(errno));
    return -1;
}
// ...

有兩個比較例外的情況需要你注意一下:

  1. 處理ZMQ_DONTWAIT的函數返回-1時, 一般情況下不是一個致命錯誤, 不應當導致程序退出. 比如在收包函數里帶上這個標志, 那么語義只是說"沒數據可收", 是的, 收包函數會返回-1, 並且會置error值為EAGAIN, 但這並不代表程序發生了不可逆轉的錯誤.
  2. 當一個線程調用zmq_ctx_destroy()時, 如果此時有其它線程在忙, 比如在寫數據或者收數據什么的, 那么這會直接導致這些在干活的線程, 調用的這些阻塞式接口函數返回-1, 並且errno被置為ETERM. 這種情況在實際編碼過程中不應當出現.

下面我們寫一個健壯的分治套路, 和我們在第一章中寫過的類似, 不同的是, 這次, 在監理收到"所有工作均完成"的消息之后, 會發消息給各個工程隊, 讓工程隊停止運行. 這個例子主要有兩個目的:

  1. 向大家展示, 在使用ZMQ庫的同時, 把代碼寫健壯
  2. 向大家展示如何優雅的干掉一個進程

原先的分治套路代碼, 使用PUSH/PULL這兩種socket類型, 將任務分發給多個工程隊. 但在工作做完之后, 工程隊的程序還在運行, 工程隊的程序無法得知任務什么進修終止. 這里我們再摻入發布-訂閱套路, 在工作做完之后, 監理向廣大工程隊, 通過PUB類型的socket發送"活干活了"的消息, 而工程隊用SUB類型的socket一旦收到監理的消息, 就停止運行.

包工頭ventilator的代碼和上一章的一毛一樣, 只是對所有的ZMQ庫函數調用增加了錯誤處理. 照顧大家, 這里再帖一遍

#include <zmq.h>
#include <stdio.h>
#include <time.h>
#include <assert.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    assert(context);
    void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
    assert(socket_to_sink);
    void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
    assert(socket_to_worker);
    if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1)
    {
        printf("E: connect failed: %s\n", strerror(errno));
        return -1;
    }
    if(zmq_bind(socket_to_worker, "tcp://*:5557") == -1)
    {
        printf("E: bind failed: %s\n", strerror(errno));
        return -1;
    }

    printf("Press Enter when all workers get ready:");
    getchar();
    printf("Sending tasks to workers...\n");

    if(s_send(socket_to_sink, "Get ur ass up") == -1)
    {
        printf("E: s_send failed: %s\n", strerror(errno));
        return -1;
    }

    srandom((unsigned)time(NULL));

    int total_ms = 0;
    for(int i = 0; i < 100; ++i)
    {
        int workload = randof(100) + 1;
        total_ms += workload;
        char string[10];
        snprintf(string, sizeof(string), "%d", workload);
        if(s_send(socket_to_worker, string) == -1)
        {
            printf("E: s_send failed: %s\n", strerror(errno));
            return -1;
        }
    }

    printf("Total expected cost: %d ms\n", total_ms);

    zmq_close(socket_to_sink);
    zmq_close(socket_to_worker);
    zmq_ctx_destroy(context);

    return 0;
}

接下來是工程隊worker的代碼, 這一版新增了一個socket_to_sink_of_control來接收來自監理的停止消息:

#include <zmq.h>
#include <assert.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    assert(context);
    void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
    assert(socket_to_ventilator);
    if(zmq_connect(socket_to_ventilator, "tcp://localhost:5557") == -1)
    {
        printf("E: connect failed: %s\n", strerror(errno));
        return -1;
    }

    void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
    assert(socket_to_sink);
    if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1)
    {
        printf("E: connect failed: %s\n", strerror(errno));
        return -1;
    }

    void * socket_to_sink_of_control = zmq_socket(context, ZMQ_SUB);
    assert(socket_to_sink_of_control);
    if(zmq_connect(socket_to_sink_of_control, "tcp://localhost:5559") == -1)
    {
        printf("E: connect failed: %s\n", strerror(errno));
        return -1;
    }
    if(zmq_setsockopt(socket_to_sink_of_control, ZMQ_SUBSCRIBE, "", 0) == -1)
    {
        printf("E: setsockopt failed: %s\n", strerror(errno));
    }

    zmq_pollitem_t items [] = {
        {   socket_to_ventilator,   0,  ZMQ_POLLIN, 0   },
        {   socket_to_sink_of_control,  0,  ZMQ_POLLIN, 0   },
    };

    while(1)
    {
        if(zmq_poll(items, 2, -1) == -1)
        {
            printf("E: poll failed: %s\n", strerror(errno));
            return -1;
        }

        if(items[0].revents & ZMQ_POLLIN)
        {
            char * strWork = s_recv(socket_to_ventilator);
            assert(strWork);
            printf("%s.", strWork);
            fflush(stdout);
            s_sleep(atoi(strWork));
            free(strWork);
            if(s_send(socket_to_sink, "") == -1)
            {
                printf("E: s_send failed %s\n", strerror(errno));
                return -1;
            }
        }

        if(items[1].revents & ZMQ_POLLIN)
        {
            break;
        }
    }

    zmq_close(socket_to_ventilator);
    zmq_close(socket_to_sink);
    zmq_close(socket_to_sink_of_control);
    zmq_ctx_destroy(context);

    return 0;
}

接下來是監理的代碼, 這一版新增了socket_to_worker_of_control來在任務結束之后給工程隊發布停止消息:

#include <zmq.h>
#include <assert.h>
#include <stdint.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    assert(context);

    void * socket_to_worker = zmq_socket(context, ZMQ_PULL);
    if(zmq_bind(socket_to_worker, "tcp://*:5558") == -1)
    {
        printf("E: bind failed: %s\n", strerror(errno));
        return -1;
    }

    void * socket_to_worker_of_control = zmq_socket(context, ZMQ_PUB);
    if(zmq_bind(socket_to_worker_of_control, "tcp://*:5559") == -1)
    {
        printf("E: bind failed: %s\n", strerror(errno));
        return -1;
    }

    char * strBeginMsg = s_recv(socket_to_worker);
    assert(strBeginMsg);
    free(strBeginMsg);

    int64_t i64StartTime = s_clock();

    for(int i = 0; i < 100; ++i)
    {
        char * strRes = s_recv(socket_to_worker);
        assert(strRes);
        free(strRes);

        if(i % 10 == 0)
        {
            printf(":");
        }
        else
        {
            printf(".");
        }

        fflush(stdout);
    }

    printf("Total elapsed time: %d msec\n", (int)(s_clock() - i64StartTime));

    if(s_send(socket_to_worker_of_control, "STOP") == -1)
    {
        printf("E: s_send failed: %s\n", strerror(errno));
        return -1;
    }

    zmq_close(socket_to_worker);
    zmq_close(socket_to_worker_of_control);
    zmq_ctx_destroy(context);

    return 0;
}

這個例子也展示了如何將多種套路揉合在一個場景中. 所以說寫代碼, 思維要靈活.

處理POSIX Signal

一般情況下, Linux上的程序在接收到諸如SIGINTSIGTERM這樣的信號時, 其默認動作是讓進程退出. 這種退出信號的默認行為, 只是簡單的把進程干掉, 不會管什么緩沖區有沒有正確刷新, 也不會管文件以及其它資源句柄是不是正確被釋放了.

這對於實際應用場景中的程序來說是不可接受的, 所以在編寫后台應用的時候一定要注意這一點: 要妥善的處理POSIX Signal. 限於篇幅, 這里不會對Signal進行進一步討論, 如果對這部分內容不是很熟悉的話, 請參閱<Unix環境高級編程>(<Advanced Programming in the UNIX Environment>)第十章(chapter 10. Signals).

下面是妥善處理Signal的一個例子

#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <string.h>

#include <zmq.h>

#define S_NOTIFY_MSG    " "
#define S_ERROR_MSG     "Error while writing to self-pipe.\n"

static int s_fd;

static void s_signal_handler(int signal_value)
{
    int rc = write(s_fd, S_NOTIFY_MSG, sizeof(S_NOTIFY_MSG));
    if(rc != sizeof(S_NOTIFY_MSG))
    {
        write(STDOUT_FILENO, S_ERROR_MSG, sizeof(S_ERROR_MSG) - 1);
        exit(1);
    }
}

static void s_catch_signals(int fd)
{
    s_fd = fd;

    struct sigaction action;
    action.sa_handler = s_signal_handler;

    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);
    sigaction(SIGINT, &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

int main(void)
{
    int rc;

    void * context = zmq_ctx_new();
    assert(context);
    void * socket = zmq_socket(context, ZMQ_REP);
    assert(socket);

    if(zmq_bind(socket, "tcp://*:5555") == -1)
    {
        printf("E: bind failed: %s\n", strerror(errno));
        return -__LINE__;
    }

    int pipefds[2];
    rc = pipe(pipefds);
    if(rc != 0)
    {
        printf("E: creating self-pipe failed: %s\n", strerror(errno));
        return -__LINE__;
    }

    for(int i = 0; i < 2; ++i)
    {
        int flags = fcntl(pipefds[0], F_GETFL, 0);
        if(flags < 0)
        {
            printf("E: fcntl(F_GETFL) failed: %s\n", strerror(errno));
            return -__LINE__;
        }

        rc = fcntl(pipefds[0], F_SETFL, flags | O_NONBLOCK);
        if(rc != 0)
        {
            printf("E: fcntl(F_SETFL) failed: %s\n", strerror(errno));
            return -__LINE__;
        }
    }

    s_catch_signals(pipefds[1]);

    zmq_pollitem_t items[] = {
        {   0,      pipefds[0],     ZMQ_POLLIN,     0   },
        {   socket, 0,              ZMQ_POLLIN,     0   },
    };

    while(1)
    {
        rc = zmq_poll(items, 2, -1);
        if(rc == 0)
        {
            continue;
        }
        else if(rc < 0)
        {
            if(errno == EINTR)
            {
                continue;
            }
            else
            {
                printf("E: zmq_poll failed: %s\n", strerror(errno));
                return -__LINE__;
            }
        }

        // Signal pipe FD
        if(items[0].revents & ZMQ_POLLIN)
        {
            char buffer[2];
            read(pipefds[0], buffer, 2);    // clear notifying bytes
            printf("W: interrupt received, killing server...\n");
            break;
        }

        // Read socket
        if(items[1].revents & ZMQ_POLLIN)
        {
            char buffer[255];
            rc = zmq_recv(socket, buffer, 255, ZMQ_NOBLOCK);
            if(rc < 0)
            {
                if(errno == EAGAIN)
                {
                    continue;
                }

                if(errno == EINTR)
                {
                    continue;
                }

                printf("E: zmq_recv failed: %s\n", strerror(errno));

                return -__LINE__;
            }

            printf("W: recv\n");
            // Now send message back;
            // ...
        }
    }

    printf("W: cleaning up\n");
    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

上面這個程序的邏輯流程是這樣的:

  1. 首先這是一個典型的服務端應用程序. 先創建了一個類型為ZMQ_REP的zmq socket, 並將之bind在本地5555端口上

  2. 然后程序創建了一個管道, 並將管道0(寫端)置為非阻塞模式

  3. 然后程序為信號SIGINTSIGTERM掛載了自定義的信號處理函數, 信號處理函數做的事如下:

    1. 向管道1(寫端)寫入字符串" "
    2. 若寫入失敗, 則向標准輸出寫入錯誤字符串"Err while writing to self-pipe"並調用exit()退出程序
  4. 然后將zmq socket與管道1讀端均加入zmq_poll

    1. 在zmq socket收到請求時, 正常處理請求
    2. 在管道1收到數據時, 說明接收到了SIGINTSIGTERM信號, 則退出數據處理循環, 之后將依次調用zmq_close()zmq_ctx_destroy()

這種寫法使用了管道, 邏輯上清晰了, 代碼上繁瑣了, 但這都不是重點, 重點是這個版本的服務端程序在接收到SIGINTSIGTERM時, 雖然也會退出進程, 但在退出之前會妥善的關閉掉zmq socket與zmq context.

而還有一種更簡潔的寫法(這種簡潔的寫在其實是有潛在的漏洞的, 詳情請參見<Unix環境高級編程>(<Advanced Programming in the UNIX Environment>) 第十章(chapter 10. Signals) )

  1. 定義一個全局變量 s_interrupted
  2. 定義一個信號處理函數, 該信號處理函數在接收到諸如SIGINT之類的信號時, 置s_interrupted為1
  3. 在業務處理邏輯中, 判斷全局變量s_interrupted的值, 若該值為1, 則進入退出流程

大致如下:

s_catch_signals();      // 注冊事件回調
client = zmq_socket(...);
while(!s_interrupted)   // 時刻檢查 s_interrupted 的值
{
    char * message = s_recv(client);
    if(!message)
    {
        break;          // 接收消息異常時也退出
    }

    // 處理業務邏輯
}
zmq_close(close);

避免內存泄漏

服務端應用程序最蛋疼的問題就是內存泄漏了, 這個問題已經困擾了C/C++程序員二三十年了, ZMQ在這里建議你使用工具去檢測你的代碼是否有內存泄漏的風險. 這里建議你使用的工具是: valgrind

默認情況下, ZMQ本身會導致valgrind報一大堆的警告, 首先先屏蔽掉這些警告. 在你的工程目錄下新建一個文件名為 vg.supp, 寫入下面的內容

{
    <socketcall_sendto>
    Memcheck:Param
    socketcall.sendto(msg)
    fun:send
    ...
}
{
    <socketcall_sendto>
    Memcheck:Param
    socketcall.send(msg)
    fun:send
}

然后記得妥善處理掉諸如SIGINTSIGTERM這樣的Signal. 否則valgrind會認為不正確的退出程序會有內存泄漏風險. 最后, 在編譯你的程序時, 加上 -DDEBUG 選項. 然后如下運行valgrind

valgrind --tool=memcheck --leak-check=full --suppression=vg.supp <你的程序>

如果你的代碼寫的沒有什么問題, 會得到下面這樣的贊賞

==30536== ERROR SUMMARY: 0 errors from 0 contexts...

在多線程環境使用 ZMQ

啊, 多線程, 給大家講一個笑話, 小明有一個問題, 然后小明決定使用多線程編程解決這個問題. 最后小明問題兩個了有.

傳統的多線程編程中, 或多或少都會摻入同步手段. 而這此同步手段一般都是程序員的噩夢, 信號量, 鎖. ZMQ則告誡廣大程序員: 不要使用信號量, 也不要使用鎖, 不要使用除了 zmq inproc之外的任何手段進行線程間的數據交互.

ZMQ在多線程上的哲學是這樣的:

  1. 多線程應該以並行優勢提高程序運行效率
  2. 避免線程同步. 如果你的多線程程序需要大量的代碼來完成線程同步, 那么一定是你的程序設計有問題.
  3. 如果非得同步, 那么不要使用鎖或信號量. 而使用 zmq inproc socket 來在線程間傳遞信息
  4. 良好的多線程程序設計, 應當很容易的將其改造成多進程服務, 更進一步, 應該很容易的拆分進程以部署在不同的機器結點上.
  5. 總的來說, 以多進程的設計思路去設計多線程程序, 核心哲學是避免線程同步.

更細節的, 在進行多線程編程時, 你應當遵循以下的幾個點:

  1. 將數據進行獨立拆分, 每個線程只訪問自己的私有數據, 避免多線程共享數據. 除了一個例外: zmq的context實例
  2. 避免使用傳統的線程同步手段: 信號量, 臨界區, 鎖. 上面已經強調過了, 不要使用這些手段.
  3. 在程序一開始處, 未創建線程時, 創建context實例, 隨后將這個context實例共享給所有線程
  4. 如果父線程需要創建數據實例, 那么開attached線程創建程序中要使用的數據實例. 然后通過inproc pair socket將數據實例回傳. 父線程bind, 子線程connect
  5. 如果父線程需要並行的子線程來處理業務. 那么開detached線程來跑業務, 並在各子線程中為各個子線程創建自己獨享的context. 父子線程使用tcp socket進行通信. 這樣你的程序就會很容易的擴展成多進程服務, 而不需要改動過多的代碼.
  6. 線程間的數據交互一律使用zmq socket傳遞消息.
  7. 不要在線程間傳遞socket句柄. zmq socket實例不是線程安全的. 從本質上講在線程間傳遞socket句柄是可行的, 但這要建立在經驗豐富的基礎上, 否則只會讓事情更大條. 在線程間傳遞socket實例一般情況下只發生在zmq庫的其它編程語言的binding庫上, 一般也是用於帶GC的語言去處理自動對象回收. 這種技巧不應該出現在zmq的使用者身上.

如果你程序要用到多個掮客, 比如, 多個線程都擁有自己獨立的掮客, 一個常見的錯誤就是: 在A線程里創建掮客的左右兩端socket, 然后將socket傳遞給B線程里的掮客. 這違反了上面的規則: 不要在線程間傳遞socket. 這種錯誤很難發覺, 並且出錯是隨機的, 出現問題后很難排查.

ZMQ對線程庫是沒有侵入性的, ZMQ沒有內置線程庫, 也沒有使用其它的線程實例. 使用ZMQ寫多線程應用程序, 多線程接口就是操作系統操作的線程接口. 所以它對線程相關的檢查工具是友好的: 比如Intel的Thread Checker. 這種設計的壞處就是你寫的程序在線程接口這一層的可移植性需要你自己去保證. 或者你需要使用其它第三方的可移植線程庫.

這里我們寫一個例子吧, 我們把最初的請求-回應套路代碼改造成多線程版的. 原始版的服務端是單進程單線程程序, 如果請求量比較低的話, 是沒有什么問題的, 單線程的ZMQ應用程序吃滿一個CPU核心是沒有問題的, 但請求量再漲就有點捉襟見肘了, 這個時候就需要讓程序吃滿多個核心. 當然多進程服務也能完成任務, 但這里主要是為了介紹在多線程編程中使用ZMQ, 所以我們把服務端改造成多線程模式.

另外, 顯然你可以使用一個掮客, 再外加一堆服務端結點(無論結點是獨立的進程, 還是獨立的機器)來讓服務端的處理能力更上一層樓. 但這更跑偏了.

還是看代碼吧. 服務端代碼如下:

#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "zmq_helper.h"

static void * worker_routine(void * context)
{
    void * socket_to_main_thread = zmq_socket(context, ZMQ_REP);
    assert(socket_to_main_thread);
    zmq_connect(socket_to_main_thread, "inproc://workers");

    while(1)
    {
        char * strReq = s_recv(socket_to_main_thread);
        printf("Received request: [%s]\n", strReq);
        free(strReq);
        sleep(1);
        s_send(socket_to_main_thread, "World");
    }

    zmq_close(socket_to_main_thread);
    return NULL;
}

int main(void)
{
    void * context = zmq_ctx_new();
    assert(context);

    void * socket_to_client = zmq_socket(context, ZMQ_ROUTER);
    assert(socket_to_client);
    zmq_bind(socket_to_client, "tcp://*:5555");

    void * socket_to_worker_thread = zmq_socket(context, ZMQ_DEALER);
    assert(socket_to_worker_thread);
    zmq_bind(socket_to_worker_thread, "inproc://workers");

    for(int i = 0; i < 5; ++i)
    {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_routine, context);
    }

    zmq_proxy(socket_to_client, socket_to_worker_thread, NULL);

    zmq_close(socket_to_client);
    zmq_close(socket_to_worker_thread);
    zmq_ctx_destroy(context);

    return 0;
}

這就是一個很正統的設計思路, 多個線程之間是互相獨立的, worker線程本身很容易能改造成獨立的進程, 主線程做掮客.

使用 PAIR socket 進行線程間通信

來, 下面就是一個例子, 使用PAIR socket完成線程同步, 內部通信使用的是inproc

#include <zmq.h>
#include <pthread.h>
#include "zmq_helper.h"

static void * thread1_routine(void * context)
{
    printf("thread 1 start\n");
    void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR);
    zmq_connect(socket_to_thread2, "inproc://thread_1_2");

    printf("thread 1 ready, send signal to thread 2\n");

    s_send(socket_to_thread2, "READY");

    zmq_close(socket_to_thread2);
    printf("thread 1 end\n");
    return NULL;
}

static void * thread2_routine(void * context)
{
    printf("thread 2 start\n");
    void * socket_to_thread1 = zmq_socket(context, ZMQ_PAIR);
    zmq_bind(socket_to_thread1, "inproc://thread_1_2");
    pthread_t thread1;
    pthread_create(&thread1, NULL, thread1_routine, context);

    char * str = s_recv(socket_to_thread1);
    free(str);
    zmq_close(socket_to_thread1);

    void * socket_to_mainthread = zmq_socket(context, ZMQ_PAIR);
    zmq_connect(socket_to_mainthread, "inproc://thread_2_main");
    printf("thread 2 ready, send signal to main thread\n");
    s_send(socket_to_mainthread, "READY");

    zmq_close(socket_to_mainthread);
    printf("thread 2 end\n");
    return NULL;
}

int main(void)
{
    printf("main thread start\n");
    void * context = zmq_ctx_new();

    void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR);
    zmq_bind(socket_to_thread2, "inproc://thread_2_main");
    pthread_t thread2;
    pthread_create(&thread2, NULL, thread2_routine, context);

    char * str = s_recv(socket_to_thread2);
    free(str);
    zmq_close(socket_to_thread2);

    printf("Test over\n");
    zmq_ctx_destroy(context);
    printf("main thread end\n");
    return 0;

}

這個簡單的程序包含了幾個編寫多線程同步時的潛規則:

  1. 線程間同步使用 inproc PAIR 型的socket. 共享context
  2. 父線程bind, 子線程connect

需要注意的是, 上面這種寫法的多線程, 很難拆成多個進程, 上面這種寫法一般用於壓根就不准備拆分的服務端應用程序. inproc很快, 性能很好, 但是不能用於多進程或多結點通信.

另外一種常見的設計就是使用tcp來傳遞同步信息. 使用tcp使得多線程拆分成多進程成為一種可能. 另外一種同步場景就是使用發布-訂閱套路. 而不使用PAIR. 甚至可以使用掮客使用的ROUTER/DEALER進行同步. 但需要注意下面幾點:

  1. 在使用PUSH/PULL做同步時, 需要注意: PUSH會把消息廣播給所有PULL.注意這一點, 不要把同步消息發給其它線程
  2. 在使用ROUTER/DEALER做同步時. 需要注意: ROUTER會把你發送的消息裝進一個"信封", 也就是說, 你調用zmq_send接口發送的消息將變成一個多幀消息被發出去. 如果你發的同步消息不帶語義, 那么還好, 如果你發送的消息帶語義, 那么請特別小心這一點, 多幀消息的細節將在第三章進行進一步討論. 而DEALER則會把消息廣播給所有對端, 這一點和PUSH一樣, 請額外注意. 總之建立在閱讀第三章之前, 不要用ROUTER或DEALER做線程同步.
  3. 你還可以使用PUB/SUB來做線程同步. PUB/SUB不會封裝你發送的消息, 你發啥就是啥, 但你需要每次為SUB端通過zmq_setsockopt設置過濾器, 否則SUB端收不到任何消息, 這一點很煩.

所以總的來說, 用PAIR是最方便的選擇.

不同機器結點間的同步

當你需要同步, 或者協調的兩個結點位於兩個不同的機器上時, PAIR就不那么好用了, 直接原因就是: PAIR不支持斷線重連. 在同一台機器上, 多個進程之間同步, 沒問題, 多個線程之間同步, 也沒問題. 因為單機內建立起的通訊連接基本不可能發生意外中斷, 而一旦發生中斷, 一定是進程掛了, 這個時候麻煩事是進程為什么掛了, 而不是通訊連接為什么掛了.

但是在跨機器的結點間進行同步, 就需要考慮到網絡波動的原因了. 結點本身上運行的服務可能沒什么問題, 但就是網線被剪了, 這種情況下使用PAIR就不再合適了, 你就必須使用其它socket類型了.

另外, 線程同步與跨機器結點同步之間的另外一個重大區別是: 線程數量一般是固定的, 服務穩定運行期間, 線程數目一般不會增加, 也不會減少. 但跨機器結點可能會橫向擴容. 所以要考慮的事情就又我了一坨.

我們下面會給出一個示例程序, 向你展示跨機器結點之間的同步到底應該怎么做. 還記得上一章我們講發布-訂閱套路的時候, 提到的, 在訂閱方建立連接的那段短暫的時間內, 所有發布方發布的消息都會被丟棄嗎? 這里我們將改進那個程序, 在下面改進版的發布-訂閱套路中, 發布方會等待所有訂閱方都建立連接完成后, 才開始廣播消息. 下面將要展示的代碼主要做了以下的工作:

  1. PUB方提前知道SUB方的數量
  2. PUB方啟動, 等待SUB方連接, 發送就緒信息.
  3. 當所有SUB方連接完畢后, 開始工作.
  4. 而同步工作是由REQ/REP完成的.

來看代碼:

發布方代碼:

#include <zmq.h>
#include "zmq_helper.h"

#define SUBSCRIBER_EXPECTED 10

int main(void)
{
    void * context = zmq_ctx_new();

    void * socket_for_pub = zmq_socket(context, ZMQ_PUB);

    int sndhwm = 1100000;
    zmq_setsockopt(socket_for_pub, ZMQ_SNDHWM, &sndhwm, sizeof(int));

    zmq_bind(socket_for_pub, "tcp://*:5561");

    void * socket_for_sync = zmq_socket(context, ZMQ_REP);
    zmq_bind(socket_for_sync, "tcp://*:5562");

    printf("waiting for subscribers\n");
    int subscribers_count = 0;
    while(subscribers_count < SUBSCRIBER_EXPECTED)
    {
        char * str = s_recv(socket_for_sync);
        free(str);

        s_send(socket_for_sync, "");
        subscribers_count++;
    }

    printf("broadingcasting messages\n");
    for(int i = 0; i < 1000000; ++i)
    {
        s_send(socket_for_pub, "Lalalal");
    }

    s_send(socket_for_pub, "END");

    zmq_close(socket_for_pub);
    zmq_close(socket_for_sync);
    zmq_ctx_destroy(context);

    return 0;
}

訂閱方代碼

#include <zmq.h>
#include <unistd.h>
#include "zmq_helper.h"


int main(void)
{
    void * context = zmq_ctx_new();

    void * socket_for_sub = zmq_socket(context, ZMQ_SUB);
    zmq_connect(socket_for_sub, "tcp://localhost:5561");
    zmq_setsockopt(socket_for_sub, ZMQ_SUBSCRIBE, "", 0);

    sleep(1);

    void * socket_for_sync = zmq_socket(context, ZMQ_REQ);
    zmq_connect(socket_for_sync, "tcp://localhost:5562");

    s_send(socket_for_sync, "");

    char * str = s_recv(socket_for_sync);
    free(str);

    int i = 0;
    while(1)
    {
        char * str = s_recv(socket_for_sub);
        if(strcmp(str, "END") == 0)
        {
            free(str);
            break;
        }

        free(str);
        i++;
    }

    printf("Received %d broadcast message\n", i);

    zmq_close(socket_for_sub);
    zmq_close(socket_for_sync);
    zmq_ctx_destroy(context);

    return 0;
}

最后帶一個啟動腳本:

#! /bin/bash

echo "Starting subscribers..."

for((a=0; a<10; a++)); do
    ./subscriber &
done

echo "Starting publisher..."
./publisher

運行啟動腳本之后, 你大概會得到類似於下面的結果:

Starting subscribers...
Starting publisher...
waiting for subscribers
broadingcasting messages
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message
Received 1000000 broadcast message

你看, 這次有了同步手段, 每個訂閱者都確實收到了100萬條消息, 一條不少

上面的代碼還有一個細節需要你注意一下:

注意到在訂閱者的代碼中, 有一行sleep(1), 如果去掉這一行, 運行結果可能(很小的概率)不是我們期望的那樣. 之所以這樣做是因為:

先創建用於接收消息的socket_for_sub, 然后connect之. 再去做同步操作. 有可能: 同步的REQ與REP對話已經完成, 但是socket_for_sub的連接過程還沒有結束. 這個時候還是會丟掉消息. 也就是說, 這個sleep(1)操作是為了確認: 在同步操作完成之后, 用於發布-訂閱套路的通訊連接一定建立好了.

零拷貝

接觸過與性能有關的網絡編程的*nix端后台開發的同步一定聽說這這樣的一個術語: 零拷貝(Zero-Copy). 你仔細回想我們通過網絡編程接收, 發送消息的過程. 如果我們要發送一個消息, 我們需要把這個消息傳遞給發送相關的接口, 如果我們需要接收一個消息, 我們需要把我們的緩沖區提供給接收消息的函數.

這里就有一個性能痛點, 特別是在接收消息的時候: 在網絡接口API底層, 一定有另外一個緩沖區率先接收了數據, 之后, 你調用收包函數, 諸如recv這樣的函數, 將你的緩沖區提供給函數, 然后, 數據需要從事先收到數據的緩沖區, 拷貝至你自己提供給API的緩沖區.

如果我們向更底層追究一點, 會發現網絡編程中, 最簡單的發收消息模型里, 至少存在着兩到三次拷貝, 不光收包的過程中有, 發包也有. 上面講到的只是離應用開發者最近的一層發生的拷貝動作. 而實際上, 可能發生拷貝的地方有: 應用程序與API交互層, API與協議棧交互層, 協議棧/內核空間交互層, 等等.

對於更深層次來講, 不是我們應用程序開發者應該關心的地方, 並且時至今日, 從協議棧到離我們最近的那一層, 操作系統基本上都做了避免拷貝的優化. 那么, ZMQ作為一個網絡庫, 在使用的進修, 應用程序開發就應當避免離我們最近的那一次拷貝.

這也是為什么ZMQ庫除了zmq_sendzmq_recv之外, 又配套zmq_msg_t類型再提供了zmq_msg_sendzmq_msg_recv接口的原因. zmq_msg_t內置了一個緩沖區, 可以用來收發消息, 當你使用msg系的接口時, 收與發都發生在zmq_msg_t實例的緩沖區中, 不存在拷貝問題.

總之, 要避免拷貝, 需要以下幾步:

  1. 使用zmq_msg_init_data()創建一個zmq_msg_t實例. 接口返回的是zmq_msg_t的句柄. 應用開發者看不到底層實現.
  2. 發送數據時, 將數據通過memcpy之類的接口寫入zmq_msg_t中, 再傳遞給zmq_msg_send. 接收數據時, 直接將zmq_msg_t句柄傳遞給zmq_msg_recv
  3. 需要注意的是, zmq_msg_t被發送之后, 其中的數據就自動被釋放了. 也就是, 對於同一個zmq_msg_t句柄, 你不能連續兩次調用zmq_msg_send
  4. zmq_msg_t內部使用了引用計數的形式來指向真正存儲數據的緩沖區, 也就是說, zmq_msg_send會將這個計數減一. 當計數為0時, 數據就會被釋放. ZMQ庫對於zmq_msg_t的具體實現並沒有做過多介紹, 也只點到這一層為止.
  5. 所以這時你應該明白, 多個zmq_msg_t是有可能共享同一段二進制數據的. 這也是zmq_msg_copy做的事情. 如果你需要將同一段二進制數據發送多次, 那么請使用zmq_msg_copy來生成額外的zmq_msg_t句柄. 每次zmq_msg_copy操作都將導致真正的數據的引用計數被+1. 每次zmq_msg_send則減1, 引用計數為0, 數據自動釋放.
  6. 數據釋放其實調用的是zmq_msg_close接口. 注意: 在zmq_msg_send被調用之后, ZMQ庫自動調用了zmq_msg_close, 你可以理解為, 在zmq_msg_send內部, 完成數據發送后, 自動調用了zmq_msg_close
  7. 蛋疼的事在收包上. 由於zmq_msg_t的內部實現是一個黑盒, 所以如果要接收數據, 雖然調用zmq_msg_recv的過程中沒有發生拷貝, 但應用程序開發者最終還是需要把數據讀出來. 這就必須有一次拷貝. 這是無法避免的. 或者換一個角度來描述這個蛋疼的點: ZMQ沒有向我們提供真正的零拷貝收包接口. 收包時的拷貝是無可避免的.

最后給大家一個忠告: 拷貝確實是一個后端服務程序的性能問題. 但瓶頸一般不在調用網絡庫時發生的拷貝, 而在於其它地方的拷貝. zmq_msg_t的使用重心不應該在"優化拷貝, 提升性能"這個點上, 而是第三章要提到和進一步深入講解的多幀消息.

在發布-訂閱套路中使用多幀消息, 即"信封"

之前我們講到的發布-訂閱套路里, 發布者廣播的消息全是字符串, 而訂閱者篩選過濾消息也是按字符串匹配前幾個字符, 這種策略有點土. 假如我們能把發布者廣播的消息分成兩段: 消息頭與消息體. 消息頭里寫明信息類型, 消息體里再寫具體的信息內容. 這樣過濾器直接匹配消息頭就能決定這個消息要還是不要, 這就看起來洋氣多了.

ZMQ中使用多幀消息支持這一點. 發布者發布多幀消息時, 訂閱者的過濾器在匹配時, 只匹配第一幀.

多說無益, 來看例子, 在具體展示發布者與訂閱者代碼之前, 需要為我們的zmq_help.h文件再加一個函數, 用於發送多帖消息的s_sendmore

/*
 * 把字符串作為字節數據, 發送至zmq socket, 但不發送字符串末尾的'\0'字節
 * 並且通知socket, 后續還有幀要發送
 * 發送成功時, 返回發送的字節數
 */
static inline int s_sendmore(void * socket, const char * string)
{
    return zmq_send(socket, string, strlen(string), ZMQ_SNDMORE);
}

下面是發布者的代碼:

#include <zmq.h>
#include <unistd.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_PUB);
    zmq_bind(socket, "tcp://*:5563");

    while(1)
    {
        s_sendmore(socket, "A");
        s_send(socket, "We don't want to see this");
        s_sendmore(socket, "B");
        s_send(socket, "We would like to see this");

        sleep(1);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

下面是訂閱者的代碼:

#include <zmq.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_SUB);
    zmq_connect(socket, "tcp://localhost:5563");
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "B", 1);

    while(1)
    {
        char * strMsgType = s_recv(socket);
        char * strMsgContent = s_recv(socket);

        printf("[%s] %s\n", strMsgType, strMsgContent);

        free(strMsgType);
        free(strMsgContent);
    }

    zmq_close(socket);
    zmq_ctx_destroy(socket);

    return 0;
}

這里有兩點:
0. 過濾器過濾的是整個消息, 第一幀對不上, 后面所有的幀都不要了
0. ZMQ庫保證, 多幀消息的傳輸是原子性的. 你不會收到一個缺幀的消息

高水位閾值

消息越發越快, 越發越多, 你慢慢的就會意識到一個問題: 內存資源很寶貴, 並且很容易被用盡. 如果你不注意到這一點, 服務器上某個進程阻塞個幾秒鍾, 就炸了.

想象一下這個場景: 在同一台機器上, 有一個進程A在瘋狂的向進程B發送消息. 突然, B覺得很累, 休息了3秒(比如CPU過載, 或者B在跑GC吧, 無所謂什么原因), 這3秒鍾B處理不過來A發送的數據了. 那么在這3秒鍾, A依然瘋狂的試圖向B發送消息, 會發生什么? 如果B有收包緩沖區的話, 這個緩沖區肯定被塞滿了, 如果A有發送緩沖區的話, 這個緩沖區也應該被塞滿了. 剩余的沒被發出去的消息就堆積到A進程的內存空間里, 這個時候如果A程序寫的不好, 那么A進程由於內存被瘋狂占用, 很快就會掛掉.

這是一個消息隊列里的經典問題, 就是消息生產者和消費者的速度不匹配的時候, 消息中間件應當怎么設計的問題. 這個問題的根其實是在B身上, 但B對於消息隊列的設計者來說是不可控的: 這是消息隊列使用者寫的B程序, 你怎么知道那波屌人寫的啥屌代碼? 所以雖然問題由B產生, 但最好還是在A那里解決掉.

最簡單的策略就是: A保留一些緩存能力, 應對突發性的狀況. 超過一定限度的話, 就要扔消息了. 不能把這些生產出來的消息, 發不出去還存着. 這太蠢了.

另外還有一種策略, 如果A只是一個消息中轉者, 可以在超過限度后, 告訴生產消息的上流, 你停一下, 我這邊滿了, 請不要再給我發消息了. 這種情況下的解決方案, 其實就是經典的"流控"問題. 這個方案其實也不好, A只能向上游發出一聲呻吟, 但上游如果執意還是要發消息給A, A也沒辦法去剪網線, 所以轉一圈又回來了: 還是得扔消息.

ZMQ里, 有一個概念叫"高水位閾值", (high-water mark. HWM), 這個值其實是網絡結點自身能緩存的消息的能力. 在ZMQ中, 每一個活動的連接, 即socket, 都有自己的消息緩沖隊列, HWM指的就是這個隊列的容量. 對於某些socket類型, 諸如SUB/PULL/REQ/REP來說, 只有收包隊列. 對於某此socket類型來說, 諸如DEALER/ROUTER/PAIR, 既能收還能發, 就有兩個隊列, 一個用於收包, 一個用於發包.

在ZMQ 2.X版本中, HWM的值默認是無限的. 這種情況下很容易出現我們這一小節開頭講的問題: 發送消息的api接口永遠不會報錯, 對端假死之后內存就會炸. 在ZMQ 3.X版本中, 這個值默認是1000, 這就合理多了.

當socket的HWM被觸及后, 再調用發送消息接口, ZMQ要么會阻塞接口, 要么就扔掉消息. 具體哪種行為取決於sokcet的類型.

  1. 對於PUB和ROUTER類型的socket來說, 會扔數據.
  2. 對於其它類型的socket, 會阻塞接口.

顯然在這種情況下, 如果以非阻塞形式發包, 接口會返回失敗.

另外, 很特殊的是, inproc類型兩端的兩個socket共享同一個隊列: 真實的HWM值是雙方設置的HWM值的總和. 你可以將inproc方式想象成一根管子, 雙方設置HWM時只是在宣稱我需要占用多長的管子, 但真實的管子長度是二者的總和.

最后, 很反直覺的是, HWM的單位是消息個數, 而不是字節數. 這就很有意思了. 另外, HWM觸頂時, 隊列中的消息數量一般不好剛好就等於你設置的HWM值, 真實情況下, 可能會比你設置的HWM值小, 極端情況下可能只有你設置的HWM的一半.

數據丟失問題

當你寫代碼, 編譯, 鏈接, 運行, 然后發現收不到消息, 這個時候你應當這樣排查:

  1. 如果你使用的是SUB類型的socket, 檢查一下有沒有調用zmq_setsockopt設置過濾器

  2. 如果你使用的是SUB類型的socket, 謹記在建立連接過程中, 對端的PUB發送的數據你是收不到了, 如果你確實想要這部分數據, 請做同步處理

  3. 如果你使用的是SUB類型的socket, 上面兩點你都做正確了, 還是有可能收不到消息. 這是因為ZMQ內部的隊列在連接建立之后可能還沒有初始化完成. 這種情況沒什么好的解決辦法, 有兩個土辦法

    1. 讓消息發送方在同步之后再sleep(1)
    2. 如果拓撲允許的話在, 讓SUB socket去執行bind操作, 反過來讓PUB socket去執行connect操作. 這是ZMQ官方給出的糊屎方法, 我都不知道該怎么吐槽.
  4. 如果你使用的是REQ/REP類型的socket, 注意收與發的先后順序, 順序錯了調用收發包接口會報錯, 這個時候如果你忽略掉了報錯, 程序的行為會和丟包差不多.

  5. 如果你在使用PUSH/PULL類型的socket, 如果你發現消息的分發不是公平的, 那可能是因為在發送消息時, 還有PULL沒有與PUSH建立連接, 於是這個PULL就沒有位於公平分發的候選人中. 使用PUSH/PULL要特別注意這一點.

  6. 如果你在線程間共享了socket句柄, 趕緊改代碼, 順便打自己兩巴掌.

  7. 如果你使用的是inproc通訊手段, 那么請確保通信的雙方創建socket時使用的context是同一個context.

  8. ROUTER socket最能造作. 第三章會看到, ROUTER與DEALER涉及到會話追蹤, 如果這部分內容出現異常, 也會類似於數據丟失.

  9. 最后, 如果你確實找不到出錯的原因, 但就是看不到消息, 請考慮向ZeroMQ 社區提問.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM