node源碼詳解(七) —— 文件異步io、線程池【互斥鎖、條件變量、管道、事件對象】


 知識共享許可協議本作品采用知識共享署名 4.0 國際許可協議進行許可。轉載保留聲明頭部與原文鏈接https://luzeshu.com/blog/nodesource7 

本博客同步在https://cnodejs.org/topic/571618c7e84805cd5410ea26 
本博客同步在http://www.cnblogs.com/papertree/p/5405202.html


  在上篇博客講到,網絡io通過封裝io觀察者(uv__io_t),添加到loop->watcher_queue隊列。在2.2節中講到文件異步io不同於網絡io,文件異步io把請求操作交給線程池處理,所有線程池的異步io操作統一由一個io觀察者來管理,等線程池處理完畢再通過該io觀察者告知事件循環(epoll_wait)有異步io操作完成,需要在事件循環的線程執行回調函數。

  這篇博客分以下幾個部分講解其中的細節:

1. 從文件異步io操作到封裝請求交給線程池的過程

2. 線程池的原理、相關的系統支持【互斥鎖、條件變量】

3. 線程池完成io操作后,告知主線程/事件循環的方式 —— 線程池統一的io觀察者,及相關的系統支持【管道、事件對象】

4. 主線程epoll_wait收到線程池的通知后,回調到文件異步io操作的callback的過程

7.1 文件異步io到線程池

  上一篇博客以server.listen(80)為例來講解網絡io,這一篇以fs.writeFile('xxx', function (err, data) {});為例來講解文件異步io。

  js代碼到libuv的函數,經歷了幾個層次(6.2.1節-6.2.4節,“原生js lib模塊 -> node C++模塊 -> libuv模塊”),這幾個層次文件io和網絡io是類似的,就忽略了。

  這里只針對libuv的文件異步io如何封裝成請求對象交給線程池。

7.1.1 libuv的文件io請求對象 —— uv_fs_t

  看一下libuv的異步讀文件代碼,deps/uv/src/unix/fs.c:

圖7-1-1

  可以看到一次異步文件讀操作在libuv層被封裝到一個uv_fs_t的結構體,req->cb是來自上層的回調函數(node C++層:src/node_file.cc 的After函數)。

  異步io請求最后調用uv__work_submit,把異步io請求提交給線程池。這里有兩個函數:

uv__fs_work:這個是文件io的處理函數,可以看到當cb為NULL的時候,即非異步模式,uv__fs_work在當前線程(事件循環所在線程)直接被調用。如果cb != NULL,即文件io為異步模式,此時把uv__fs_work和uv__fs_done提交給線程池。

uv__fs_done:這個是異步文件io結束后的回調函數。在uv__fs_done里面會回調上層C++模塊的cb函數(即req->cb)。

  這里需要注意的是,異步模式下,把uv__fs_work、uv__fs_done當成參數調用uv__work_submit向線程池提交異步io請求,此時io操作的主體 —— uv__fs_work函數是在線程池里執行的。但是uv__fs_done必須在事件循環的線程里被回調,因為這個函數最終會回調到用戶js代碼的回調函數,而js代碼里的所有代碼必須在同個線程里面

7.1.2 線程池的請求對象 —— struct uv__work

  來看看uv__work_submit做了什么:

圖7-1-2

  uv__work_submit 把傳進來的uv__fs_work、uv__fs_done封裝到uv__work結構體里面,這個結構體表示一個線程操作的請求。通過post把請求提交給線程池,post的原理7.2節講。

  看到post函數里面的QUEUE_INSERT_TAIL,把該uv__work對象加進wq鏈表里面。wq是一個全局靜態變量。也就是說,進程空間里的所有線程共用同一個wq鏈表。wq隊列的使用在最下面的7.4.2節會用到。

  至於通過void* [2]類型的成員變量w->wq去維護一個鏈表的機制,在6.4節里有介紹。

 


 

7.2 線程池的原理 —— 條件變量與互斥鎖

7.2.1 條件變量與互斥鎖基礎

1. 互斥鎖 —— pthread_mutex_t mutex

  系統通過pthread_mutex_t結構、及相關的pthread_mutex_lock()、pthread_mutex_unlock()來對共享資源的請求進行加鎖、解鎖。

2. 條件變量 —— pthread_cond_t condition

  系統通過pthread_cond_t結構、及相關的pthread_cond_wait()、pthread_cond_signal()函數來實現線程間等待、通知的機制。

【注意:系統提供的條件變量機制必須結合互斥鎖使用,也就是pthread_cond_wait(&condition, &mutex)需要傳條件變量與一個互斥體結構,而且pthread_cond_wait之前必須獲得互斥鎖。其中原因簡單來說就是條件變量本身也是需要加鎖保護的資源。具體解釋可以參考:http://stackoverflow.com/questions/6312342/pthread-cond-wait-and-mutex-requirement

7.2.2 線程池原理

  來看看threadpool.c 文件的幾個相關函數:

圖7-2-1

  這里有四個環節:

1. 創建工作線程:

  這里的init_once函數調用uv_thread_create創建了nthreads數量的工作線程,nthread默認為4。worker為工作線程的執行函數。

  看到圖7-1-2,有一行uv_once(&once, init_once); 【uv_once對應的系統調用是pthread_once】。該行代碼保證了init_once 有且僅被執行一次。在第一次調用uv__work_submit()時會執行一次init_once()。

2. 工作線程進入等待:

  看到worker線程最終會陷入uv_cond_wait【對應的系統調用是pthread_cond_wait】進行等待,且idle_threads自增。

  這里的&cond、&mutex分別是一個全局的靜態條件變量、互斥體。

3. 提交任務到線程池:

  看到post函數通過uv_cond_signal【對應的系統調用是pthread_cond_signal】向相應的條件變量——cond發送信號,處在uv_cond_wait掛起等待的工作線程當中的某個被激活。

  worker線程往下執行,從wq取出w(保存的過程見7.1節),執行w->work()(對應7.1節中的uv_fs_work)。

4.  通知主線程的事件循環:

  工作線程完成任務后,調用uv_async_send通知主線程某個統一的io觀察者。這里的機制7.3節講。


7.3 線程池統一的io觀察者 —— 管道、事件對象

7.3.1 管道、事件對象

  管道、事件對象都是系統提供的機制,都可以用於線程間發送數據,所以這里可以用於線程間的通知。

1. 管道

  管道的相關系統調用是pipe()、pipe2()。參考 http://man7.org/linux/man-pages/man2/pipe.2.html

  管道會創建兩個fd,往fd[1]寫數據,那么fd[0]就會收到數據。那么只需要把fd[0]添加到epoll_wait()所監聽的io觀察者隊列里面,在工作線程需要通知的時候往fd[1]寫數據,即能在主線程的epoll里面監聽其他工作線程任務完成的通知。

2. 事件對象

  事件對象的相關系統調用是eventfd()、eventfd2()。參考 http://man7.org/linux/man-pages/man2/eventfd.2.html

  與管道不同的是eventfd()只會創建一個fd,事件對象的讀寫都通過這個fd。事件對象內部維護一個counter,往fd寫一個8字節的整數,會往counter加,而讀的時候會返回counter,如果counter為0,那么讀操作會阻塞住(fd為阻塞模式)。而這個fd也是可以交由epoll機制進行監聽的,那么也可以達到使用管道一樣的目的。

 3. 使用哪個?

  這里libuv創建異步io觀察者fd時,優先使用eventfd,如果系統不支持事件對象,就使用管道替代。看一下相關實現:

圖7-3-1

  可以看到使用uv__eventfd2返回-1(errno = ENOSYS)時,uv__async_start里面使用管道替代了事件對象。而判斷系統是否支持eventfd,是通過__NR_eventfd2宏去判斷。

  這里需要注意的是:使用宏進行判斷__NR_eventfd是否defined是在編譯期,而uv__async_start的執行是在運行期,也就是說,如果你在不支持事件對象的系統編譯之后,在支持事件對象的系統上運行,那么uv__eventfd2始終是返回-1的

7.3.2 異步io觀察者

7.3.2.1 數據結構 —— struct uv__async

  在6.1.3節講了持有io觀察者的結構體 uv_tcp_s,6.2.4節講了網絡io操作如何封裝成uv_tcp_t結構體、並構造對應的io_watcher,6.3.1和6.4節講了如何把io_watcher加進uv_loop_t default_loop_struct的watcher_queue隊列里。

  那么類似於網絡io操作的io觀察者(uv__io_t io_watcher)由uv_tcp_s結構體來持有,這里要討論的異步io觀察者也是由一個數據結構(struct uv__async)持有的io觀察者。通過把持有的io觀察者(io_watcher)加進loop->watcher_queue隊列,來加進到epoll的觀察者隊列中。

  看到6.1.1節中關於struct uv_loop_s default_loop_struct的截圖,發現uv_loop_s里面有個成員 struct uv__async async_watcher。這個就是管理統一異步io觀察者的數據結構,一個事件循環結構體(uv_loop_t)有且只有一個。類似於uv_tcp_s。

  看一些uv__async的定義,也持有一個uv__io_t io_watcher,還有封裝了一個cb:

圖7-3-2

7.3.2.2 異步io觀察者的保存與回調

  我們知道一個uv_tcp_t的io觀察者,是在用戶調用了網絡io之后,才加進到loop->watcher_queue里面的。那么這個異步io觀察者是在node啟動時,通過一連串調用node::Start() -> uv_default_loop() -> uv_loop_init() -> uv_async_init() -> uv__async_start(),最終調用uv__io_start(),把loop->async_watcher所持有的io_watcher加進loop->watcher_queue的。uv__async_start()也是創建事件對象/管道的地方,在上圖的7-3-1可以看到。

  來看一下loop->async_watcher和loop->async_watcher.io_watcher封裝的回調函數。

圖7-3-3

  可以看到loop->async_watcher.io_watcher->cb 是uv__async_io;

  loop->async_watcher.cb 是uv__async_event。

  7.2.2節講到worker線程完成w->work()之后,通過uv_async_send通知異步io觀察者,uv_async_send的操作就是往事件對象/管道寫東西,那么當io觀察者收到數據,uv_run()里面的epoll_wait()返回該io_watcher的fd時,uv__async_io會先被回調,在uv__async_io里面會進而調用uv__async_event。看下代碼:

圖7-3-4

  uv__aysnc_io里面取出的wa就是loop->async_watcher,所以wa->cb就是uv__async_event。


 

7.4 線程池異步io之后的回調工作

  講到uv__async_event這一步,我們回想一下此時應該執行什么處理:worker線程執行完了w->work()(其中w是提交線程池的請求結構體 uv__work),然后通知事件循環需要在主線程執行w->done(),而通知的這個過程就是通過 uv_async_send()往管道/事件對象寫數據,激活epoll_wait(),根據返回的fd,由loop->watchers映射表拿到異步io觀察者 —— loop->async_watcher.io_watcher,然后層層回調到uv__async_event,那么這個時候,我們是否要調用線程池完成了w->work()之后剩余的w->done()?

7.4.1 uv__async_event() 到 uv__work_done()

  node里面多次使用void*[2]類型來維護一個鏈表,loop->async_handles也是。可以看到圖6-1-1。那么async_handles保存什么鏈表呢?

  看到圖7-3-4,uv__async_event()就是從loop->async_handles鏈表里,取出struct uv_async_t結構類型的元素h,並調用回調函數h->async_cb()。

  再看到圖7-3-3,uv_async_init()里面,往loop->async_handles里面添加了struct uv_async_t* t。7.3.2.2節講到的一系列調用流程有:uv_loop_init() -> uv_async_init(),看下uv_loop_init()調用uv_async_init()的代碼:

圖7-4-1

  可以看到uv_loop_init()傳給uv_async_init()的uv_async_t 是loop->wq_async,而async_cb是uv__work_done。

  所以最終異步io觀察者被激活之后,主線程回調到了uv__work_done()。uv__work_done在線程池模塊(deps/uv/src/threadpool.c)里面。

 7.4.2 uv__work_done()

  看一下uv__work_done()的代碼:

圖7-4-2

  在7.1.2節就講了post()提交請求時,往全局隊列wq添加一個uv__work數據結構,那么最終uv__work_done()被調用的時候,從該wq取出所有w,執行w->done(),完成最終的回調。這里的w->done()就是7.1節中提到的fs__work_done()。

  注意了,這里的uv__work_done()是在主線程執行的,也就是你的js代碼由始至終在同一個線程里面執行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM