kqueue用法簡介


1.什么是kqueue和IO復用

kueue是在UNIX上比較高效的IO復用技術。 所謂的IO復用,就是同時等待多個文件描述符就緒,以系統調用的形式提供。如果所有文件描述符都沒有就緒的話,該系統調用阻塞,否則調用返回,允許用戶進行后續的操作。 常見的IO復用技術有select, poll, epoll以及kqueue等等。其中epoll為Linux獨占,而kqueue則在許多UNIX系統上存在,包括OS X(好吧,現在叫macOS了。。)

2. 使用概覽

kueue在設計上是非常簡潔的,在易用性上可能比select和epoll更好一些。 使用kqueue的大致代碼如下:(后面會給出一個完整的示例)


const static int FD_NUM = 2 // 要監視多少個文件描述符

int kq = kqueue(); // kqueue對象

// kqueue的事件結構體,不需要直接操作
struct kevent changes[FD_NUM]; // 要監視的事件列表
struct kevent events[FD_NUM]; // kevent返回的事件列表(參考后面的kevent函數)

int stdin_fd = STDIN_FILENO;
int stdout_fd = STDOUT_FILENO;

// 在changes列表中注冊標准輸入流的讀事件 以及 標准輸出流的寫事件
// 最后一個參數可以是任意的附加數據(void * 類型),在這里給事件附上了當前的文件描述符,后面會用到
EV_SET(&changes[0], stdin_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd); 
EV_SET(&changes[1], stdout_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);

// 進行kevent函數調用,如果changes列表里有任何就緒的fd,則把該事件對應的結構體放進events列表里面
// 返回值是這次調用得到了幾個就緒的事件 (nev = number of events)
int nev = kevent(kq, changes, FD_NUM, events, FD_NUM, NULL); // 已經就緒的文件描述符數量
for(int i=0; i<nev; i++){
    struct kevent event = events[i]; // 一個個取出已經就緒的事件

    int ready_fd = *((int *)event.udata); // 從附加數據里面取回文件描述符的值
    if( ready_fd == stdin_fd ){
        // 讀取ready_fd
    }else if( ready_fd == stdin_fd ){
        // 寫入ready_fd
    }
}

3. 相關結構體與函數解析

可以看出來,kqueue體系只有三樣東西:struct kevent結構體,EV_SET宏以及kevent函數。

struct kevent 結構體內容如下:

struct kevent {
    uintptr_t       ident;          /* identifier for this event,比如該事件關聯的文件描述符 */
    int16_t         filter;         /* filter for event,可以指定監聽類型,如EVFILT_READ,EVFILT_WRITE,EVFILT_TIMER等 */
    uint16_t        flags;          /* general flags ,可以指定事件操作類型,比如EV_ADD,EV_ENABLE, EV_DELETE等 */
    uint32_t        fflags;         /* filter-specific flags */
    intptr_t        data;           /* filter-specific data */
    void            *udata;         /* opaque user data identifier,可以攜帶的任意數據 */
};

EV_SET 是用於初始化kevent結構的便利宏,其簽名為:

EV_SET(&kev, ident, filter, flags, fflags, data, udata);

可以發現和kevent結構體完全對應,除了第一個,它就是你要初始化的那個kevent結構。

kevent 是真正進行IO復用的函數,其簽名為:

int kevent(int kq, 
    const struct kevent *changelist, // 監視列表
    int nchanges, // 長度
    struct kevent *eventlist, // kevent函數用於返回已經就緒的事件列表
    int nevents, // 長度
    const struct timespec *timeout); // 超時限制

4. 完整示例

下面給出一個完整的示例,這個程序將從標准輸入中讀取數據,寫到標准輸出中。其中輸入輸出全部使用kqueue來進行IO復用。可以使用重定向把文件寫入標准輸入來進行測試。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/event.h>
#include <errno.h>
#include <string.h>

// 為文件描述符打開對應狀態位的工具函數
void turn_on_flags(int fd, int flags){
    int current_flags;
    // 獲取給定文件描述符現有的flag
    // 其中fcntl的第二個參數F_GETFL表示要獲取fd的狀態
    if( (current_flags = fcntl(fd, F_GETFL)) < 0 ) exit(1);

    // 施加新的狀態位
    current_flags |= flags;
    if( fcntl(fd, F_SETFL, current_flags) < 0 ) exit(1);
}

// 錯誤退出的工具函數
int quit(const char *msg){
    perror(msg);
    exit(1);
}

const static int FD_NUM = 2; // 兩個文件描述符,分別為標准輸入與輸出
const static int BUFFER_SIZE = 1024; // 緩沖區大小

// 完全以IO復用的方式讀入標准輸入流數據,輸出到標准輸出流中
int main(){
    struct kevent changes[FD_NUM];
    struct kevent events[FD_NUM];

    // 創建一個kqueue
    int kq;
    if( (kq = kqueue()) == -1 ) quit("kqueue()");

    // 准備從標准輸入流中讀數據
    int stdin_fd = STDIN_FILENO;
    int stdout_fd = STDOUT_FILENO;

    // 設置為非阻塞
    turn_on_flags(stdin_fd, O_NONBLOCK);
    turn_on_flags(stdout_fd, O_NONBLOCK);

    // 注冊監聽事件
    int k = 0;
    EV_SET(&changes[k++], stdin_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);
    EV_SET(&changes[k++], stdout_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &stdout_fd);

    int nev, nread, nwrote = 0; // 發生事件的數量, 已讀字節數, 已寫字節數
    char buffer[BUFFER_SIZE];

    while(1){
        nev = kevent(kq, changes, FD_NUM, events, FD_NUM, NULL); // 已經就緒的文件描述符數量
        if( nev <= 0 ) quit("kevent()");

        int i;
        for(i=0; i<nev; i++){
            struct kevent event = events[i];
            if( event.flags & EV_ERROR ) quit("Event error");

            int ev_fd = *((int *)event.udata);

            // 輸入流就緒 且 緩沖區還有空間能繼續讀
            if( ev_fd == stdin_fd && nread < BUFFER_SIZE ){
                int new_nread;
                if( (new_nread = read(ev_fd, buffer + nread, sizeof(buffer) - nread)) <= 0 )
                    quit("read()"); // 由於可讀事件已經發生,因此如果讀出0個字節也是不正常的
                
                nread += new_nread; // 遞增已讀數據字節數
            }

            // 輸出流就緒 且 緩沖區有內容可以寫出
            if( ev_fd == stdout_fd && nread > 0 ){
                if( (nwrote = write(stdout_fd, buffer, nread)) <=0 )
                    quit("write()");

                memmove(buffer, buffer+nwrote, nwrote); // 為了使實現的代碼更簡潔,這里把還沒有寫出去的數據往前移動
                nread -= nwrote; // 減去已經寫出去的字節數
            }
        }
    }

    return 0;
}

程序中對stdin和stdout設置非阻塞的原因是我們希望有多少就緒的數據就讀多少,或者能寫入多少進緩沖區就寫入多少。否則在阻塞模式下,如果read沒有填滿buffer(文件沒讀完時),或者還有buffer數據沒寫入時,系統調用(read和write)會阻塞,這會對性能造成很大影響。因此這里設置為非阻塞模式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM