在 Windows 平台上不可用。
Libeio是全功能的用於C語言的異步I/O庫,建模風格和秉承的精神與libev類似。特性包括:異步的read、write、open、close、stat、unlink、fdatasync、mknod、readdir等(基本上是完整的POSIX API)。
Libeio完全基於事件庫,可以容易地集成到事件庫(或獨立,甚至是以輪詢方式)使用。Libeio非常輕便,且只依賴於POSIX線程。
Libeio當前的源碼,文檔,集成和輕便性都在libev之下,但應該很快可以用於生產環境了。
Libeio是用多線程實現的異步I/O庫.主要步驟如下:
- 主線程接受請求,將請求放入請求隊列,喚醒子線程處理。這里主線程不會阻塞,會繼續接受請求
- 子線程處理請求,將請求回執放入回執隊列,並調用用戶自定義方法,通知主線程有請求已處理完畢
- 主線程處理回執。
源碼中提供了一個demo.c用於演示,精簡代碼如下:
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <poll.h>
- #include <string.h>
- #include <assert.h>
- #include <fcntl.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include "eio.h"
- int respipe [2];
- /*
- * 功能:子線程通知主線程已有回執放入回執隊列.
- */
- void
- want_poll (void)
- {
- char dummy;
- printf ("want_poll ()\n");
- write (respipe [1], &dummy, 1);
- }
- /*
- * 功能:主線程回執處理完畢,調用此函數
- */
- void
- done_poll (void)
- {
- char dummy;
- printf ("done_poll ()\n");
- read (respipe [0], &dummy, 1);
- }
- /*
- * 功能:等到管道可讀,處理回執信息
- */
- void
- event_loop (void)
- {
- // an event loop. yeah.
- struct pollfd pfd;
- pfd.fd = respipe [0];
- pfd.events = POLLIN;
- printf ("\nentering event loop\n");
- while (eio_nreqs ())
- {
- poll (&pfd, 1, -1);
- printf ("eio_poll () = %d\n", eio_poll ());
- }
- printf ("leaving event loop\n");
- }
- /*
- * 功能:自定義函數,用戶處理請求執行后的回執信息
- */
- int
- res_cb (eio_req *req)
- {
- printf ("res_cb(%d|%s) = %d\n", req->type, req->data ? req->data : "?", EIO_RESULT (req));
- if (req->result < 0)
- abort ();
- return 0;
- }
- int
- main (void)
- {
- printf ("pipe ()\n");
- if (pipe (respipe))
- abort ();
- printf ("eio_init ()\n");
- if (eio_init (want_poll, done_poll)) //初始化libeio庫
- abort ();
- eio_mkdir ("eio-test-dir", 0777, 0, res_cb, "mkdir");
- event_loop ();
- return 0;
- }
可以將demo.c與libeio一起編譯,也可以先將libeio編譯為動態鏈接庫,然后demo.c與動態鏈接庫一起編譯。
執行流程圖如下所示:
流程圖詳細步驟說明如下:
1、通過pipe函數創建管道。
管道主要作用是子線程告知父線程已有請求回執放入回執隊列,父線程可以進行相應的處理。
2. libeio執行初始化操作。
調用eio_init執行初始化。eio_init函數聲明:int eio_init (void (*want_poll)(void), void (*done_poll)(void))。eio_init參數是兩個函數指針,want_poll和done_poll是成對出現。want_poll主要是子線程通知父線程已有請求處理完畢,done_poll則是在所有請求處理完畢后調用。
eio_init代碼如下:
- /*
- * 功能:libeio初始化
- */
- static int ecb_cold
- etp_init (void (*want_poll)(void), void (*done_poll)(void))
- {
- X_MUTEX_CREATE (wrklock);//子線程隊列互斥量
- X_MUTEX_CREATE (reslock);//請求隊列互斥量
- X_MUTEX_CREATE (reqlock);//回執隊列互斥量
- X_COND_CREATE (reqwait);//創建條件變量
- reqq_init (&req_queue);//初始化請求隊列
- reqq_init (&res_queue);//初始化回執隊列
- wrk_first.next =
- wrk_first.prev = &wrk_first;//子線程隊列
- started = 0;//運行線程數
- idle = 0;//空閑線程數
- nreqs = 0;//請求任務個數
- nready = 0;//待處理任務個數
- npending = 0;//未處理的回執個數
- want_poll_cb = want_poll;
- done_poll_cb = done_poll;
- return 0;
- }
3、父線程接受I/O請求
實例IO請求為創建一個文件夾。一般I/O請求都是阻塞請求,即父線程需要等到該I/O請求執行完畢,才能進行下一步動作。在libeio里面,主線程無需等待I/O操作執行完畢,它可以做其他事情,如繼續接受I/O請求。
這里創建文件夾,調用的libeio中的方法eio_mkdir。libeio對常用的I/O操作,都有自己的封裝函數。
- eio_req *eio_wd_open (const char *path, int pri, eio_cb cb, void *data); /* result=wd */
- eio_req *eio_wd_close (eio_wd wd, int pri, eio_cb cb, void *data);
- eio_req *eio_nop (int pri, eio_cb cb, void *data); /* does nothing except go through the whole process */
- eio_req *eio_busy (eio_tstamp delay, int pri, eio_cb cb, void *data); /* ties a thread for this long, simulating busyness */
- eio_req *eio_sync (int pri, eio_cb cb, void *data);
- eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data);
- eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data);
- eio_req *eio_syncfs (int fd, int pri, eio_cb cb, void *data);
- eio_req *eio_msync (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data);
- eio_req *eio_mtouch (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data);
- eio_req *eio_mlock (void *addr, size_t length, int pri, eio_cb cb, void *data);
- eio_req *eio_mlockall (int flags, int pri, eio_cb cb, void *data);
- eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data);
- eio_req *eio_fallocate (int fd, int mode, off_t offset, size_t len, int pri, eio_cb cb, void *data);
- eio_req *eio_close (int fd, int pri, eio_cb cb, void *data);
- eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data);
- eio_req *eio_seek (int fd, off_t offset, int whence, int pri, eio_cb cb, void *data);
- eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data);
- eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data);
從列舉的函數中可以看出一些共同點,
- 返回值相同,都是結構體eio_req指針。
- 函數最后三個參數都一致。pri表示優先級;cb是用戶自定義的函數指針,主線程在I/O完成后調用;data存放數據
這里需要指出的是,在這些操作里面,沒有執行真正的I/O操作。下面通過eio_mkdir源碼來說明這些函數到底做了什么?
- /*
- * 功能:將創建文件夾請求放入請求隊列
- */
- eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
- {
- REQ (EIO_MKDIR);
- PATH;
- req->int2 = (long)mode;
- SEND;
- }
不得不吐槽一下,libeio里面太多宏定義了,代碼風格有點不好。這里REQ,PATH,SEND都是宏定義。為了便於閱讀,把宏給去掉
- /*
- * 功能:將創建文件夾請求放入請求隊列
- */
- eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
- {
- eio_req *req;
- req = (eio_req *)calloc (1, sizeof *req);
- if (!req)
- return 0;
- req->type = EIO_MKDIR;// 請求類型
- req->pri = pri;//請求優先級
- req->finish = cb;//請求處理完成后調用的函數
- req->data = data;//用戶數據
- req->destroy = eio_api_destroy;//釋放req資源
- req->flags |= EIO_FLAG_PTR1_FREE;//標記需要釋放ptr1
- req->ptr1 = strdup (path);
- if (!req->ptr1)
- {
- eio_api_destroy (req);
- return 0;
- }
- req->int2 = (long)mode;
- eio_submit (req); //將請求放入請求隊列,並喚醒子線程
- return req;
- }
4、請求放入請求隊列
請求隊列由結構體指針數組qs,qe構成,數組大小為9,數組的序號標志了優先級,即qs[1]存放的是優先級為1的所有請求中的第一個,qe[1]存放的是優先級為1的所有請求的最后一個。這樣做的好處是,在時間復雜度為O(1)的情況下插入新的請求。
- /*
- * 功能:將請求放入請求隊列,或者將回執放入回執隊列。 qe存放鏈表終點.qs存放鏈表起點.
- */
- static int ecb_noinline
- reqq_push (etp_reqq *q, ETP_REQ *req)
- {
- int pri = req->pri;
- req->next = 0;
- if (q->qe[pri])//如果該優先級以后請求,則插入到最后
- {
- q->qe[pri]->next = req;
- q->qe[pri] = req;
- }
- else
- q->qe[pri] = q->qs[pri] = req;
- return q->size++;
- }
5、喚醒子線程
- 創建的線程總數大於4(這個數字要想改變,只有重新編譯libeio了)
- 線程數大於未處理的請求。
- /*
- * 功能:創建線程,並把線程放入線程隊列
- */
- static void ecb_cold
- etp_start_thread (void)
- {
- etp_worker *wrk = calloc (1, sizeof (etp_worker));
- /*TODO*/
- assert (("unable to allocate worker thread data", wrk));
- X_LOCK (wrklock);
- //創建線程,並將線程插入到線程隊列.
- if (thread_create (&wrk->tid, etp_proc, (void *)wrk))
- {
- wrk->prev = &wrk_first;
- wrk->next = wrk_first.next;
- wrk_first.next->prev = wrk;
- wrk_first.next = wrk;
- ++started;
- }
- else
- free (wrk);
- X_UNLOCK (wrklock);
- }
6、子線程從請求隊列中取下請求
7、子線程處理請求
- /*
- * 功能:根據類型,執行不同的io操作
- */
- static void
- eio_execute (etp_worker *self, eio_req *req)
- {
- #if HAVE_AT
- int dirfd;
- #else
- const char *path;
- #endif
- if (ecb_expect_false (EIO_CANCELLED (req)))//判斷該請求是否取消
- {
- req->result = -1;
- req->errorno = ECANCELED;
- return;
- }
- switch (req->type)
- {
- case EIO_MKDIR: req->result = mkdirat (dirfd, req->ptr1, (mode_t)req->int2); break;
- }
- }
8、寫回執
9、通知父線程有回執
- /*
- * 功能:子線程通知主線程已有回執放入回執隊列.
- */
- void
- want_poll (void)
- {
- char dummy;
- printf ("want_poll ()\n");
- write (respipe [1], &dummy, 1);
- }
10、父線程處理回執
調用eio_poll函數處理回執。或許看到這里你在想,eio_poll是個系統函數,我們沒辦法修改,但是我們如何知道每一個I/O請求執行結果。 其實還是用的函數指針,在我們構建一個I/O請求結構體時,有一個finsh函數指針。當父進程處理I/O回執時,會調用該方法。這里自定義的 finish函數名為res_cb,當創建文件夾成功后,調用該函數,輸出一句話- /*
- * 功能:處理回執
- */
- static int
- etp_poll (void)
- {
- unsigned int maxreqs;
- unsigned int maxtime;
- struct timeval tv_start, tv_now;
- X_LOCK (reslock);
- maxreqs = max_poll_reqs;
- maxtime = max_poll_time;
- X_UNLOCK (reslock);
- if (maxtime)
- gettimeofday (&tv_start, 0);
- for (;;)
- {
- ETP_REQ *req;
- etp_maybe_start_thread ();
- X_LOCK (reslock);
- req = reqq_shift (&res_queue);//從回執隊列取出優先級最高的回執信息
- if (req)
- {
- --npending;
- if (!res_queue.size && done_poll_cb)//直到回執全部處理完,執行done_poll();
- {
- //printf("執行done_poll()\n");
- done_poll_cb ();
- }
- }
- X_UNLOCK (reslock);
- if (!req)
- return 0;
- X_LOCK (reqlock);
- --nreqs;//發出請求,到收到回執,該請求才算處理完畢.
- X_UNLOCK (reqlock);
- if (ecb_expect_false (req->type == EIO_GROUP && req->size))//ecb_expect_false僅僅用於幫助編譯器產生更優代碼,而對真值無任何影響
- {
- req->int1 = 1; /* mark request as delayed */
- continue;
- }
- else
- {
- int res = ETP_FINISH (req);//調用自定義函數,做進一步處理
- if (ecb_expect_false (res))
- return res;
- }
- if (ecb_expect_false (maxreqs && !--maxreqs))
- break;
- if (maxtime)
- {
- gettimeofday (&tv_now, 0);
- if (tvdiff (&tv_start, &tv_now) >= maxtime)
- break;
- }
- }
- errno = EAGAIN;
- return -1;
- }
11、當所有請求執行完畢,調用done_poll做收尾工作。
- /*
- * 功能:主線程回執處理完畢,調用此函數
- */
- void
- done_poll (void)
- {
- char dummy;
- printf ("done_poll ()\n");
- read (respipe [0], &dummy, 1);
- }
至此,libeio就簡單的跑了一遍,從示例代碼可以看出,libeio使用簡單。雖說現在是beat版,不過Node.js已經在使用了。
- #define ecb_expect(expr,value) __builtin_expect ((expr),(value))
- #define ecb_expect_false(expr) ecb_expect (!!(expr), 0)
- #define ecb_expect_true(expr) ecb_expect (!!(expr), 1)
- /* for compatibility to the rest of the world */
- #define ecb_likely(expr) ecb_expect_true (expr)
- #define ecb_unlikely(expr) ecb_expect_false (expr)