我寫這篇短文的時候,正值Rust1.0發布不久,嚴格來說這是一門兼具C語言的執行效率和Java的開發效率的強大語言,它的所有權機制竟然讓你無法寫出線程不安全的代碼,它是一門可以用來寫操作系統的系統級語言,如果說新一代編程語言是什么,那就Rust了。
下面我注重介紹Rust的多線程編程是怎樣,其中大部分內容參考翻譯自Rust的官方文檔,請看:
Concurrency並發
在計算機科學上,並發Concurrency 和並行 parallelism是非常重要的話題,也是軟件產業一個熱門的話題。計算機CPU有了越來越多的的核,但很多程序員沒有准備好充分利用它們。
Rust的內存安全特性也應用於並發。Rust程序必須內存安全,沒有數據競爭。Rust的類型系統 很勝任這工作,很容易讓你理解在編譯時的並行代碼。
在談論Rust並發特色之前,了解一些東西很重要:Rust是一門足夠低級的語言,所有這些都由標准庫提供,而不是語言本身。這意味着如果你你不喜歡Rust處理並發的某些方面,你可以用其它方式來實現。 mio 就是這方面實踐的一個真實的例子。
背景: Send
和 Sync
並發很難解釋清楚。在Rust中,我們由一個強大的靜態的類型系統來幫助我們理解我們的代碼。Rust本身給我們兩個特性,幫助我們實現並發編程。
Send
第一個類是 Send
. 當類型 T
實現了 Send
, 它告訴編譯器這個類型的實例的所有權可以在多個線程之間安全傳遞。
實施強制的限制條件是很重要的。例如,如果我們由一個通道在兩個線程之間,我們可能會想在兩個線程之間傳遞數據。因此,我們要保證傳遞的數據類型要實現 Send
特性。
相反,如果我我們用FFI包裹了一個線程不安全的類庫,我們不會去實現 Send
, 編譯器會幫助我們確保它不會離開當前線程。
Sync
第二個特性是 Sync
. 當一個類型實現了 Sync
, 它向編譯器表明這個類型的數據在多線程並發是不可能導致內存的不安全。例如,由原子引用計數的不可變數據的共享是線程安全的。Rust提供了一個類型 Arc<T>
, 它實現了 Sync
, 因此它可以在線程之間共享。
這兩個特性運行你使用類型系統來保證你代碼在並發情況下的所有權。在解釋為什么之前,讓我們先創建一段並行的Rust代碼。
線程Threads
Rust標准庫中的線程,允許你並行的運行Rust代碼。下面是一個使用了 std::thread
的例子:
use std::thread; fn main() { thread::spawn(|| { println!("Hello from a thread!"); }); }
thread::spawn()
方法接受一個在新線程運行的閉包。spawn方法返回一個線程的處理對象,可以用來等待子線程結束和取得線程返回結果:
use std::thread; fn main() { let handle=thread::spawn(|| { "Hello from a thread!" }); println!("{}", handle.join().unwrap()); }
很多語言有執行多線程的能力,但是非常不安全。有很多關於如何防止共享狀態數據導致錯誤的書籍。Rust通過在編譯時防止數據競爭來解決這個問題。讓我們談論一下怎樣真正地在線程之間安全地共享數據。
安全地共享可變狀態
歸功於Rust的類型系統,我們與一個看似謊言的概念:安全地共享可變狀態。很多程序員都認同共享可變狀態是非常非常糟糕的。
有人曾經說過:
共享可變狀態是萬惡的根源。大多數語言嘗試從“可變”這個方向來解決這個問題,但Rust通過“共享”這方面來解決這個問題。
ownership system 幫助我們防止錯誤地使用指針,同樣也幫助我們排除數據競爭。數據競爭是並發編程中最恐怖地bug之一。
舉例說明,下面是一個Rust程序,里面有一個其它語言經常會出現地數據競爭。但是在Rust中是無法編譯通過地:
use std::thread; fn main() { let mut data=vec![1u32, 2, 3]; for i in 0..3 { thread::spawn(move|| { data[i] +=1; }); } thread::sleep_ms(50); }
編譯時,提示錯誤如下:
8:17 error: capture of moved value: `data`
data[i] += 1;
^~~~
在這種情況,從代碼我們知道我們的代碼應該是安全的,但是Rust不確定。事實上是不安全地,如果我們在多個線程中有 data
的引用,線程拿走了引用的所有權,我們就有了三個擁有者了。這是不行的。我們可以通過 Arc<T>
來改正,它是一個原子引用計數器指針。原子意思是在多線程共享是安全的。
Arc<T>
假定一個它的內容有多個所有權,但是仍然可以安全地共享。它假定它地內容是線程同步的。但在我們這中情況,我們向改變里面的數據。我們需要一個可以保證一次只能由一個線程改數據的類型。這個類型就是 Mutex<T>
。下面老師第二個版本的代碼。雖然依然編譯不通過,但是是不同原因:
use std::thread; use std::sync::Mutex; fnmain() { let mut data=Mutex::new(vec![1u32, 2, 3]); for i in 0..3 { let data=data.lock().unwrap(); thread::spawn(move|| { data[i] +=1; }); } thread::sleep_ms(50); }
下面是錯誤信息:
<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11 thread::spawn(move || {
^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11 thread::spawn(move || {
^~~~~~~~~~~~~
你看看, Mutex
由一個 lock
方法,方法的簽名是:
fn lock(&self) ->LockResult<MutexGuard<T>>
因為 MutexGuard<T>
沒有實現Send,我們不可以不能在線程之間傳輸這個對象,所以報錯。
我們可以使用 Arc<T>
來修正這個錯誤。下面是可以編譯通過的版本:
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let data=Arc::new(Mutex::new(vec![1u32, 2, 3])); for i in 0..3 { let data=data.clone(); thread::spawn(move|| { let mut data=data.lock().unwrap(); data[i] +=1; }); } thread::sleep_ms(50); }
我們調用了Arc的 clone()
方法 ,增加了內部的引用計數。它們返回只移動到了一個新的線程。我們細看一下線程的主體:
thread::spawn(move|| { let mut data=data.lock().unwrap(); data[i] +=1; });
首先,我們調用 lock()
, 取得了一個互斥鎖。因為可能會失敗,它返回一個結果Result<T, E>
, 因為只是舉例說明,我們直接調用 unwrap()
來獲取data的一個引用。真實代碼可能要寫更全面的代碼來作錯誤處理。因為我們現在有一個互斥鎖了,所以可以自由地改變數據。
最后,當線程運行,我們等一段時間,但是這是有點不切實際:等多久合適呢,很難猜測,這是程序運行CPU執行情況決定的。
一種更加精准的計時器是使用Rust標准庫提供的機制來實現線程同步。讓我們講一下這中機制: channels.
通道Channels
下面代碼是使用channel來同步,而不是漫無目的地等待:
1 use std::sync::{Arc, Mutex}; 2 use std::thread; 3 use std::sync::mpsc; 4 5 fn main() { 6 let data=Arc::new(Mutex::new(0u32)); 7 8 let (tx, rx) =mpsc::channel(); 9 10 for _ in 0..10 { 11 let (data, tx) = (data.clone(), tx.clone()); 12 13 thread::spawn(move|| { 14 letmutdata=data.lock().unwrap(); 15 *data+=1; 16 17 tx.send(()); 18 }); 19 } 20 21 for _ in 0..10 { 22 rx.recv(); 23 } 24 }
我們使用mpsc::channel()
方法類構造一個channel。我們用10個線程分別向通道發送一個簡單地()
然后在主線程接收。
send方法是泛型地,我們可以向通道發送任何類型地數據。
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) =mpsc::channel(); for _ in0..10 { let tx=tx.clone(); thread::spawn(move|| { let answer=42u32; tx.send(answer); }); } rx.recv().ok().expect("Could not receive answer"); }
一個 u32
數據被發送,因為我們可以復制一份。因此我們可以創建一個線程,叫它計算答案,然后通過將答案通過channel發送給我們。
Panics致命異常
一個 panic!
會使執行中地線程崩潰。你可以這樣寫:
use std::thread; let result=thread::spawn(move|| { panic!("oops!"); }).join(); assert!(result.is_err());
我們的線程返回了一個結果,我們可以通過這個返回結果檢查線程師父拋異常。