高效線程池(threadpool)的實現
Nodejs編程是全異步的,這就意味着我們不必每次都阻塞等待該次操作的結果,而事件完成(就緒)時會主動回調通知我們。在網絡編程中,一般都是基於Reactor線程模型的變種,無論其怎么演化,其核心組件都包含了Reactor實例(提供事件注冊、注銷、通知功能)、多路復用器(由操作系統提供,比如kqueue、select、epoll等)、事件處理器(負責事件的處理)以及事件源(linux中這就是描述符)這四個組件。一般,會單獨啟動一個線程運行Reactor實例來實現真正的異步操作。但是,依賴操作系統提供的系統調用來實現異步是有局限的,比如在Reactor模型中我們只能監聽到:網絡IO事件、signel(信號)、超時事件以及一些管道事件等,但這些事件也只是通知我們資源可讀或者可寫,真正的讀寫操作(read和write)還是同步的(也就是你必須等到read或者write返回,雖然linux提供了aio,但是其有諸多槽點),那么Nodejs的全異步是如何做到的呢?你可能會很快想到,就是啟用單獨的線程來做同步的事情,這也是libuv的設計思路,借用官網的一張圖,說明一切:

由上圖可以看到,libuv實現了一套自己的線程池來處理所有同步操作(從而模擬出異步的效果),下面就來看一下該線程池的具體實現吧!
一、線程池模型
說道線程池,在java領域中,jdk本身就提供了多種線程池實現,幾乎所有的線程池都遵循以下模型(任務隊列+線程池):

libuv自身定義了一個非常精煉、高效的隊列(雙向循環鏈表),只用了幾個簡單的宏定義將其實現,具體實現方式可以參見我的另一篇博文:libuv高效隊列的實現。現在隊列有了,來看一下task的定義:
1 struct uv__work {
2 void (*work)(struct uv__work *w);
3 void (*done)(struct uv__work *w, int status);
4 struct uv_loop_s* loop;
5 void* wq[2];
6 };
uv__work就代表一個task,可以看到里面有兩個函數指針(work代表任務實際操作,done用於對任務進行狀態確認)。wq成員就是一個QUEUE的節點, uv__work就是通過wq與其他 uv__work連接成一個隊列。
下面來看一下threadpool的初始化,代碼如下:
1 #define MAX_THREADPOOL_SIZE 128
2
3 static uv_once_t once = UV_ONCE_INIT;
4 static uv_cond_t cond;
5 static uv_mutex_t mutex;
6 static unsigned int idle_threads;//當前空閑的線程數
7 static unsigned int nthreads;
8 static uv_thread_t* threads;
9 static uv_thread_t default_threads[4];
10 static QUEUE exit_message;
11 static QUEUE wq;//線程池全部會檢查這個queue,一旦發現有任務就執行,但是只能有一個線程搶占到
12 static volatile int initialized;
13
14
15 static void init_once(void) {
16 unsigned int i;
17 const char* val;
18 // 線程池中的線程數,默認值為4
19 nthreads = ARRAY_SIZE(default_threads);
20 val = getenv("UV_THREADPOOL_SIZE");
21 if (val != NULL)
22 nthreads = atoi(val);
23 if (nthreads == 0)
24 nthreads = 1;
25 if (nthreads > MAX_THREADPOOL_SIZE)
26 nthreads = MAX_THREADPOOL_SIZE;
27
28 threads = default_threads;
29 if (nthreads > ARRAY_SIZE(default_threads)) {
30 // 分配線程句柄
31 threads = uv__malloc(nthreads * sizeof(threads[0]));
32 if (threads == NULL) {
33 nthreads = ARRAY_SIZE(default_threads);
34 threads = default_threads;
35 }
36 }
37 // 初始化條件變量
38 if (uv_cond_init(&cond))
39 abort();
40 // 初始化互斥鎖
41 if (uv_mutex_init(&mutex))
42 abort();
43
44 // 初始化任務隊列
45 QUEUE_INIT(&wq);
46
47 // 創建nthreads個線程
48 for (i = 0; i < nthreads; i++)
49 if (uv_thread_create(threads + i, worker, NULL))
50 abort();
51
52 initialized = 1;
53 }
上面的代碼中,一共創建了nthreads個線程,那么每個線程的執行代碼是什么呢?由線程創建代碼:uv_thread_create(threads + i, worker, NULL),可以看到,每一個線程都是執行worker函數,下面看看worker函數都在做什么:
1 /* To avoid deadlock with uv_cancel() it's crucial that the worker
2 * never holds the global mutex and the loop-local mutex at the same time.
3 */
4 static void worker(void* arg) {
5 struct uv__work* w;
6 QUEUE* q;
7
8 (void) arg;
9
10 for (;;) {
11 // 因為是多線程訪問,因此需要加鎖同步
12 uv_mutex_lock(&mutex);
13
14 // 如果任務隊列是空的
15 while (QUEUE_EMPTY(&wq)) {
16 // 空閑線程數加1
17 idle_threads += 1;
18 // 等待條件變量
19 uv_cond_wait(&cond, &mutex);
20 // 被喚醒之后,說明有任務被post到隊列,因此空閑線程數需要減1
21 idle_threads -= 1;
22 }
23
24 // 取出隊列的頭部節點(第一個task)
25 q = QUEUE_HEAD(&wq);
26
27 if (q == &exit_message)
28 uv_cond_signal(&cond);
29 else {
30 // 從隊列中移除這個task
31 QUEUE_REMOVE(q);
32 QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
33 executing. */
34 }
35
36 uv_mutex_unlock(&mutex);
37
38 if (q == &exit_message)
39 break;
40
41 // 取出uv__work首地址
42 w = QUEUE_DATA(q, struct uv__work, wq);
43 // 調用task的work,執行任務
44 w->work(w);
45
46 uv_mutex_lock(&w->loop->wq_mutex);
47 w->work = NULL; /* Signal uv_cancel() that the work req is done
48 executing. */
49 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
50 uv_async_send(&w->loop->wq_async);
51 uv_mutex_unlock(&w->loop->wq_mutex);
52 }
53 }
可以看到,多個線程都會在worker方法中等待在conn條件變量上,一旦有任務加入隊列,線程就會被喚醒,然后只有一個線程會得到任務的執行權,其他的線程只能繼續等待。
那么如何向隊列提交一個task呢?看以下代碼:
1 void uv__work_submit(uv_loop_t* loop,
2 struct uv__work* w,
3 void (*work)(struct uv__work* w),
4 void (*done)(struct uv__work* w, int status)) {
5 uv_once(&once, init_once);
6 // 構造一個task
7 w->loop = loop;
8 w->work = work;
9 w->done = done;
10 // 將其插入任務隊列
11 post(&w->wq);
12 }
接着看post做了什么:
1 static void post(QUEUE* q) {
2 // 同步隊列操作
3 uv_mutex_lock(&mutex);
4 // 將task插入隊列尾部
5 QUEUE_INSERT_TAIL(&wq, q);
6 // 如果當前有空閑線程,就向條件變量發送信號
7 if (idle_threads > 0)
8 uv_cond_signal(&cond);
9 uv_mutex_unlock(&mutex);
10 }
有提交任務,就肯定會有取消一個任務的操作,是的,他就是uv__work_cancel,代碼如下:
1 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
2 int cancelled;
3
4 uv_mutex_lock(&mutex);
5 uv_mutex_lock(&w->loop->wq_mutex);
6
7 // 只有當前隊列不為空並且要取消的uv__work有效時才會繼續執行
8 cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
9 if (cancelled)
10 QUEUE_REMOVE(&w->wq);// 從隊列中移除task
11
12 uv_mutex_unlock(&w->loop->wq_mutex);
13 uv_mutex_unlock(&mutex);
14
15 if (!cancelled)
16 return UV_EBUSY;
17
18 // 更新這個task的狀態
19 w->work = uv__cancelled;
20 uv_mutex_lock(&loop->wq_mutex);
21 QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
22 uv_async_send(&loop->wq_async);
23 uv_mutex_unlock(&loop->wq_mutex);
24
25 return 0;
26 }
至此,一個線程池的組成以及實現原理都說完了,可以看到,libuv幾乎是用了最少的代碼完成了高效的線程池,這對於我們平時寫代碼時具有很好的借鑒意義,文中涉及到uv_req_t以及uv_loop_t等結構我都直接跳過,因為這牽扯到libuv的其他組件,我將在以后的源碼剖析中逐步闡述,謝謝你能看到這里。

