https://www.jianshu.com/p/f4d853c0ef1e
在並發編程領域,一個非常讓程序員興奮,感到有成就感的事情就是做性能優化,譬如發現某個線程成為了單點瓶頸,然后上多線程。
提到了上多線程,那自然就會引入 thread pool,也就是我們通常說的線程池,我們會將任務扔給線程池,然后線程池里面自己會負責將任務派發到不同的線程去執行,除開任務自身執行的開銷,如何高效的派發也會決定一個線程池是否有足夠好的性能。下面,我們就來聊聊幾種常見的線程池的實現。
Mutex + channel
在 Rust 里面,我們可以通過標准庫提供的 channel 進行通訊,但 channel 其實是一個 multi-producer,single-consumer 的結構,也就是我們俗稱的 MPSC。但對於線程池來說,我們需要的是一個 MPMC 的 channel,也就是說,我們需要有一個隊列,這個隊列可以支持多個線程同時添加,同時獲取任務。
雖然單獨的 channel 沒法支持,但如果我們給 channel 的 Receiver 套上一個 Mutex,在加上 Arc,其實就可以了。通過 Mutex 我們能保證多個線程同時只能有一個線程搶到 lock,然后從隊列里面拿到數據。而加上 Arc 主要是能在多個線程共享了,這里就不說明了。
所以實現也就比較簡單了,如下:
pub struct ThreadPool { tx: Option<Sender<Task>>, handlers: Option<Vec<thread::JoinHandle<()>>>, } impl ThreadPool { pub fn new(number: usize) -> ThreadPool { let (tx, rx) = channel::<Task>(); let mut handlers = vec![]; let arx = Arc::new(Mutex::new(rx)); for _ in 0..number { let arx = arx.clone(); let handle = thread::spawn(move || { while let Ok(task) = arx.lock().unwrap().recv() { task.call_box(); } }); handlers.push(handle); } ThreadPool { tx: Some(tx), handlers: Some(handlers), } } }
Task 其實就是一個 FnBox,因為只有 nightly 版本支持 FnBox,所以我們自定義了一下
pub trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } pub type Task = Box<FnBox + Send>;
上面的代碼邏輯非常的簡單,創建一個 channel,然后使用 Arc + Mutex 包上 Receiver,創建多個線程,每個線程嘗試去獲取 channel 任務然后執行,如果 channel 里面沒任務,recv 哪里就會等着,而其他的線程這時候因為沒法拿到 lock 也會等着。
Condition Variable
拋開 channel,我們還有一種更通用的做法,可以用在不同的語言,譬如 C 上面,也就是使用 condition variable。關於 condition variable 的使用,大家可以 Google,因為在使用 condition variable 的時候,都會配套有一個 Mutex,所以我們可以通過這個 Mutex 同時控制 condition variable 以及任務隊列。
首先我們定義一個 State,用來處理任務隊列
struct State { queue: VecDeque<Task>, stopped: bool, }
對於不同線程獲取任務,我們可以通過
fn next_task(notifer: &Arc<(Mutex<State>, Condvar)>) -> Option<Task> { let &(ref lock, ref cvar) = &**notifer; let mut state = lock.lock().unwrap(); loop { if state.stopped { return None; } match state.queue.pop_front() { Some(t) => { return Some(t); } None => { state = cvar.wait(state).unwrap(); } } } }
首先就是嘗試用 Mutex 拿到 State,如果外面沒有結束,那么就嘗試從隊列里面獲取任務,如果沒有,就調用 Condition Variable 的 wait 進行等待了。
任務的添加也比較簡單
let &(ref lock, ref cvar) = &*self.notifer; { let mut state = lock.lock().unwrap(); state.queue.push_back(task); cvar.notify_one(); }
也是通過 lock 拿到 State,然后放到隊列里面,在通知 Condition Variable。對於線程池的創建,也是比較容易的:
let s = State { queue: VecDeque::with_capacity(1024), stopped: false, }; let notifer = Arc::new((Mutex::new(s), Condvar::new())); for _ in 0..number { let notifer = notifer.clone(); let handle = thread::spawn(move || { while let Some(task) = next_task(¬ifer) { task.call_box(); } }); handlers.push(handle); }
Crossbeam
上面提到的兩種做法,雖然都非常的通用,但有一個明顯的問題,就在於他是有全局 lock 的,在並發系統里面,lock 如果使用不當,會造成非常嚴重的性能開銷,尤其是在出現 contention 的時候,所以多數時候,我們希望使用的是一個 lock-free 的數據結構。
幸運的是,在 Rust 里面,已經有一個非常穩定的庫來提供相關的支持了,這個就是 crossbeam,關於 crossbeam 的相關知識,后面可以再開一篇文章來詳細說明,這里我們直接使用 crossbeam 的 channel,不同於標准庫的 channel,crossbeam 的 channel 是一個 MPMC 的實現,所以我們能非常方便的用到線程池上面,簡單代碼如下:
let (tx, rx) = channel::unbounded::<Task>(); let mut handlers = vec![]; for _ in 0..number { let rx = rx.clone(); let handle = thread::spawn(move || { while let Some(task) = rx.recv() { task.call_box(); } }); handlers.push(handle); }
可以看到,crossbeam 的 channel 使用比標准庫的更簡單,它甚至不需要 Arc 來包一層,而且還是 lock-free 的。
參考這個 benchmark,分別對不同的 ThreadPool 進行測試,在我的機器上面會發現 crossbeam 的性能會明顯好很多,標准庫 channel 其次,最后才是 condition variable。
test thread_pool::benchmark_condvar_thread_pool ... bench: 128,924,340 ns/iter (+/- 39,853,735)
test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench: 1,497,272 ns/iter (+/- 355,120)
test thread_pool::benchmark_std_channel_thread_pool ... bench: 50,925,087 ns/iter (+/- 6,753,377)
Channel Per-thread
可以看到,使用 crossbeam 的效果已經非常好了,但這種實現其實還有一個問題,主要在於它有一個全局的隊列,當並發嚴重的時候,多個線程對這個全局隊列的爭搶,可能成為瓶頸。另外,還有一個問題在於,它的派發機制是任意的,也就是那個線程搶到了任務就執行,在某些時候,我們希望一些任務其實是在某個線程上面執行的,這樣對於 CPU 的 cache 來說會更加友好,譬如有一個任務在執行的時候,又會產生一個后續任務,自然,我們希望這個后續任務在同一個線程執行。
為了解決上面的問題,最直觀的做法就是每個線程一個隊列,這樣我們就能夠顯示的控制任務派發了。一個非常簡單的例子
let mut handlers = vec![]; let mut txs = vec![]; for _ in 0..number { let (tx, rx) = channel::unbounded::<Task>(); let handle = thread::spawn(move || { while let Some(task) = rx.recv() { task.call_box(); } }); txs.push(tx); handlers.push(handle); }
上面我們為每個線程創建了一個 channel,這樣每個線程就不用去爭搶全局的 channel 了。
派發的時候我們也可以手動派發,譬如根據某個 ID hash 到一個對應的 thread 上面,通過 Sender 發送 消息。
Work Stealing
雖然每個線程一個 channel 解決了全局爭搶問題,也提升了 CPU cache 的使用,但它引入了另一個問題,就是任務的不均衡。直觀的來說,就是會導致某些線程一直忙碌,在不斷的處理任務,而另一些線程則沒有任務處理,一直很閑。為了解決這個問題,就有了 Work Stealing 的線程池。
Work Stealing 的原理其實很簡單,當一個線程執行完自己線程隊列里面的所有任務之后,它會嘗試去其它線程的隊列里面偷一點任務執行。
因為 Work Stealing 的實現過於復雜,這里就不描述了,Rust 的 tokio 庫提供了一個 tokio-threadpool,就是基於 Work Stealing 來做的,不過現在只提供了 Future 的支持。
小結
上面簡單的列舉了一些線程池的實現方式,如果你只是單純的想用一個比較簡單的派發功能,基於 crossbeam 的就可以了,復雜一點的可以使用 Work Stealing 的。當然,這里只是大概列舉了一些,如果有更好的實現,麻煩跟我聯系討論,我的郵箱 tl@pingcap.com。
作者:siddontang
鏈接:https://www.jianshu.com/p/f4d853c0ef1e
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。