Libevent設計的精化之一在於把Timer事件、Signal事件和IO事件統一集成在一個Reactor中,以統一的方式去處理這三種不同的事件,更確切的說是把Timer事件和Signal事件融合到了IO多路復用機制中。
Timer事件的融合相對清晰簡單,其套用了Reactor和Proactor模式(如Windows上的IOCP)中處理Timer事件的經典方法,其實Libevent就是一個Reactor嘛。由於IO復用機制(如Linux下的select、epoll)允許使用一個最大等待時間(即最大超時時間)timeout,當超過了這段最大等待時間,即使沒有發生IO事件,也會返回並執行用戶設置好的函數。那么在Libevent中將Timer時間融合到正常的IO事件中的方法就是,把系統IO復用的最大超時時間設置為一系列Timer時間中最小的超時時間。這樣就能如我們所願在IO事件能順利執行的情況下我們去執行IO事件而忽略Timer事件,如果到達timeout時間沒有IO事件執行,系統復用也會因為超時而返回,這時候剛好就能執行Timer事件。
而Signal事件統一到系統的IO復用機制中就沒那么自然了,由於Signal事件的出現的隨機的,進程不能只是測試一個變量來判別是否發生了一個信號,而是必須告訴內核“在此信號發生時,請執行如下的操作”。這似乎是一件不可以完成的任務,但有時候我們只要換個角度思考,就會發現到達終點的路不只有一條,雖然不是筆直的那條。我們發現,當Signal事件發生時,只要觸發系統的IO復用機制,使其返回,再去統一處理所有的待處理事件就可以了。那么如何觸發呢?最簡單的就是當一個套接字可讀時,IO復用就能被觸發,而我們的任務就是在Signal事件發生時,向這個套接字(假設為A)的另一端(另一個套接字,我們稱之為B)寫入數據(不用多,一個字節即可)即可使套接字A有數據讀,使得IO復用返回繼而處理事件。而用戶只需要向Reactor在套接字A處注冊一個persist的可讀事件,就如同注冊其他事件一樣,把Signal事件融合到IO復用機制中。
上述的套接字A和B由於都在本地,目的是為了實現兩個進程之間的通信,可以將其看作是一個"數據結構",成為socketpair,在任何一個套接字上寫數據都能發送到另一個套接字,是一個全雙工的實現。進程間的通信我們最直接的辦法就是使用pipe,Linux 提供了 popen 和 pclose 函數,用於創建和關閉管道與另外一個進程進行通信。
FILE *popen(const char *command, const char *mode); int pclose(FILE *stream);
遺憾的是,popen 創建的管道只能是單向的 -- mode 只能是 "r" 或 "w" 而不能是某種組合--用戶只能選擇要么往里寫,要么從中讀,而不能同時在一個管道中進行讀寫。如果非得用pipe來實現“全雙工”,就要popen兩次,打開兩個管道。有沒有更簡單的辦法呢?答案就是上述我們所講到的socketpair,而BSD的內核已經實現了一個socketpair函數,該系統調用能創建一對已連接的UNIX族socket。在Linux中,完全可以把這一對socket當成pipe返回的文件描述符一樣使用,唯一的區別就是這一對文件描述符中的任何一個都可讀和可寫,函數原型如下:
int socketpair(int d, int type, int protocol, int sv[2]);
socketpair()函數建立一對匿名的已經連接的套接字,其特性由協議族d、類型type、協議protocol決定,建立的兩個套接字描述符會放在sv[0]和sv[1]中。
第1個參數d,表示協議族,只能為AF_LOCAL或者AF_UNIX;
第2個參數type,表示類型,只能為0。
第3個參數protocol,表示協議,可以是SOCK_STREAM或者SOCK_DGRAM。用SOCK_STREAM建立的套接字對是管道流,與一般的管道相區別的是,套接字對建立的通道是雙向的,即每一端都可以進行讀寫。參數sv,用於保存建立的套接字對。
關於Unix域協議和源自BSD的socketpair函數在《Unix網絡編程 卷1 <第三版>》中第15章Stevens先生已經給我們詳細的講解了,在中文版第330頁也有使用socketpair來實現描述符傳遞的例子,可仔細研讀。
Libevent中也有對socketpair的實現,由於在原來的函數中有一些特定的宏和變量名,直接閱讀和使用會不方便,所以我將他抽出來進行通用化(^_^),作為一個可復用的函數,供學習和使用。下面是源碼:
#include <stdio.h> #include <stdlib.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/types.h> /** * 創建一個SocketPair,通過返回的兩個fd可以進行進程間通信 * @param family : 套接字對間使用的協議族,可以是AF_INET或AF_LOCAL * @param type : 套接字類型 * @param protocol : 協議類型 * @param fd[2] : 將要創建的Socketpair兩端的文件描述符 */ int Socketpair(int family, int type, int protocol, int fd[2]) { int32_t listener = -1; int32_t connector = -1; int32_t acceptor = -1; struct sockaddr_in listen_addr; struct sockaddr_in connect_addr; unsigned int size; if (protocol || (family != AF_INET && family != AF_LOCAL)) { fprintf(stderr, "EAFNOSUPPORT\n"); return -1; } if (!fd) { fprintf(stderr, "EINVAL\n"); return -1; } /*創建listener,監聽本地的換回地址,端口由內核分配*/ listener = socket(AF_INET, type, 0); if (listener < 0) return -1; memset(&listen_addr, 0, sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); listen_addr.sin_port = 0; /* kernel chooses port. */ if (bind(listener, (struct sockaddr *) &listen_addr, sizeof(listen_addr)) == -1) goto fail; if (listen(listener, 1) == -1) goto fail; /*創建connector, 連接到listener, 作為Socketpair的一端*/ connector = socket(AF_INET, type, 0); if (connector < 0) goto fail; /* We want to find out the port number to connect to. */ size = sizeof(connect_addr); if (getsockname(listener, (struct sockaddr *) &connect_addr, &size) == -1) goto fail; if (size != sizeof(connect_addr)) goto fail; if (connect(connector, (struct sockaddr *) &connect_addr, sizeof(connect_addr)) == -1) goto fail; size = sizeof(listen_addr); /*調用accept函數接受connector的連接,將返回的文件描述符作為Socketpair的另一端*/ acceptor = accept(listener, (struct sockaddr *) &listen_addr, &size); if (acceptor < 0) goto fail; if (size != sizeof(listen_addr)) goto fail; close(listener); /** * 至此,我們已經創建了兩個連接在一起的文件描述符, * 通過向其中任意一個發送數據,都會“轉發”到另一個,即可以實現進程間的通信 */ /* Now check we are talking to ourself by matching port and host on the two sockets. */ if (getsockname(connector, (struct sockaddr *) &connect_addr, &size) == -1) goto fail; if (size != sizeof(connect_addr) || listen_addr.sin_family != connect_addr.sin_family || listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr || listen_addr.sin_port != connect_addr.sin_port) goto fail; fd[0] = connector; fd[1] = acceptor; return 0; fail: if (listener != -1) close(listener); if (connector != -1) close(connector); if (acceptor != -1) close(acceptor); return -1; }
簡單測試一下:
#include <sys/types.h> #include <sys/socket.h> #include <stdlib.h> #include <stdio.h> int Socketpair(int, int, int, int[]); int main () { int fds[2]; int r = Socketpair(AF_INET, SOCK_STREAM, 0, fds); if (r < 0) { perror( "socketpair()" ); exit( 1 ); } if(fork()) { /* Parent process: echo client */ int val = 0; close( fds[1] ); while ( 1 ) { sleep(1); ++val; printf( "Sending data: %d\n", val ); write( fds[0], &val, sizeof(val) ); read( fds[0], &val, sizeof(val) ); printf( "Data received: %d\n", val ); } } else { /* Child process: echo server */ int val; close( fds[0] ); while ( 1 ) { read( fds[1], &val, sizeof(val) ); ++val; write( fds[1], &val, sizeof(val) ); } } }
測試結果:
=============================神奇的分割線============================
源碼請猛戳{ 這里 }
================================================================
參考資料:
Libevent源碼