本作品采用知識共享署名 4.0 國際許可協議進行許可。轉載保留聲明頭部與原文鏈接https://luzeshu.com/blog/nodesource7
https://cnodejs.org/topic/571618c7e84805cd5410ea26
本博客同步在http://www.cnblogs.com/papertree/p/5405202.html
在上篇博客講到,網絡io通過封裝io觀察者(uv__io_t),添加到loop->watcher_queue隊列。在2.2節中講到文件異步io不同於網絡io,文件異步io把請求操作交給線程池處理,所有線程池的異步io操作統一由一個io觀察者來管理,等線程池處理完畢再通過該io觀察者告知事件循環(epoll_wait)有異步io操作完成,需要在事件循環的線程執行回調函數。
這篇博客分以下幾個部分講解其中的細節:
1. 從文件異步io操作到封裝請求交給線程池的過程
2. 線程池的原理、相關的系統支持【互斥鎖、條件變量】
3. 線程池完成io操作后,告知主線程/事件循環的方式 —— 線程池統一的io觀察者,及相關的系統支持【管道、事件對象】
4. 主線程epoll_wait收到線程池的通知后,回調到文件異步io操作的callback的過程
7.1 文件異步io到線程池
上一篇博客以server.listen(80)為例來講解網絡io,這一篇以fs.writeFile('xxx', function (err, data) {});為例來講解文件異步io。
js代碼到libuv的函數,經歷了幾個層次(6.2.1節-6.2.4節,“原生js lib模塊 -> node C++模塊 -> libuv模塊”),這幾個層次文件io和網絡io是類似的,就忽略了。
這里只針對libuv的文件異步io如何封裝成請求對象交給線程池。
7.1.1 libuv的文件io請求對象 —— uv_fs_t
看一下libuv的異步讀文件代碼,deps/uv/src/unix/fs.c:
圖7-1-1
可以看到一次異步文件讀操作在libuv層被封裝到一個uv_fs_t的結構體,req->cb是來自上層的回調函數(node C++層:src/node_file.cc 的After函數)。
異步io請求最后調用uv__work_submit,把異步io請求提交給線程池。這里有兩個函數:
uv__fs_work:這個是文件io的處理函數,可以看到當cb為NULL的時候,即非異步模式,uv__fs_work在當前線程(事件循環所在線程)直接被調用。如果cb != NULL,即文件io為異步模式,此時把uv__fs_work和uv__fs_done提交給線程池。
uv__fs_done:這個是異步文件io結束后的回調函數。在uv__fs_done里面會回調上層C++模塊的cb函數(即req->cb)。
這里需要注意的是,異步模式下,把uv__fs_work、uv__fs_done當成參數調用uv__work_submit向線程池提交異步io請求,此時io操作的主體 —— uv__fs_work函數是在線程池里執行的。但是uv__fs_done必須在事件循環的線程里被回調,因為這個函數最終會回調到用戶js代碼的回調函數,而js代碼里的所有代碼必須在同個線程里面。
7.1.2 線程池的請求對象 —— struct uv__work
來看看uv__work_submit做了什么:
圖7-1-2
uv__work_submit 把傳進來的uv__fs_work、uv__fs_done封裝到uv__work結構體里面,這個結構體表示一個線程操作的請求。通過post把請求提交給線程池,post的原理7.2節講。
看到post函數里面的QUEUE_INSERT_TAIL,把該uv__work對象加進wq鏈表里面。wq是一個全局靜態變量。也就是說,進程空間里的所有線程共用同一個wq鏈表。wq隊列的使用在最下面的7.4.2節會用到。
至於通過void* [2]類型的成員變量w->wq去維護一個鏈表的機制,在6.4節里有介紹。
7.2 線程池的原理 —— 條件變量與互斥鎖
7.2.1 條件變量與互斥鎖基礎
1. 互斥鎖 —— pthread_mutex_t mutex
系統通過pthread_mutex_t結構、及相關的pthread_mutex_lock()、pthread_mutex_unlock()來對共享資源的請求進行加鎖、解鎖。
2. 條件變量 —— pthread_cond_t condition
系統通過pthread_cond_t結構、及相關的pthread_cond_wait()、pthread_cond_signal()函數來實現線程間等待、通知的機制。
【注意:系統提供的條件變量機制必須結合互斥鎖使用,也就是pthread_cond_wait(&condition, &mutex)需要傳條件變量與一個互斥體結構,而且pthread_cond_wait之前必須獲得互斥鎖。其中原因簡單來說就是條件變量本身也是需要加鎖保護的資源。具體解釋可以參考:http://stackoverflow.com/questions/6312342/pthread-cond-wait-and-mutex-requirement】
7.2.2 線程池原理
來看看threadpool.c 文件的幾個相關函數:
圖7-2-1
這里有四個環節:
1. 創建工作線程:
這里的init_once函數調用uv_thread_create創建了nthreads數量的工作線程,nthread默認為4。worker為工作線程的執行函數。
看到圖7-1-2,有一行uv_once(&once, init_once); 【uv_once對應的系統調用是pthread_once】。該行代碼保證了init_once 有且僅被執行一次。在第一次調用uv__work_submit()時會執行一次init_once()。
2. 工作線程進入等待:
看到worker線程最終會陷入uv_cond_wait【對應的系統調用是pthread_cond_wait】進行等待,且idle_threads自增。
這里的&cond、&mutex分別是一個全局的靜態條件變量、互斥體。
3. 提交任務到線程池:
看到post函數通過uv_cond_signal【對應的系統調用是pthread_cond_signal】向相應的條件變量——cond發送信號,處在uv_cond_wait掛起等待的工作線程當中的某個被激活。
worker線程往下執行,從wq取出w(保存的過程見7.1節),執行w->work()(對應7.1節中的uv_fs_work)。
4. 通知主線程的事件循環:
工作線程完成任務后,調用uv_async_send通知主線程某個統一的io觀察者。這里的機制7.3節講。
7.3 線程池統一的io觀察者 —— 管道、事件對象
7.3.1 管道、事件對象
管道、事件對象都是系統提供的機制,都可以用於線程間發送數據,所以這里可以用於線程間的通知。
1. 管道
管道的相關系統調用是pipe()、pipe2()。參考 http://man7.org/linux/man-pages/man2/pipe.2.html
管道會創建兩個fd,往fd[1]寫數據,那么fd[0]就會收到數據。那么只需要把fd[0]添加到epoll_wait()所監聽的io觀察者隊列里面,在工作線程需要通知的時候往fd[1]寫數據,即能在主線程的epoll里面監聽其他工作線程任務完成的通知。
2. 事件對象
事件對象的相關系統調用是eventfd()、eventfd2()。參考 http://man7.org/linux/man-pages/man2/eventfd.2.html
與管道不同的是eventfd()只會創建一個fd,事件對象的讀寫都通過這個fd。事件對象內部維護一個counter,往fd寫一個8字節的整數,會往counter加,而讀的時候會返回counter,如果counter為0,那么讀操作會阻塞住(fd為阻塞模式)。而這個fd也是可以交由epoll機制進行監聽的,那么也可以達到使用管道一樣的目的。
3. 使用哪個?
這里libuv創建異步io觀察者fd時,優先使用eventfd,如果系統不支持事件對象,就使用管道替代。看一下相關實現:
圖7-3-1
可以看到使用uv__eventfd2返回-1(errno = ENOSYS)時,uv__async_start里面使用管道替代了事件對象。而判斷系統是否支持eventfd,是通過__NR_eventfd2宏去判斷。
這里需要注意的是:使用宏進行判斷__NR_eventfd是否defined是在編譯期,而uv__async_start的執行是在運行期,也就是說,如果你在不支持事件對象的系統編譯之后,在支持事件對象的系統上運行,那么uv__eventfd2始終是返回-1的。
7.3.2 異步io觀察者
7.3.2.1 數據結構 —— struct uv__async
在6.1.3節講了持有io觀察者的結構體 uv_tcp_s,6.2.4節講了網絡io操作如何封裝成uv_tcp_t結構體、並構造對應的io_watcher,6.3.1和6.4節講了如何把io_watcher加進uv_loop_t default_loop_struct的watcher_queue隊列里。
那么類似於網絡io操作的io觀察者(uv__io_t io_watcher)由uv_tcp_s結構體來持有,這里要討論的異步io觀察者也是由一個數據結構(struct uv__async)持有的io觀察者。通過把持有的io觀察者(io_watcher)加進loop->watcher_queue隊列,來加進到epoll的觀察者隊列中。
看到6.1.1節中關於struct uv_loop_s default_loop_struct的截圖,發現uv_loop_s里面有個成員 struct uv__async async_watcher。這個就是管理統一異步io觀察者的數據結構,一個事件循環結構體(uv_loop_t)有且只有一個。類似於uv_tcp_s。
看一些uv__async的定義,也持有一個uv__io_t io_watcher,還有封裝了一個cb:
圖7-3-2
7.3.2.2 異步io觀察者的保存與回調
我們知道一個uv_tcp_t的io觀察者,是在用戶調用了網絡io之后,才加進到loop->watcher_queue里面的。那么這個異步io觀察者是在node啟動時,通過一連串調用node::Start() -> uv_default_loop() -> uv_loop_init() -> uv_async_init() -> uv__async_start(),最終調用uv__io_start(),把loop->async_watcher所持有的io_watcher加進loop->watcher_queue的。uv__async_start()也是創建事件對象/管道的地方,在上圖的7-3-1可以看到。
來看一下loop->async_watcher和loop->async_watcher.io_watcher封裝的回調函數。
圖7-3-3
可以看到loop->async_watcher.io_watcher->cb 是uv__async_io;
loop->async_watcher.cb 是uv__async_event。
7.2.2節講到worker線程完成w->work()之后,通過uv_async_send通知異步io觀察者,uv_async_send的操作就是往事件對象/管道寫東西,那么當io觀察者收到數據,uv_run()里面的epoll_wait()返回該io_watcher的fd時,uv__async_io會先被回調,在uv__async_io里面會進而調用uv__async_event。看下代碼:
圖7-3-4
uv__aysnc_io里面取出的wa就是loop->async_watcher,所以wa->cb就是uv__async_event。
7.4 線程池異步io之后的回調工作
講到uv__async_event這一步,我們回想一下此時應該執行什么處理:worker線程執行完了w->work()(其中w是提交線程池的請求結構體 uv__work),然后通知事件循環需要在主線程執行w->done(),而通知的這個過程就是通過 uv_async_send()往管道/事件對象寫數據,激活epoll_wait(),根據返回的fd,由loop->watchers映射表拿到異步io觀察者 —— loop->async_watcher.io_watcher,然后層層回調到uv__async_event,那么這個時候,我們是否要調用線程池完成了w->work()之后剩余的w->done()?
7.4.1 uv__async_event() 到 uv__work_done()
node里面多次使用void*[2]類型來維護一個鏈表,loop->async_handles也是。可以看到圖6-1-1。那么async_handles保存什么鏈表呢?
看到圖7-3-4,uv__async_event()就是從loop->async_handles鏈表里,取出struct uv_async_t結構類型的元素h,並調用回調函數h->async_cb()。
再看到圖7-3-3,uv_async_init()里面,往loop->async_handles里面添加了struct uv_async_t* t。7.3.2.2節講到的一系列調用流程有:uv_loop_init() -> uv_async_init(),看下uv_loop_init()調用uv_async_init()的代碼:
圖7-4-1
可以看到uv_loop_init()傳給uv_async_init()的uv_async_t 是loop->wq_async,而async_cb是uv__work_done。
所以最終異步io觀察者被激活之后,主線程回調到了uv__work_done()。uv__work_done在線程池模塊(deps/uv/src/threadpool.c)里面。
7.4.2 uv__work_done()
看一下uv__work_done()的代碼:
圖7-4-2
在7.1.2節就講了post()提交請求時,往全局隊列wq添加一個uv__work數據結構,那么最終uv__work_done()被調用的時候,從該wq取出所有w,執行w->done(),完成最終的回調。這里的w->done()就是7.1節中提到的fs__work_done()。
注意了,這里的uv__work_done()是在主線程執行的,也就是你的js代碼由始至終在同一個線程里面執行。