原文鏈接:https://dev.to/imaculate3/fearless-concurrency-5fk8 >
原文標題:That's so Rusty! Fearless concurrency
公眾號:Rust 碎碎念
翻譯: Praying
並發程序是指運行多個任務的程序(或看上去是多任務),即兩個及以上的任務在重疊的時間跨度內運行。這些任務由線程——最小的處理單元執行。在其背后,並不完全是多任務(並行)處理,而是線程之間以普通人無法感知的速度進行上下文快速切換。很多現代應用程序都依賴於這種錯覺,比如服務器可以在處理請求的同時等待其他請求。當線程間共享數據時可能會出很多問題,最常見的兩種 bug 是:競態條件和死鎖。
-
競態條件常發生於多個線程以不一致的順序訪問/修改共享數據。這對於那些必須以原子性執行的事務會造成很嚴重的影響。每次只能有一個線程訪問共享數據是必要的,並且無論線程以何種順序運行,程序都應該工作良好。
-
死鎖發生於兩個及以上的線程互相等待對方采取行動,由於資源的鎖定,沒有線程能夠獲取所需資源,從而導致無限掛起。比如,線程 1 需要兩個資源,它已經獲取了資源 1 的鎖並在等待線程 2 釋放資源 2,但是此時,線程 2 也在等待線程 1 釋放資源 1,這個時候就會發生死鎖。這種情況被稱作循環等待。如果這些線程可以在等待獲取鎖的同時並持有資源,且不支持搶占,死鎖就無法恢復。
在任何一門編程語言中要避免這些問題都需要大量的練習。當這些問題真的滲透到項目中,它們很難被檢測出來。最好的情況下,這些 bug 表現為崩潰和掛起,最壞的情況下,程序將以不可預測的方式運行。換句話說,無畏並發的保證不可輕視。恰好,Rust 宣稱可以提供這種保證,不是么?
Rust 並發(Rust Concurrency)
Rust 並發的主要構成是線程和閉包。
1. 閉包(Closures)
閉包是指能夠訪問在其所被定義的作用域內的變量的匿名函數。它們是 Rust 的函數式特性之一。它們可以被賦予變量,作為參數傳遞以及從函數中返回。它們的作用域僅限於局部變量,因此,不能暴露在 crate 之外。在語法上,除了沒有名字之外,它們和函數非常相似,參數在豎線括號(||)中傳遞,類型標注是可選的。和函數不同的是,閉包能夠從其被定義的作用域內訪問變量。這些被捕獲的變量可以以借用(borrow)或移動(move)的方式進入閉包,具體取決於變量的類型以及該變量如何在閉包中被使用。下面是一個閉包(被賦於print_greeting
)的例子,該閉包接收一個參數,並且捕獲變量generic_greeting
。
fn main() {
let generic_greeting = String::from("Good day,");
let print_greeting = |name| println!("{} {}!", generic_greeting, name);
let person = String::from("Crab");
print_greeting(person);
// println!("Can I use generic greeting? {}", generic_greeting);
// println!("Can I use person {}", person);
}
在上面的例子中,變量person
和generic_greeting
都被移動(move)到閉包當中,因此,在調用閉包之后,它們就不能再被使用了。取消最后兩行打印語句的注釋,程序將無法編譯。
2. 線程(Threads)
Rust 並發是通過生成(spawn)多個線程來實現的,這些線程運行處於無參數閉包中的不同任務。當一個線程被生成(spawn)時,返回類型時JoinHandle
類型,除非JoinHandle
被 joined,否則主線程不會等待它完成。因此,傳遞給線程的閉包必須拿到被捕獲變量的所有權以確保在線程最終執行的時候,這些被捕獲變量依然時有效的。這一點在下面的例子中有所體現。
use std::thread;
use std::time::Duration;
fn main() {
let t1 = thread::spawn(|| {
for i in 1..10 {
println!("Greeting {} from other thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("Greeting {} from main thread!", i);
thread::sleep(Duration::from_millis(1));
}
t1.join().unwrap();
}
我們得到了來自兩個線程的完整輸出,來自生成(spawn)線程的 9 個 greetings 和來自主線程的 4 個 greetings。
當注釋掉
t1.join().unwrap()
時,程序會在主線程執行完成后退出。你可以在Rust playground[1]上進行嘗試。
需要注意的是,多次運行程序得到的結果可能有所不同。這樣的不確定性也是並發的特點之一,也正如我們將要看到的那樣,這也是很多 bug 的根源。
就多任務處理而言,線程之間共享信息非常關鍵。在標准庫中,Rust 支持了兩種通信方式:消息傳遞(Message Passing)和共享狀態(Shared-State)。
1. 消息傳遞(Message Passing)
Rust 支持 channel,線程可以通過 channel 來發送和接收消息。一個例子就是多生產者單消費者(縮寫為mpsc
)channel。這種 channel 允許多個發送方和單個接收方通信,這些發送方和接收方可能處於不同的線程中。channel 在發送方結尾處獲取變量的所有權,並在接收方結尾處將其丟棄。下面的例子展示了消息是如何在兩個發送方和一個接收方之間的 channel 傳遞的。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx1, rx) = mpsc::channel();
let tx2 = mpsc::Sender::clone(&tx1);
thread::spawn(move || {
println!("Sending message from thread 1");
tx1.send(String::from("Greeting from thread 1")).unwrap();
});
thread::spawn(move || {
println!("Sending message from thread 2");
tx2.send(String::from("Greeting from thread 2")).unwrap();
});
for recvd in rx {
println!("Received: {}", recvd);
}
}
第二個發送方通過對第一個發送方克隆(clone)產生。這里不需要 join 這些線程,因為接收方會阻塞直到收到消息,在那里等待線程執行完成。
2. 共享狀態(Shared state)
另一種通信方式是共享內存。消息傳遞固然很好,但是它受單個所有權限制。對象必須被移動(move)/克隆(clone)才能傳送到另一個線程,如果對象被移動(move),它就變為不可用,如果它被克隆(clone),在該對象上的任何更新都必須要通過消息傳遞來通信。解決這個問題的方案是多所有權( multiple ownership),你可以回顧smartpointers post[2]這篇文章(譯注: smartpointers 這篇文章已翻譯,譯文為Rust與智能指針
),多所有權( multiple ownership)通過引用計數智能指針Rc<T>
來實現。我們看到,通過和RefCell<T>
組合使用,我們可以創建可變的共享指針。為什么不在一個多線程程序中使用它們呢?下面是一個嘗試:
fn main() {
let counter = Rc::new(RefCell::new(0));
let mut threads = vec![];
for _ in 0..5 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = (*counter).borrow_mut();
*num += 1;
});
threads.push(handle);
}
for thread in threads {
thread.join().unwrap();
}
println!("Result: {}", *counter.borrow());
}
counter
的共享指針( shared pointer )被發送到 5 個線程,每個線程都將其增加 1,在線程結束后打印出這個數字。上面這段代碼能運行么?
這段代碼無法編譯,但是錯誤信息給出了提示。
Rc<RefCell<T>>
不能在線程間安全地被傳遞。(上圖中)再往下就是原因了:它沒有實現Send
trait。看起來我們的意圖已經被正確地表達了,但是我們需要這些指針的線程安全版本。安全版本的替代選項是否存在?確實,Rc<T>
的替代選項是原子引用計數類型Arc<T>
。除了Rc<T>
的屬性(共享所有權)外,它還可以通過實現了Send
trait 在線程間安全地共享。對上面的代碼進行替換,我們應該可以能夠更接近正確的運行狀態。下面的代碼可以編譯嘛?
use std::cell::RefCell;
use std::thread;
use std::sync::Arc;
fn main() {
let counter = Arc::new(RefCell::new(0));
let mut threads = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = (*counter).borrow_mut();
*num += 1;
});
threads.push(handle);
}
for thread in threads {
thread.join().unwrap();
}
println!("Result: {}", *counter.borrow());
}
還是不行,現在我們有了一個和之前類似但是略微不同的錯誤。RefCell<T>
不能在線程間共享,因為它沒有實現Send
和Sync
trait。
如果
RefCell<T>
不適用於並發,那就必須要有一個線程安全的替代選項。事實上,Mutex(Mutex<T>
)就是這個替代選項。除了提供內部可變性之外,mutex 還可以在線程間共享。線程在訪問 mutex 對象之前必須先獲取一個鎖,以確保一次只有一個線程訪問。鎖定一個 mutex 會返回一個LockResult<MutexGuard<T>>
類型的智能指針。LockResult
是一個枚舉(enum),可以是Ok<T>
或Error
。簡單起見,我們通過調用unwrap()
將其導出,如果是Ok
,unwrap()
會返回其內部對象(這里是MutexGuard
),如果是Error
則會 panic。MutexGuard
是另一個智能指針,它可以被解引用以獲取其內部對象,當LockResult
離開作用域時,鎖會被釋放。在我們的代碼中,我們將RefCell<T>
替換為Mutex<T>
並更新其操作方式,更新后的代碼如下:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut threads = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num_guard = (*counter).lock().unwrap();
*num_guard += 1;
});
threads.push(handle);
}
for thread in threads {
thread.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
讓我們高興的是,它奏效了!

值得注意的是,Arc<T>
和Mutex<T>
的原子性是有性能開銷的。在單線程程序中也可以使用它們但這並非明智之舉。
了解了上述情況后,Rust 並發是否如其所聲稱的那樣無須畏懼(fearless)呢?答案就在於它處理大多數常見並發陷阱的方式,其中的一些上面已經介紹過了。
1. 競態條件(Race conditions)
從Mutable[3]這篇文章我們知道,數據競爭/不一致發生於兩個或更多的指針同時訪問相同的數據,其中至少有一個指針被用於寫入數據並且對數據的訪問沒有得到同步。通過保證一個 mutex 總是和一個對象關聯,進而保證對對象的訪問總是同步的(synchronized)。這一點和 C++不同,在 C++中,mutex 是一個單獨分開的實體,程序員必須人為地保證在訪問一個資源之前要獲取一個鎖。下面是一個關於不正確使用 mutex 如何引發不一致性的例子:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;
mutex mtx;
int counter = 0;
void increment_to_5(int id)
{
cout << "Executing from thread: " << id << endl;
if (counter < 5)
{
lock_guard<mutex> lck(mtx);
counter++;
}
}
int main()
{
thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = thread(increment_to_5, i);
for (auto &thread : threads)
thread.join();
cout << "Final result: " << counter << endl;
return 0;
}
函數increment_to_5()
在不同的線程中被調用,這段代碼的期望是增加counter
變量直到它達到 5。
盡管只在臨界區(增加 counter)之前鎖定 mutex 是合理的,但是這個鎖應該在將counter
和 5 進行比較之前就應該獲取。否則,多線程可能會讀取一個過期的值,並且將其增加從而導致超過預期的 5。在獲取鎖之前添加一個小的延遲,這個結果就能很容易地復現出來,如下所示:
cout << "Executing from thread: " << id << endl;
if (counter < 5)
{
this_thread::sleep_for(chrono::milliseconds(1));
lock_guard<mutex> lck(mtx);
counter++;
}
}
counter
最終的值在每次運行之后都會不同。如果把這段代碼用 Rust 來寫就不會出現這種問題,因為counter
是一個 mutex 類型,並且在任何訪問之前都要先獲取鎖。下面的代碼運行多次之后將會產生相同的結果:
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut threads = vec![];
for i in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
println!("Executing from thread: {}", i);
let mut num_guard = (*counter).lock().unwrap();
if *num_guard < 5 {
thread::sleep(Duration::from_millis(1));
*num_guard += 1;
}
});
threads.push(handle);
}
for thread in threads {
thread.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
2. 死鎖
雖然 Rust 減輕了一些並發風險,但是它還不能夠在編譯期檢測死鎖。下面的例子展示了兩個線程如何在兩個 mutex 上發生死鎖:
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let v1 = Arc::new(Mutex::new(0));
let v2 = Arc::new(Mutex::new(0));
let v11 = Arc::clone(&v1);
let v21 = Arc::clone(&v2);
let t1 = thread::spawn(move ||
{
println!("t1 attempting to lock v1");
let v1_guard = (*v11).lock().unwrap();
println!("t1 acquired v1");
println!("t1 waiting ...");
thread::sleep(Duration::from_secs(5));
println!("t1 attempting to lock v2");
let v2_guard = (*v21).lock().unwrap();
println!("t1 acquired both locks");
}
);
let v12 = Arc::clone(&v1);
let v22 = Arc::clone(&v2);
let t2 = thread::spawn(move ||
{
println!("t2 attempting to lock v2");
let v1_guard = (*v22).lock().unwrap();
println!("t2 acquired v2");
println!("t2 waiting ...");
thread::sleep(Duration::from_secs(5));
println!("t2 attempting to lock v1");
let v2_guard = (*v12).lock().unwrap();
println!("t2 acquired both locks");
}
);
t1.join().unwrap();
t2.join().unwrap();
}
從輸出結果中可以看出,在每個線程各自獲取第一個鎖后,程序被掛起了。

類似的方式,在消息傳遞時也會發生死鎖,如下面代碼所示:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx1, rx1) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();
thread::spawn(move || {
println!("Waiting for item 1");
rx1.recv().unwrap();
println!("Sending item 2");
tx2.send(()).unwrap();
});
println!("Waiting for item 2");
rx2.recv().unwrap();
println!("Sending item 1");
tx1.send(()).unwrap();
}
因為每個 channel 在發送之前都要等待接收一個 item,所以兩個 channel 都會一直等待下去。

3. 循環引用(Reference cycles)
通過smart pointers post[4]這篇文章,我們知道,Rc
指針可能引發循環引用。Arc
指針對此也不能幸免,但是類似的,它可以通過 Atomic Weak 指針減少這種情況。
通過上面的觀察,可以說 Rust 的並發並非 100%的完美無暇。它在編譯期避免了競態條件(race conditions),但是無法避免死鎖和循環引用。 回過頭來看,這些問題並不是 Rust 獨有的,與大多數語言相比,Rust 的表現要好得多。Rust 中的並發不一定是無須畏懼(fearless)的,但它不那么可怕。
致謝
感謝太陽快遞員
、惰性氣體
、沒得
三位同學對fearless一詞翻譯提供的建議。
參考資料
Rust playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0879538b56c60c89c4d083308424c701
[2]smartpointers post: https://dev.to/imaculate3/that-s-so-rusty-smart-pointers-245l
[3]Mutable: https://dev.to/imaculate3/that-s-so-rusty-mutables-5b40
[4]smart pointers post: https://dev.to/imaculate3/that-s-so-rusty-smart-pointers-245l