libeio-異步I/O庫初窺


 

在 Windows 平台上不可用。

 

Libeio是全功能的用於C語言的異步I/O庫,建模風格和秉承的精神與libev類似。特性包括:異步的read、write、open、close、stat、unlink、fdatasync、mknod、readdir等(基本上是完整的POSIX API)。

Libeio完全基於事件庫,可以容易地集成到事件庫(或獨立,甚至是以輪詢方式)使用。Libeio非常輕便,且只依賴於POSIX線程。

Libeio當前的源碼,文檔,集成和輕便性都在libev之下,但應該很快可以用於生產環境了。

 

   Libeio是用多線程實現的異步I/O庫.主要步驟如下:

  1.  主線程接受請求,將請求放入請求隊列,喚醒子線程處理。這里主線程不會阻塞,會繼續接受請求
  2. 子線程處理請求,將請求回執放入回執隊列,並調用用戶自定義方法,通知主線程有請求已處理完畢
  3. 主線程處理回執。

     源碼中提供了一個demo.c用於演示,精簡代碼如下:

[cpp]   view plain copy print ?
  1. #include <stdio.h>  
  2. #include <stdlib.h>  
  3. #include <unistd.h>  
  4. #include <poll.h>  
  5. #include <string.h>  
  6. #include <assert.h>  
  7. #include <fcntl.h>  
  8. #include <sys/types.h>  
  9. #include <sys/stat.h>  
  10.   
  11. #include "eio.h"  
  12.   
  13. int respipe [2];  
  14.   
  15. /* 
  16.  * 功能:子線程通知主線程已有回執放入回執隊列. 
  17.  */  
  18. void  
  19. want_poll (void)  
  20. {  
  21.   char dummy;  
  22.   printf ("want_poll ()\n");  
  23.   write (respipe [1], &dummy, 1);  
  24. }  
  25.   
  26. /* 
  27.  * 功能:主線程回執處理完畢,調用此函數 
  28.  */  
  29. void  
  30. done_poll (void)  
  31. {  
  32.   char dummy;  
  33.   printf ("done_poll ()\n");  
  34.   read (respipe [0], &dummy, 1);  
  35. }  
  36. /* 
  37.  * 功能:等到管道可讀,處理回執信息 
  38.  */  
  39. void  
  40. event_loop (void)  
  41. {  
  42.   // an event loop. yeah.  
  43.   struct pollfd pfd;  
  44.   pfd.fd     = respipe [0];  
  45.   pfd.events = POLLIN;  
  46.   
  47.   printf ("\nentering event loop\n");  
  48.   while (eio_nreqs ())  
  49.     {  
  50.       poll (&pfd, 1, -1);  
  51.       printf ("eio_poll () = %d\n", eio_poll ());  
  52.     }  
  53.   printf ("leaving event loop\n");  
  54. }  
  55.   
  56. /* 
  57.  * 功能:自定義函數,用戶處理請求執行后的回執信息 
  58.  */  
  59. int  
  60. res_cb (eio_req *req)  
  61. {  
  62.   printf ("res_cb(%d|%s) = %d\n", req->type, req->data ? req->data : "?", EIO_RESULT (req));  
  63.   
  64.   if (req->result < 0)  
  65.     abort ();  
  66.   
  67.   return 0;  
  68. }  
  69.   
  70. int  
  71. main (void)  
  72. {  
  73.   printf ("pipe ()\n");  
  74.   if (pipe (respipe))  
  75.       abort ();  
  76.   printf ("eio_init ()\n");  
  77.   if (eio_init (want_poll, done_poll)) //初始化libeio庫  
  78.       abort ();  
  79.   eio_mkdir ("eio-test-dir", 0777, 0, res_cb, "mkdir");      
  80.   event_loop ();  
  81.   return 0;  
  82. }  

   可以將demo.c與libeio一起編譯,也可以先將libeio編譯為動態鏈接庫,然后demo.c與動態鏈接庫一起編譯。

   執行流程圖如下所示:

     流程圖詳細步驟說明如下:

1、通過pipe函數創建管道。

        管道主要作用是子線程告知父線程已有請求回執放入回執隊列,父線程可以進行相應的處理。

2.   libeio執行初始化操作。

       調用eio_init執行初始化。eio_init函數聲明:int eio_init (void (*want_poll)(void), void (*done_poll)(void))。eio_init參數是兩個函數指針,want_poll和done_poll是成對出現。want_poll主要是子線程通知父線程已有請求處理完畢,done_poll則是在所有請求處理完畢后調用。

     eio_init代碼如下: 

[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:libeio初始化 
  3.  */  
  4. static int ecb_cold  
  5. etp_init (void (*want_poll)(void), void (*done_poll)(void))  
  6. {  
  7.   X_MUTEX_CREATE (wrklock);//子線程隊列互斥量  
  8.   X_MUTEX_CREATE (reslock);//請求隊列互斥量  
  9.   X_MUTEX_CREATE (reqlock);//回執隊列互斥量  
  10.   X_COND_CREATE  (reqwait);//創建條件變量  
  11.   
  12.   reqq_init (&req_queue);//初始化請求隊列  
  13.   reqq_init (&res_queue);//初始化回執隊列  
  14.   
  15.   wrk_first.next =  
  16.   wrk_first.prev = &wrk_first;//子線程隊列  
  17.   
  18.   started  = 0;//運行線程數  
  19.   idle     = 0;//空閑線程數  
  20.   nreqs    = 0;//請求任務個數  
  21.   nready   = 0;//待處理任務個數  
  22.   npending = 0;//未處理的回執個數  
  23.   
  24.   want_poll_cb = want_poll;  
  25.   done_poll_cb = done_poll;  
  26.   
  27.   return 0;  
  28. }  

3、父線程接受I/O請求

    實例IO請求為創建一個文件夾。一般I/O請求都是阻塞請求,即父線程需要等到該I/O請求執行完畢,才能進行下一步動作。在libeio里面,主線程無需等待I/O操作執行完畢,它可以做其他事情,如繼續接受I/O請求。

    這里創建文件夾,調用的libeio中的方法eio_mkdir。libeio對常用的I/O操作,都有自己的封裝函數。

    

[cpp]   view plain copy print ?
  1. eio_req *eio_wd_open   (const char *path, int pri, eio_cb cb, void *data); /* result=wd */  
  2. eio_req *eio_wd_close  (eio_wd wd, int pri, eio_cb cb, void *data);  
  3. eio_req *eio_nop       (int pri, eio_cb cb, void *data); /* does nothing except go through the whole process */  
  4. eio_req *eio_busy      (eio_tstamp delay, int pri, eio_cb cb, void *data); /* ties a thread for this long, simulating busyness */  
  5. eio_req *eio_sync      (int pri, eio_cb cb, void *data);  
  6. eio_req *eio_fsync     (int fd, int pri, eio_cb cb, void *data);  
  7. eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data);  
  8. eio_req *eio_syncfs    (int fd, int pri, eio_cb cb, void *data);  
  9. eio_req *eio_msync     (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data);  
  10. eio_req *eio_mtouch    (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data);  
  11. eio_req *eio_mlock     (void *addr, size_t length, int pri, eio_cb cb, void *data);  
  12. eio_req *eio_mlockall  (int flags, int pri, eio_cb cb, void *data);  
  13. eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data);  
  14. eio_req *eio_fallocate (int fd, int mode, off_t offset, size_t len, int pri, eio_cb cb, void *data);  
  15. eio_req *eio_close     (int fd, int pri, eio_cb cb, void *data);  
  16. eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data);  
  17. eio_req *eio_seek      (int fd, off_t offset, int whence, int pri, eio_cb cb, void *data);  
  18. eio_req *eio_read      (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data);  
  19. eio_req *eio_write     (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data);  

   從列舉的函數中可以看出一些共同點,

  •    返回值相同,都是結構體eio_req指針。
  •    函數最后三個參數都一致。pri表示優先級;cb是用戶自定義的函數指針,主線程在I/O完成后調用;data存放數據

   這里需要指出的是,在這些操作里面,沒有執行真正的I/O操作。下面通過eio_mkdir源碼來說明這些函數到底做了什么?

  

[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:將創建文件夾請求放入請求隊列 
  3.  */  
  4. eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)  
  5. {  
  6.   REQ (EIO_MKDIR);   
  7.   PATH;  
  8.   req->int2 = (long)mode;   
  9.   SEND;  
  10. }  

不得不吐槽一下,libeio里面太多宏定義了,代碼風格有點不好。這里REQ,PATH,SEND都是宏定義。為了便於閱讀,把宏給去掉

[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:將創建文件夾請求放入請求隊列 
  3.  */  
  4. eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)  
  5. {  
  6.   eio_req *req;                                                                                                                
  7.   req = (eio_req *)calloc (1, sizeof *req);                       
  8.   if (!req)                                                       
  9.     return 0;                                                                                                                   
  10.   req->type    = EIO_MKDIR;// 請求類型                       
  11.   req->pri     = pri;//請求優先級       
  12.   req->finish  = cb;//請求處理完成后調用的函數         
  13.   req->data    = data;//用戶數據       
  14.   req->destroy = eio_api_destroy;//釋放req資源  
  15.   req->flags |= EIO_FLAG_PTR1_FREE;//標記需要釋放ptr1            
  16.   req->ptr1 = strdup (path);                   
  17.   if (!req->ptr1)                          
  18.   {                               
  19.       eio_api_destroy (req);                      
  20.       return 0;                           
  21.   }  
  22.   req->int2 = (long)mode;   
  23.   eio_submit (req); //將請求放入請求隊列,並喚醒子線程  
  24.   return req;  
  25. }  

 

4、請求放入請求隊列

   請求隊列由結構體指針數組qs,qe構成,數組大小為9,數組的序號標志了優先級,即qs[1]存放的是優先級為1的所有請求中的第一個,qe[1]存放的是優先級為1的所有請求的最后一個。這樣做的好處是,在時間復雜度為O(1)的情況下插入新的請求。

 

[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:將請求放入請求隊列,或者將回執放入回執隊列。 qe存放鏈表終點.qs存放鏈表起點. 
  3.  */  
  4. static int ecb_noinline  
  5. reqq_push (etp_reqq *q, ETP_REQ *req)  
  6. {  
  7.   int pri = req->pri;  
  8.   req->next = 0;  
  9.   
  10.   if (q->qe[pri])//如果該優先級以后請求,則插入到最后  
  11.     {  
  12.       q->qe[pri]->next = req;  
  13.       q->qe[pri] = req;  
  14.     }  
  15.   else  
  16.     q->qe[pri] = q->qs[pri] = req;  
  17.   
  18.   return q->size++;  
  19. }  

 5、喚醒子線程

     這里並不是來一個請求,就為該請求創建一個線程。在下面兩種情況下,不創建線程。
  •   創建的線程總數大於4(這個數字要想改變,只有重新編譯libeio了)
  •  線程數大於未處理的請求。
    線程創建之后,放入線程隊列。
[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:創建線程,並把線程放入線程隊列 
  3.  */  
  4. static void ecb_cold  
  5. etp_start_thread (void)  
  6. {  
  7.   etp_worker *wrk = calloc (1, sizeof (etp_worker));  
  8.   
  9.   /*TODO*/  
  10.   assert (("unable to allocate worker thread data", wrk));  
  11.   
  12.   X_LOCK (wrklock);  
  13.   
  14.   //創建線程,並將線程插入到線程隊列.  
  15.   if (thread_create (&wrk->tid, etp_proc, (void *)wrk))  
  16.     {  
  17.       wrk->prev = &wrk_first;  
  18.       wrk->next = wrk_first.next;  
  19.       wrk_first.next->prev = wrk;  
  20.       wrk_first.next = wrk;  
  21.       ++started;  
  22.     }  
  23.   else  
  24.     free (wrk);  
  25.   
  26.   X_UNLOCK (wrklock);  
  27. }  

 6、子線程從請求隊列中取下請求

      取請求時按照優先級來取的。

7、子線程處理請求

     子線程調用eio_excute處理請求。這里才真正的執行I/O操作。之前我們傳過來的是創建文件夾操作,子線程判斷請求類型,根據類型,調用系統函數執行操作,並把執行結果,寫回到請求的result字段,如果執行有誤,設置errno
     因為eio_excute函數比較長,這里只貼出創建文件夾代碼。
     
[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:根據類型,執行不同的io操作 
  3.  */  
  4. static void  
  5. eio_execute (etp_worker *self, eio_req *req)  
  6. {  
  7. #if HAVE_AT  
  8.   int dirfd;  
  9. #else  
  10.   const char *path;  
  11. #endif  
  12.   
  13.   if (ecb_expect_false (EIO_CANCELLED (req)))//判斷該請求是否取消  
  14.     {  
  15.       req->result  = -1;  
  16.       req->errorno = ECANCELED;  
  17.       return;  
  18.     }  
  19.    switch (req->type)  
  20.    {  
  21.        case EIO_MKDIR:     req->result = mkdirat   (dirfd, req->ptr1, (mode_t)req->int2); break;  
  22.    }  
  23. }  
   從代碼中可以看出,用戶是可以取消之前的I/O操作,如果I/O操作未執行,可以取消。如果I/O操作已經在運行了,則取消無效。

8、寫回執

    回執其實就是之前傳給子線程的自定義結構體。當子線程取下該請求,並根據類型執行后,執行結構寫入請求的result字段,並將該請求插入到回執隊列res_queue中。

9、通知父線程有回執

    用戶自己定義want_poll函數,用於子線程通知父線程有請求回執放入回執隊列。示例代碼是用的寫管道。這里需要指出的時,當將請求回執放入空的回執 隊列才會通知父線程,如果在放入時,回執隊列已不為空,則不會通知父線程。為什么了?因為父線程處理回執的時候,會處理現有的所有回執。
   
[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:子線程通知主線程已有回執放入回執隊列. 
  3.  */  
  4. void  
  5. want_poll (void)  
  6. {  
  7.   char dummy;  
  8.   printf ("want_poll ()\n");  
  9.   write (respipe [1], &dummy, 1);  
  10. }  

10、父線程處理回執

     調用eio_poll函數處理回執。或許看到這里你在想,eio_poll是個系統函數,我們沒辦法修改,但是我們如何知道每一個I/O請求執行結果。 其實還是用的函數指針,在我們構建一個I/O請求結構體時,有一個finsh函數指針。當父進程處理I/O回執時,會調用該方法。這里自定義的 finish函數名為res_cb,當創建文件夾成功后,調用該函數,輸出一句話
[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:處理回執 
  3.  */  
  4. static int  
  5. etp_poll (void)  
  6. {  
  7.   unsigned int maxreqs;  
  8.   unsigned int maxtime;  
  9.   struct timeval tv_start, tv_now;  
  10.   
  11.   X_LOCK (reslock);  
  12.   maxreqs = max_poll_reqs;  
  13.   maxtime = max_poll_time;  
  14.   X_UNLOCK (reslock);  
  15.   
  16.   if (maxtime)  
  17.     gettimeofday (&tv_start, 0);  
  18.   
  19.   for (;;)  
  20.     {  
  21.       ETP_REQ *req;  
  22.   
  23.       etp_maybe_start_thread ();  
  24.   
  25.       X_LOCK (reslock);  
  26.       req = reqq_shift (&res_queue);//從回執隊列取出優先級最高的回執信息  
  27.   
  28.       if (req)  
  29.         {  
  30.           --npending;  
  31.   
  32.           if (!res_queue.size && done_poll_cb)//直到回執全部處理完,執行done_poll();  
  33.           {  
  34.               //printf("執行done_poll()\n");  
  35.               done_poll_cb ();  
  36.           }  
  37.         }  
  38.   
  39.       X_UNLOCK (reslock);  
  40.   
  41.       if (!req)  
  42.         return 0;  
  43.   
  44.       X_LOCK (reqlock);  
  45.       --nreqs;//發出請求,到收到回執,該請求才算處理完畢.  
  46.       X_UNLOCK (reqlock);  
  47.   
  48.       if (ecb_expect_false (req->type == EIO_GROUP && req->size))//ecb_expect_false僅僅用於幫助編譯器產生更優代碼,而對真值無任何影響  
  49.         {  
  50.           req->int1 = 1; /* mark request as delayed */  
  51.           continue;  
  52.         }  
  53.       else  
  54.         {  
  55.           int res = ETP_FINISH (req);//調用自定義函數,做進一步處理  
  56.           if (ecb_expect_false (res))  
  57.             return res;  
  58.         }  
  59.   
  60.       if (ecb_expect_false (maxreqs && !--maxreqs))  
  61.         break;  
  62.   
  63.       if (maxtime)  
  64.         {  
  65.           gettimeofday (&tv_now, 0);  
  66.   
  67.           if (tvdiff (&tv_start, &tv_now) >= maxtime)  
  68.             break;  
  69.         }  
  70.     }  
  71.   
  72.   errno = EAGAIN;  
  73.   return -1;  
  74. }  

11、當所有請求執行完畢,調用done_poll做收尾工作。

    在示例代碼中是讀出管道中的數據。用戶可以自己定義一些別的工作
[cpp]   view plain copy print ?
  1. /* 
  2.  * 功能:主線程回執處理完畢,調用此函數 
  3.  */  
  4. void  
  5. done_poll (void)  
  6. {  
  7.   char dummy;  
  8.   printf ("done_poll ()\n");  
  9.   read (respipe [0], &dummy, 1);  
  10. }  

至此,libeio就簡單的跑了一遍,從示例代碼可以看出,libeio使用簡單。雖說現在是beat版,不過Node.js已經在使用了。
 
最后簡單說一下代碼中的宏ecb_expect_false和ecb_expect_true,在if判斷中,經常會出現這兩個宏,一步一步的查看宏定義,宏定義如下:
[cpp]   view plain copy print ?
  1. #define ecb_expect(expr,value)         __builtin_expect ((expr),(value))  
  2. #define ecb_expect_false(expr) ecb_expect (!!(expr), 0)  
  3. #define ecb_expect_true(expr)  ecb_expect (!!(expr), 1)  
  4. /* for compatibility to the rest of the world */  
  5. #define ecb_likely(expr)   ecb_expect_true  (expr)  
  6. #define ecb_unlikely(expr) ecb_expect_false (expr)  
剛開始我也不太懂啥意思,后來查閱資料( http://www.adamjiang.com/archives/251)才明白,這些宏 僅僅是在幫助編譯器產生更優代碼,而對真值的判斷沒有影響


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM