Rust-線程:使用消息傳遞在線程間傳送數據


一個日益流行的確保安全並發的方式是消息傳遞(message passing),這里線程或actor通過發送包含數據的消息來相互溝通。這個思想來源於Go編程語言文檔中的口號:“不要通過共享內存來通訊;而是通過通訊來共享內存。” ("Do not communicate by sharing memory; instead, share memory by communicating.")

Rust中一個實現消息傳遞並發的主要工具是通道(channel),Rust標准庫提供了其實現的編程概念。你可以將其想像為一個水流的通道,比如何流或小溪。如果你將諸如小船之類的東西放入其中,它們會順流而下到達下游。

編程中的通道有兩部分組成,一個發送者(transmitter)和一個接收者(receiver)。發送者位於上游戲位置,接收者則位於下游,代碼中的一部分調用發送者的方法以及希望發送的數據,另一部分則檢查接收端收到消息。當發送者或者接上者任一被丟棄時可以認為通道被關閉(closed)了。

這里,我們將開發一個程序,它會在一個線程生成值向通道發送,而在另一個線程生成值向通道發送,而在另一個線程會接收值並打印出來。這里會通過通道在線程間發送簡單值來展示這個功能。

首先,在示例1中,創建了一個通道但沒有做任何事。

let (tx,rx) = mpsc::channel();

 創建一個通道,並將其兩端賦值給tx和rx,注意這還不能編譯,因為Rust不知道我們想要在通道中發送什么類型:

error[E0282]: type annotations needed for `(Sender<T>, std::sync::mpsc::Receiver<T>)`
  --> src/main.rs:60:19
   |
60 |     let (tx,rx) = mpsc::channel();
   |         -------   ^^^^^^^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
   |         |
   |         consider giving this pattern the explicit type `(Sender<T>, std::sync::mpsc::Receiver<T>)`, where the type parameter `T` is specified

這里使用 mpsc::channel 函數創建一個新的通道;mpsc是 多個生產者,單個消費者 (multiple producer, single consumer) 的縮寫。簡而言這,Rust標准庫實現通道的方式意味着一個通道可以有多個產生值的發送端,但只能有一人消費這些值的接收端。

mpsc::channel函數返回一個元組:第一個元素是發送端,而第二個元素是接收端。由於歷史原因,tx和rx通常作為發送者和接收者的縮寫。這里使用let語句是一個方便提取mpsc::channel返回的元組中的一部分的手段。

讓我們將發送端移動到一個新建線程中並發送一個字符串,這樣新建線程就可以和主線程通訊了。以下示例2,將tx移動到一個新建的線程中並發送"hi":

    let (tx,rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

這里再次使用thread::spawn來創建一個新線程並使用movetx移動到閉包中這樣新建線程就擁有tx了。新建線程需要擁有通道的發送端以便能向通道發送消息。

通道的發送端有一個send方法用來獲取需要放入通道的值。send方法返回一個Result<T,E>類型,所以如果接收端已經被丟棄,將沒有發送值的目標,所以發送操作會返回錯誤。

在示例3中,我們在主線程中從通道的接收端獲取值。這類似在河的下游撈起橡皮鴨或接收聊天信息:

    let (tx,rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
    let received = rx.recv().unwrap();
    println!("Got:{}", received);

 在主線程中接收並打印內容"hi"。

通道的接收端有兩個有用的方法:recvtry_recv。這里,我們使用了recv,這是receive的縮寫。這個方法會阻塞主線程執行直到從通道中接收一個值。一旦發送了一個值,recv會在一個Result<T,E>中返回它。當通道發送端關閉,recv會返回一個錯誤表明不會再有新的值到來了。

try_recv不會阻塞,相反它立刻返回一個Result<T,E>:ok值包含可用的信息,而Err值代表此時沒有任何消息。如果線程在等待消息過程中還有其他工作時使用try_recv很有用:可以編寫一個循環來頻繁調用try_recv,在有可用消息進行處理,其余時候則處理一會其他工作直到再次檢查。

出於簡單的考慮,這個例子使用了recv;主線程中除了等待消息之外沒有任何其他工作,所以阻塞主線程是合適的。

如果運行示例3的代碼,我們將會看到主線程打印出這個值:

Got: hi

通道與所有權轉移

所有權規則在消息傳遞中扮演了重要角色,其有助於我們編寫安全的並發代碼。防止並發編程中的錯誤是在Rust程度中考慮所有權的一大優勢。現在讓我們做一個試驗看看通道與所有權如何一同協作以避免產生問題:我們將嘗試在新建線程中的通道中發送完val之后再使用它。嘗試編譯示例4中的代碼並看看為何這是不允許的:

    let (tx,rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });
    let received = rx.recv().unwrap();
    println!("Got:{}", received);

示例4:在我們已經發送到通道后,嘗試使用val引用

 

這里嘗試在通過tx.send發送val到通道中之后將其打印出來。允許這么做是一個壞主意:一旦將值發送到另一個線程后,那個線程可能會在我們再次使用它之前就將其修改或者丟棄。其他線程對值可能的修改會由於不一致或不存在的數據而導致錯誤或意外的結果。然而,嘗試編譯示例4的代碼時,Rust會給出一個錯誤:

error[E0382]: borrow of moved value: `val`
  --> src/main.rs:57:31
   |
55 |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
56 |         tx.send(val).unwrap();
   |                 --- value moved here
57 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

我們的並發錯誤會造成一個編譯時錯誤。send函數獲取其參數的所有權並移動這個值歸接收者所有。這可以防止在發送后再次意外的使用這個值;所有權系統檢查一切是否合乎規則。

發送多個值並觀察接收者的等待

示例3中的代碼可以編譯和運行。不過它並沒有明確的告訴我們兩個獨立的線程通過通道相互通訊。示例5則有一些改進會證明示例3中的代碼是並發執行的:新建線程現在會發送多個消息並在每個消息之間暫停一秒鍾。

    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1))
        }
    });
    for received in rx {
        println!("Got: {}", received);
    }

示例5:發送多個消息,並在每次發送后暫停一段時間 

 

這一次,在新建線程中有一個字符串vector希望發送到主線程。我們遍歷他們,單獨的發送每一個字符串並通過一個Duration值調用thread::sleep函數來暫停一秒。

在主線程中,不再顯示調用recv函數:而是將rx當作一個迭代器。對於每一個接收到的值,我們將其打印出來。當通道被關閉時,迭代器也將結束。

運行示例5的代碼,如下輸出,每一行都會暫停一秒:

 

Got: hi
Got: from
Got: the
Got: thread

 

因為主線程中的for循環里並沒有任何暫停或等待的代碼,所以可以說主線程是在等待從新建線程中接收值。

通過克隆發送者來創建多個生產者

之前我們提到了mpsc是multiple producer,single consumer的縮寫。可以運用mpsc來擴展示例5中的代碼來創建向同一接收者發送值的多個線程。這可以通過克隆通道的發送端來做到,如示例6所示:

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1))
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("message"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1))
        }
    });


    for received in rx {
        println!("Got: {}", received);
    }

示例6:從多個生間者發送多個消息

這一次,在創建新線程這前,我們對通道的發送端調用了clone方法。這會給我們一個可以傳遞給第一個新建線程的發送端句柄。我們會將原始的通道發送端傳遞給第二個新建線程。這樣就會有兩個線程,每個線程將向通道的接收端發送不同的消息。

如果運行這些代碼,我們可能會看到這樣的輸出:

Got: hi
Got: more
Got: message
Got: from
Got: for
Got: the
Got: thread
Got: you

雖然你可能會看到這些值以不同的順序出現;這依賴於你的系統。這也就是並發既有趣又困難的原因。如果通過thread::sleep做實驗,在不同的線程中提供不同的值,就會發現他們的運行更加不確定,且每次都會產生不同的輸出。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM