在zeromq源碼分析筆記之架構說到了zmq的整體架構,可以看到線程間通信包括兩類,一類是用於收發命令,告知對象該調用什么方法去做什么事情,命令的結構由command_t結構體確定;另一類是socket_base_t實例與session的消息通信,消息的結構由msg_t確定。命令的發送與存儲是通過mailbox_t實現的,消息的發送和存儲是通過pipe_t實現的,這兩個結構都會詳細說到,今天先說一下線程間的收發命令。
zeromq的線程可分為兩類,一類是io線程,像reaper_t、io_thread_t都屬於這一類,這類線程的特點就是內含一個輪詢器poller及mailbox_t,通過poller可以監聽激活mailbox_t的信號 ;另一類是zmq的socket,所有socket_base_t實例化的對象都可以看做一個單獨的線程,這類線程不含poller,但同樣含有一個mailbox_t,可以用於收發命令,由於不含poller,只能在每次使用socket_base_t實例的時候先處理一下mailbox_t,看是否有命令需要處理,代碼上來看就是每次先調用下面這個函數接收並處理一下命令:
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
另外,兩類線程發送命令的方式是一致的。下面,就詳細的說一下命令結構、如何發送命令、兩類線程如何接收命令
1、命令
先看一下命令結構(詳細的結構參見源碼Command.hpp):
// This structure defines the commands that can be sent between threads.
struct command_t
{
// Object to process the command.
zmq::object_t *destination; enum type_t { ... } type; union { ... } args; };
可以看到,命令由三部分構成,分別是發往的目的地destination,命令的類型type,命令的參數args。所謂的命令就是一個對象交代另一個對象去做某件事情,說白了就是告訴令一個對象應該調用哪個方法,命令的發出者是一個對象,而接收者是一個線程,線程接收到命令后,根據目的地派發給相應的對象做處理。可以看到命令的destination屬性是object_t類型的,在上節介紹類的層次結構圖時,說到object_t及其子類都具有發送和處理命令的功能(沒有收命令的功能),所以有必要弄清楚一件事,對象、object_t、poller、線程、mailbox_t、命令是什么關系?
- 在zmq中,每個線程都會擁有一個信箱,命令收發功能底層都是由信箱實現的
- zmq提供了object_t類,用於使用線程信箱發送命令的功能(object_t類還有其他的功能),object_t還有處理命令的功能。
- io線程內還有一個poller用於監聽激活mailbox_t的信號,線程收到激活信號后,會去mailbox_t中讀命令,然后把命令交由object_t處理
簡單來說就是,object_t發命令,poller監聽命令到來信號告知線程收命令,交給object_t處理。無論是object_t、還是線程本身、還是poller其實都操作mailbox_t,object_t、poller、mailbox_t都綁定在同一個線程上。下面就來看看具體的如何發送命令
2、發命令
一個對象想使用線程的發命令功能,其類就得繼承自object_t(源碼在Object.hpp/.cpp):
class object_t { public: object_t (zmq::ctx_t *ctx_, uint32_t tid_); void process_command (zmq::command_t &cmd_); ... protected: ... private: zmq::ctx_t *ctx;// Context provides access to the global state. uint32_t tid;// Thread ID of the thread the object belongs to. void send_command (command_t &cmd_); }
可以看到,object_t內含一個tid,含義就是,該object_t對象要使用哪個線程的mailbox_t。關於zmq::ctx_t,在zmq中被稱為上下文語境,上下文語境簡單來說就是zmq的存活環境,里面存儲是一些全局對象,zmq中所有的線程都可以使用這些對象。zmq線程中的mailbox_t對象會被zmq存儲在ctx_t對象中。zmq的做法就是,在上下文語境中使用一個容器slots裝載線程的mailbox,在新建線程的時候,給線程分配一個線程標志tid和mailbox,把mailbox放入容器的tid那個位置,代碼來說就是slots[tid]=mailbox。有了這個基礎,線程A給線程B發命令就只要往slots[B.tid]寫入命令就可以了:
void zmq::object_t::send_command (command_t &cmd_) { ctx->send_command (cmd_.destination->get_tid (), cmd_); } void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); } void zmq::mailbox_t::send (const command_t &cmd_) { sync.lock(); cpipe.write (cmd_, false); bool ok = cpipe.flush (); sync.unlock (); if (!ok) signaler.send (); }
3、io線程收命令
前面說過,每個io線程都含有一個poller,io線程的結構如下(源碼在Io_thread_t.hpp/.cpp):
class io_thread_t : public object_t, public i_poll_events { public: io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_); ~io_thread_t (); void start (); // Launch the physical thread. void stop ();// Ask underlying thread to stop. ... private: mailbox_t mailbox;// I/O thread accesses incoming commands via this mailbox. poller_t::handle_t mailbox_handle;// Handle associated with mailbox' file descriptor. poller_t *poller;// I/O multiplexing is performed using a poller object. } zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { poller = new (std::nothrow) poller_t; alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (mailbox_handle); }
構造函數中把mailbox_t句柄加入poller中,讓poller監聽其讀事件,所以,如果有信號發過來,poller會被喚醒,並調用io_thread_t的in_event:
void zmq::io_thread_t::in_event () { // TODO: Do we want to limit number of commands I/O thread can // process in a single go? command_t cmd; int rc = mailbox.recv (&cmd, 0); while (rc == 0 || errno == EINTR) {//如果讀管道中有內容或者等待信號的時候被中斷,將一直讀取 if (rc == 0) cmd.destination->process_command (cmd); rc = mailbox.recv (&cmd, 0); } errno_assert (rc != 0 && errno == EAGAIN); }
可以看到,in_event使用了mailbox_t的接收命令的功能。接收到命令之后,調用destination處理命令的功能去處理命令。
4、socket_base_t線程收命令
上一篇說過socket_base_t的每個實例都可以看成一個zmq線程,但是比較特殊,並沒有使用poller,而是在使用到socket的下面幾個方法的時候去檢查是否有未處理的命令:
int zmq::socket_base_t::getsockopt (int option_, void *optval_,size_t *optvallen_) int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::term_endpoint (const char *addr_) int zmq::socket_base_t::send (msg_t *msg_, int flags_) int zmq::socket_base_t::recv (msg_t *msg_, int flags_) void zmq::socket_base_t::in_event ()//這個函數只有在銷毀socke的時候會被用到,在后面講zmq_close的時候會說到
檢查的手段就是調用process_commands方法:
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) { int rc; command_t cmd; if (timeout_ != 0) { // If we are asked to wait, simply ask mailbox to wait. rc = mailbox.recv (&cmd, timeout_); } else { some code rc = mailbox.recv (&cmd, 0); } // Process all available commands. while (rc == 0) { cmd.destination->process_command (cmd); rc = mailbox.recv (&cmd, 0); } some code }
可見,最終都是使用mailbox_t的接收命令的功能。
這里有一個值得思考的問題,為什么socket_base_t實例這個線程不使用poller呢?每次使用上面那些方法的時候去檢查不是很麻煩嗎?
說一下個人理解,不見得正確。socket_base_t實例之所以被認為是一個特殊的線程,是因為其和io_thread_t一樣,都具有收發命令的功能,(關於這點可以看一下io_thread_t的源碼,可以發現其主要功能就是收發命令),但是socket_base_t實例是由用戶線程創建的,也就是依附於用戶線程,而zmq中所有通信都是異步了,所以用戶線程是不能被阻塞的,一旦使用poller,線程將被阻塞,也就違背了設計初衷。
5、mailbox_t
上面說到線程間收發命令都是通過mailbox_t實現的,現在就來看看mailbox_t到底是如何實現的,mailbox_t的聲明如下(源碼位於Mailbox.hpp/.cpp),其中藍色加粗中文字體都是我本人的注釋,英文為原注釋,以后所有源碼注釋都是這個含義:
class mailbox_t
{
public: mailbox_t (); ~mailbox_t (); fd_t get_fd (); void send (const command_t &cmd_); int recv (command_t *cmd_, int timeout_); private: typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t; // The pipe to store actual commands. cpipe_t cpipe; signaler_t signaler;// Signaler to pass signals from writer thread to reader thread. // There's only one thread receiving from the mailbox, but there // is arbitrary number of threads sending. Given that ypipe requires // synchronised access on both of its endpoints, we have to synchronise // the sending side. mutex_t sync;//只有一個線程從mailbox中接受消息,但是會有大量的線程往mailbox中發送消息,鑒於ypipe需要同步訪問兩端的兩端,我們必須同步發送端 bool active; // True if the underlying pipe is active, ie. when we are allowed to read commands from it. // Disable copying of mailbox_t object. mailbox_t (const mailbox_t&); const mailbox_t &operator = (const mailbox_t&); };
mailbox_t中的有幾個屬性很關鍵,有必要說一下
- cpipe,后面可能會稱之為管道,ypipe_t類型,在zmq的實現中ypipe_t是一個單生產者單消費者無鎖隊列(下一篇會詳細介紹),只有一個讀命令線程和一個寫命令線程的時候是線程安全的。ypipe_t的安全性誰使用誰負責。命令都是存儲在cpipe中的。
- sync,由於mailbox_t底層使用的是ypipe_t,而且多個線程向一個線程發命令的場景是很常見的,所以要互斥ypipe_t的發送端。
- signaler,通知命令接受方,現在信箱mailbox中有命令了,你可以去讀了,從代碼的角度就是通知接受方mailbox_t把active設置為true。signaler的底層根據不同平台有不同實現,本質上可以看成一個socketpair,這個東西比較重要,應該先man一下,我這里不多說。
- active,管道中是否有命令可讀
先來想一個問題,既然signaler可作為信號通知,為何還要active這個屬性?然后帶着問題看源碼
現在來看,線程th1如何向線程th2發送命令?在zmq中是這么做的,th1先把命令寫入th2的管道cpipe中,然后刷新th2的管道,再使用signaler發送一個信號給th2,告訴th2我向你的管道寫了一個命令,你可以去管道讀命令了。
void zmq::mailbox_t::send (const command_t &cmd_)
{
sync.lock();//互斥寫命令端
//關於cpipe的詳細實現,會在下一篇詳細的介紹,現在只需要知道函數的功能就可以了 cpipe.write (cmd_, false);//向接受送方mailbox_t管道寫入命令,在沒有調用flush之前,接收方看不到這個命令 bool ok = cpipe.flush ();//刷新管道,這個時候接收方能看到剛才那條命令了 sync.unlock (); if (!ok) signaler.send ();//發送信號給接受命令的一方 }
再說th2讀命令,如果th2是socket_base_t實例線程,先調用process_commands,process_commands會調用循環調用mailbox_t的recv函數,直到沒命令可讀退出循環;如果th2是io_thread_t這類線程,會有poller監聽信號的到來,然后調用線程的in_event,in_event又會循環調用mailbox_t的recv函數,直到沒命令可讀退出循環,並睡眠,等待再次被信號喚醒。需要注意的是,這兩類線程對發送過的信號都在mailbox_t的recv函數中處理的。現在就來看一下mailbox_t是如何接收命令的:
int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
// Try to get the command straight away.
if (active) {//開始的時候,信箱是未激活狀態
bool ok = cpipe.read (cmd_); if (ok) return 0; // If there are no more commands available, switch into passive state. // 沒有命令可讀時,先把信箱設置為未激活狀態,表示沒命令可讀,然后把對方發過來的激活信箱的信號處理一下(沒什么特殊的處理,就是接受一下) active = false; signaler.recv (); } // Wait for signal from the command sender. int rc = signaler.wait (timeout_);//signaler.wait的返回值有三種①wait函數出錯,返回-1,並且設置errno=EINTR②返回-1並且errno=EAGAIN,表示信號沒等到③等到信號。 if (rc != 0 && (errno == EAGAIN || errno == EINTR))//這里對應wait的前兩種情況 return -1; // We've got the signal. Now we can switch into active state. active = true;//等到激活信箱的信號了,激活信箱 // Get a command. errno_assert (rc == 0); bool ok = cpipe.read (cmd_); zmq_assert (ok); return 0; }
從代碼上來看,recv是這樣工作的,先檢查信箱是否激活,如果已經被激活,直接讀命令退出;如果沒激活,先去等激活信號,等到了就讀命令退出,沒等到就直接退出。需要注意的是,調用recv的函數都在recv上包裹了一個while,大概是這種形式while(true){ mailbox.recv() ;},(可以看上面源碼是怎么調用recv的),也就是調用者會一直調用recv讀命令,直到讀不出命令為止,然后把激活信號取走,把信箱設置未激活態。這就是接收命令的流程。
所以,active和signaler是這樣合作的:寫命令線程每寫一條命令,先去檢查讀命令線程是否阻塞,如果阻塞,會調用讀命令線程mailbox_t中的signaler,發送一個激活讀線程mailbox_t的信號,讀線程收到這個命令后在recv函數中把activ設置為true,這時,讀線程循環調用recv的時候,發現active為true,就會一直讀命令,直到沒命令可讀時,又把active設置為false,等待下一次信號到來。
現在可以回答上面那個問題了,active是否多余?
先試想一下如果不使用active,每寫一條命令都必須發送一個信號讀讀線程,在大並發的情況下,這也是一筆消耗。而使用active,只需要在讀線程睡眠的時候(沒有命令可讀時,io_thread_t這類線程會睡眠,socket_base_t實例線程特殊,不會睡眠)發送信號喚醒讀線程就可以,可以節省大量的資源。
有關mailbox_t的命令存儲結構cpipe在下一篇詳細介紹