Libevent學習之SocketPair實現


  Libevent設計的精化之一在於把Timer事件、Signal事件和IO事件統一集成在一個Reactor中,以統一的方式去處理這三種不同的事件,更確切的說是把Timer事件和Signal事件融合到了IO多路復用機制中。

  Timer事件的融合相對清晰簡單,其套用了Reactor和Proactor模式(如Windows上的IOCP)中處理Timer事件的經典方法,其實Libevent就是一個Reactor嘛。由於IO復用機制(如Linux下的select、epoll)允許使用一個最大等待時間(即最大超時時間)timeout,當超過了這段最大等待時間,即使沒有發生IO事件,也會返回並執行用戶設置好的函數。那么在Libevent中將Timer時間融合到正常的IO事件中的方法就是,把系統IO復用的最大超時時間設置為一系列Timer時間中最小的超時時間。這樣就能如我們所願在IO事件能順利執行的情況下我們去執行IO事件而忽略Timer事件,如果到達timeout時間沒有IO事件執行,系統復用也會因為超時而返回,這時候剛好就能執行Timer事件。

  而Signal事件統一到系統的IO復用機制中就沒那么自然了,由於Signal事件的出現的隨機的,進程不能只是測試一個變量來判別是否發生了一個信號,而是必須告訴內核“在此信號發生時,請執行如下的操作”。這似乎是一件不可以完成的任務,但有時候我們只要換個角度思考,就會發現到達終點的路不只有一條,雖然不是筆直的那條。我們發現,當Signal事件發生時,只要觸發系統的IO復用機制,使其返回,再去統一處理所有的待處理事件就可以了。那么如何觸發呢?最簡單的就是當一個套接字可讀時,IO復用就能被觸發,而我們的任務就是在Signal事件發生時,向這個套接字(假設為A)的另一端(另一個套接字,我們稱之為B)寫入數據(不用多,一個字節即可)即可使套接字A有數據讀,使得IO復用返回繼而處理事件。而用戶只需要向Reactor在套接字A處注冊一個persist的可讀事件,就如同注冊其他事件一樣,把Signal事件融合到IO復用機制中。


  上述的套接字A和B由於都在本地,目的是為了實現兩個進程之間的通信,可以將其看作是一個"數據結構",成為socketpair,在任何一個套接字上寫數據都能發送到另一個套接字,是一個全雙工的實現。進程間的通信我們最直接的辦法就是使用pipe,Linux 提供了 popen 和 pclose 函數,用於創建和關閉管道與另外一個進程進行通信。

FILE *popen(const char *command, const char *mode);  
int pclose(FILE *stream);  

  遺憾的是,popen 創建的管道只能是單向的 -- mode 只能是 "r" 或 "w" 而不能是某種組合--用戶只能選擇要么往里寫,要么從中讀,而不能同時在一個管道中進行讀寫。如果非得用pipe來實現“全雙工”,就要popen兩次,打開兩個管道。有沒有更簡單的辦法呢?答案就是上述我們所講到的socketpair,而BSD的內核已經實現了一個socketpair函數,該系統調用能創建一對已連接的UNIX族socket。在Linux中,完全可以把這一對socket當成pipe返回的文件描述符一樣使用,唯一的區別就是這一對文件描述符中的任何一個都可讀和可寫,函數原型如下:

int socketpair(int d, int type, int protocol, int sv[2]); 

  socketpair()函數建立一對匿名的已經連接的套接字,其特性由協議族d、類型type、協議protocol決定,建立的兩個套接字描述符會放在sv[0]和sv[1]中。
  第1個參數d,表示協議族,只能為AF_LOCAL或者AF_UNIX;
  第2個參數type,表示類型,只能為0。
  第3個參數protocol,表示協議,可以是SOCK_STREAM或者SOCK_DGRAM。用SOCK_STREAM建立的套接字對是管道流,與一般的管道相區別的是,套接字對建立的通道是雙向的,即每一端都可以進行讀寫。參數sv,用於保存建立的套接字對。

  關於Unix域協議和源自BSD的socketpair函數在《Unix網絡編程 卷1 <第三版>》中第15章Stevens先生已經給我們詳細的講解了,在中文版第330頁也有使用socketpair來實現描述符傳遞的例子,可仔細研讀。

  Libevent中也有對socketpair的實現,由於在原來的函數中有一些特定的宏和變量名,直接閱讀和使用會不方便,所以我將他抽出來進行通用化(^_^),作為一個可復用的函數,供學習和使用。下面是源碼:

#include <stdio.h>
#include <stdlib.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>

/**
 *  創建一個SocketPair,通過返回的兩個fd可以進行進程間通信
 *  @param family : 套接字對間使用的協議族,可以是AF_INET或AF_LOCAL
 *  @param type : 套接字類型
 *  @param protocol : 協議類型
 *  @param fd[2] : 將要創建的Socketpair兩端的文件描述符
 */
int
Socketpair(int family, int type, int protocol, int fd[2])
{
    int32_t listener = -1;
    int32_t connector = -1;
    int32_t acceptor = -1;
    struct sockaddr_in listen_addr;
    struct sockaddr_in connect_addr;
    unsigned int size;

    if (protocol || 
            (family != AF_INET && family != AF_LOCAL)) {
        fprintf(stderr, "EAFNOSUPPORT\n");
        return -1;
    }
    if (!fd) {
        fprintf(stderr, "EINVAL\n");
        return -1;
    }

    /*創建listener,監聽本地的換回地址,端口由內核分配*/
    listener = socket(AF_INET, type, 0);
    if (listener < 0)
        return -1;
    memset(&listen_addr, 0, sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    listen_addr.sin_port = 0;    /* kernel chooses port.    */
    if (bind(listener, (struct sockaddr *) &listen_addr, sizeof(listen_addr))
            == -1)
        goto fail;
    if (listen(listener, 1) == -1)
        goto fail;

    /*創建connector, 連接到listener, 作為Socketpair的一端*/
    connector = socket(AF_INET, type, 0);
    if (connector < 0)
        goto fail;
    /* We want to find out the port number to connect to.  */
    size = sizeof(connect_addr);
    if (getsockname(listener, (struct sockaddr *) &connect_addr, &size) == -1)
        goto fail;
    if (size != sizeof(connect_addr))
        goto fail;
    if (connect(connector, (struct sockaddr *) &connect_addr,
                sizeof(connect_addr)) == -1)
        goto fail;

    size = sizeof(listen_addr);
    /*調用accept函數接受connector的連接,將返回的文件描述符作為Socketpair的另一端*/
    acceptor = accept(listener, (struct sockaddr *) &listen_addr, &size);
    if (acceptor < 0)
        goto fail;
    if (size != sizeof(listen_addr))
        goto fail;
    close(listener);
    
    /**
     * 至此,我們已經創建了兩個連接在一起的文件描述符,
     * 通過向其中任意一個發送數據,都會“轉發”到另一個,即可以實現進程間的通信
     */
     
    /* Now check we are talking to ourself by matching port and host on the
       two sockets.     */
    if (getsockname(connector, (struct sockaddr *) &connect_addr, &size) == -1)
        goto fail;
    if (size != sizeof(connect_addr)
            || listen_addr.sin_family != connect_addr.sin_family
            || listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr
            || listen_addr.sin_port != connect_addr.sin_port)
        goto fail;
    fd[0] = connector;
    fd[1] = acceptor;

    return 0;

fail:
    if (listener != -1)
        close(listener);
    if (connector != -1)
        close(connector);
    if (acceptor != -1)
        close(acceptor);

    return -1;
}

簡單測試一下:

#include <sys/types.h>
#include <sys/socket.h>

#include <stdlib.h>
#include <stdio.h>

int Socketpair(int, int, int, int[]);

int main ()
{
    int fds[2];

    int r = Socketpair(AF_INET, SOCK_STREAM, 0, fds);
    if (r < 0) {
        perror( "socketpair()" );
        exit( 1 );
    }

    if(fork()) {
        /*  Parent process: echo client */
        int val = 0;
        close( fds[1] );
        while ( 1 ) {
            sleep(1);
            ++val;
            printf( "Sending data: %d\n", val );
            write( fds[0], &val, sizeof(val) );
            read( fds[0], &val, sizeof(val) );
            printf( "Data received: %d\n", val );
        }
    }
    else {
        /*  Child process: echo server */
        int val;
        close( fds[0] );
        while ( 1 ) {
            read( fds[1], &val, sizeof(val) );
            ++val;
            write( fds[1], &val, sizeof(val) );
        }
    }
}

測試結果:

 

=============================神奇的分割線============================

                                                               源碼請猛戳{ 這里

================================================================

 

參考資料:

Linux上實現雙向進程間通信管道(socketpair)

libevent源碼深度剖析

Libevent源碼

《Unix網絡編程 卷一 <第三版>》

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM