https://rustcc.cn/article?id=e6d50145-4bc2-4f1e-84da-c39c8217640b
前提
這篇文章主要描述了Rust中異步的原理與相關的實現,Rust異步也是在最近的版本(1.39)中才穩定下來。希望可以通過這邊文章在提高自己認知的情況下,也可以給讀者帶來一些解惑。(來自於本人被Rust異步毒打的一些經驗之談).
閱讀這篇文章需要對操作系統,IO多路復用,以及一些數據結構有一定的概念。
老生常談,幾乎所有的語言中異步相關的解釋都是統一的:線程切換開銷大,且資源浪費(主要集中在內存上),這篇文章假定讀者已對這些情況已知曉。
Future
Future
字面的意思就是未來發生的事情,在程序中則代表了一系列暫時沒有結果的運算子,Future
需要程序主動去poll
(輪詢)才能獲取到最終的結果,每一次輪詢的結果可能是Ready
或者Pending
。
當Ready
的時候,證明當前Future
已完成,代碼邏輯可以向下執行;當Pending
的時候,代表當前Future
並未執行完成,代碼不能向下執行,看到這里就要問了,那什么時候才能向下執行呢,這里的關鍵在於Runtime
中的Executor
需要不停的去執行Future
的poll
操作,直至Future
返回Ready
可以向下執行為止。等等,熟悉Linux
的同學可能要說了,怎么感覺和Epoll
模型是非常的相似呢,沒錯,這確實非常相像(但是依然有些許不通,Future
可以避免空的輪詢),看樣子優秀的設計在哪里都可以看到類似的身影。為了實現Rust聲稱的高性能與零開銷抽象,這里做了一些優化,下面一一講述。
Future結構
pub enum Poll<T> { Ready(T), Pending, } pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; }
Future
的定義非常簡單,Output
代表了Future
返回的值的類型,而poll
方法是執行Future
的關鍵,poll
方法可以返回一個Poll
類型,Poll
類型是一個Enum
,包裝了Ready
和Pending
兩種狀態。
Context
Context
提供了對Future
進行調度的功能。目前Context
作為一個結構體,有一個核心成員Waker
,用來喚醒綁定的Future
. 未來不排除在Context
添加新的字段。
pub struct Context<'a> { waker: &'a Waker, // Ensure we future-proof against variance changes by forcing // the lifetime to be invariant (argument-position lifetimes // are contravariant while return-position lifetimes are // covariant). _marker: PhantomData<fn(&'a ()) -> &'a ()>, }
pub struct Waker { waker: RawWaker, } impl Waker { /// 喚醒綁定在 Waker 上的數據,通常是 Future pub fn wake(self) {} pub fn wake_by_ref(&self) {} pub fn will_wake(&self, other: &Waker) -> bool {} pub unsafe fn from_raw(waker: RawWaker) -> Waker {} } pub struct RawWaker { /// A data pointer, which can be used to store arbitrary data as required /// by the executor. This could be e.g. a type-erased pointer to an `Arc` /// that is associated with the task. /// The value of this field gets passed to all functions that are part of /// the vtable as the first parameter. data: *const (), /// Virtual function pointer table that customizes the behavior of this waker. vtable: &'static RawWakerVTable, } /// RawWaker 行為的虛函數表 pub struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()), }
Runtime
Runtime
由兩部分組成,Executor
和Reactor
。
Executor
為執行器,沒有任何阻塞的等待,循環執行一系列就緒的Future
,當Future
返回pending
的時候,會將Future
轉移到Reactor
上等待進一步的喚醒。
Reactor
為反應器(喚醒器),輪詢並喚醒掛載的事件,並執行對應的wake
方法,通常來說,wake
會將Future
的狀態變更為就緒,同時將Future
放到Executor
的隊列中等待執行。
執行流程
下面的序列圖大概簡單的描繪了Future
在Executor
和Reactor
之間來回轉移的流程與狀態變化。
sequenceDiagram
participant Executor
participant Reactor
activate Executor
Executor->>Reactor: Pending Future deactivate Executor Note left of Executor: Execute other Future activate Reactor Reactor->>Executor: Ready Future deactivate Reactor activate Executor deactivate Executor
上面說明了一個簡單的Future
的執行,如果是一個比較復雜的Future
的話,比如中間會有多次IO
操作的話,那么流程時怎么樣的呢?看下面一段代碼:(僅僅作為demo,不代表可以直接使用)
async fn read_and_write(s: TcpStream) { let (mut r, mut w) = s.split(); let mut buffer = r.read().await.unwrap(); buffer.append("Hello,world"); w.write_all(buffer.as_bytes()).await.unwrap(); }
對應的執行流程為:
sequenceDiagram
participant Executor
participant Reactor
activate Executor
deactivate Executor
Executor->>Reactor: Pending on r.read() Note left of Executor: Execute other Future activate Reactor Reactor->>Executor: r.read() is ready Note left of Executor: Execute current Future deactivate Reactor Executor->>Reactor: Pending on w.write_all() Note left of Executor: Execute other Future activate Reactor deactivate Reactor Reactor->>Executor: w.write_all() is ready
上面的這些例子系統中只展示了一個
Future
的執行情況,真實的生產環境中,可能有數十萬的Future
同時在執行,Executor
和Reactor
的調度模型要更復雜一些。
總結
一句話概括Runtime
,Future
不能馬上返回值的時候,會被交給Reactor
,Future
的值准備就緒后,調用wake
傳遞給Executor
執行,反復執行,直至整個Future
返回Ready
。
Executor
通常來說,Executor
的實現可以是單線程與線程池兩個版本,兩種實現間各有優劣,單線程少了數據的競爭,但是吞吐量卻容易達到瓶頸,線程池的實現可以提高吞吐量,但是卻要處理數據的競爭沖突。下面我們以async-std
來分析基於線程池的實現:
/// The state of an executor. struct Pool { /// 全局任務隊列 injector: Injector<Runnable>, /// 線程的本地隊列,用來進行任務的偷取 stealers: Vec<Stealer<Runnable>>, /// 存放空閑的線程,用來后續的喚醒並執行任務 sleepers: Sleepers, } // 全局的線程池 static POOL: Lazy<Pool> = Lazy::new(|| { let num_threads = num_cpus::get().max(1); let mut stealers = Vec::new(); // Spawn worker threads. for _ in 0..num_threads { let worker = Worker::new_fifo(); stealers.push(worker.stealer()); let proc = Processor { worker, slot: Cell::new(None), slot_runs: Cell::new(0), }; thread::Builder::new() .name("async-std/executor".to_string()) .spawn(|| { let _ = PROCESSOR.with(|p| p.set(proc)); abort_on_panic(main_loop); }) .expect("cannot start a thread driving tasks"); } Pool { injector: Injector::new(), stealers, sleepers: Sleepers::new(), } }); /// 工作線程的狀態 struct Processor { /// 本地任務隊列 worker: Worker<Runnable>, /// 存放了比本地隊列中任務優先級更高的任務,通常第一次spawn會放到這里, /// 執行一次poll來快速判斷狀態,對於無阻塞的任務更高效,不需要等待。 slot: Cell<Option<Runnable>>, /// How many times in a row tasks have been taked from the slot rather than the queue. slot_runs: Cell<u32>, } fn main_loop() { loop { match find_runnable() { Some(task) => task.run(); None => { // 實際上,這里根據空循環的次數,會陷入睡眠狀態或出讓CPU資源,直到新的task來喚醒。 } } } } fn find_runnable() -> Option<Task> { // 優先從本地的隊列中獲取 let task = get_local(); if task.is_some() { return task; } // 其次從全局隊列中獲取 let task = get_global(); if task.is_some() { return task; } // 最后嘗試從其他線程的本地隊列中偷取 steal_other() } /// 安排新的任務到Executor的執行隊列中 pub(crate) fn schedule(task: Runnable) { PROCESSOR.with(|proc| { // If the current thread is a worker thread, store it into its task slot or push it into // its local task queue. Otherwise, push it into the global task queue. match proc.get() { // 如果當前線程為worker線程,插入到當前線程的第一優先級任務槽 Some(proc) => { // Replace the task in the slot. if let Some(task) = proc.slot.replace(Some(task)) { // 嘗試把任務的優先級提升到最高,並把上一個優先級最高的任務放到當前線程任務隊列 // If the slot already contained a task, push it into the local task queue. proc.worker.push(task); POOL.sleepers.notify_one(); } } // 如果當前線程不是worker線程的話,放到全局隊列 None => { // 將任務放到全局隊列中 POOL.injector.push(task); // 嘗試喚醒一個睡眠的worker線程 POOL.sleepers.notify_one(); } } }) }
這里做了大量的簡化,整個Executor是一個線程池,每個線程都在不斷的尋找可執行的task,然后執行,然后再找下一個task,再執行,永遠重復。
從上面的main_loop中可以看到,cpu並不是一直毫無意義的空轉,中間會有一些策略來優化cpu的使用。
Reactor
Reactor
作為反應器,上面同時掛載了成千上萬個待喚醒的事件, 這里使用了mio
統一封裝了操作系統的多路復用API
。在Linux
中使用的是Epoll
,在Mac
中使用的則是Kqueue
,具體的實現在此不多說。
在Future的基礎上,出現了AsyncRead/AsyncWrite/AsyncSeek
等抽象來描述IO操作,在執行對應的Read/Write/Seek
操作時,如果底層的數據尚未准備好,會把所在的Future注冊至Reactor。Reactor的流程如下:
loop {
poll.poll(&events, timeout);
for event in events.iter() { if (event.is_readable()) { for waker in event.readers.wakers { waker.wake(); } } if (event.is_writeable()) { for waker in event.writers.wakers { waker.wake(); } } } }
Reactor
會不斷的poll
就緒的事件,然后依次喚醒綁定在事件上的waker
,waker
喚醒的時候會把對應的task
移動到Executor
的就緒隊列上安排執行。
結合
Executor
的運作原理不難發現,Executor
肯定不會poll
到未就緒的task
,因為只有就緒的任務才會被Reactor
放到Executor
的執行隊列中,Executor
的資源利用率再一次被提高,這就是整個異步體系的高明之處。
Stream
Future
是異步開發中最基礎的概念了,如果說Future
代表了一次性的異步值,那么Stream
則代表了一系列的異步值。Future
是1,Stream
是0,1或者N。 簽名如下:
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
Stream
對應了同步原語中的Iterator
的概念,回憶一下,是不是連簽名都是如此的相像呢!
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Stream
用來抽象源源不斷的數據源,當然也可以斷(當 poll
到 None
的時候)。可以用來抽象 Websocket Connection
讀取端,在Websokcet
中,服務端源源不斷的接受客戶端的值並處理,直至客戶端斷開連接。更進一步的抽象,MQ
中的Consumer
, Tcp
中接收方,都可以看作是一個Stream
, 因此Stream
的抽象對異步編程意義非凡。
思考: 除了上面的幾種情況,還有什么可以抽象成
Stream
模型呢?
Sink
有了代表一次性的異步值Future
, 也有了代表可重復的異步值的Stream
, 因此,需要有一個代表一次或多次的異步值的通道,也就是接下來的Sink
。通常來說, Sink
可以來抽象網絡連接的寫入端,消息隊列中的 Producer
。
pub trait Sink<Item> { type Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; }
在Sink的上層,我們可以封裝 send
以及 send_all
等方法,用來抽象對應的 Future
與 Stream
.
Timer
很多情況下,我們希望可以延時執行一些操作,比如定時發送郵件,每隔一段時間生成一次報表。我們首先想到不就是sleep
一段時間就行了,下面的代碼:
pub fn get_next_day() -> Time { thread::sleep(Duration::hour * 24); current_time() } pub fn run_every_hour() { for thread::sleep(Duration::hour) { do_something(); } }
是不是很機智呢!😂😂😂! 遺憾的是,我們寫完這段代碼,提交后,還沒上線,估計就要滾蛋了。因此,我們想要的是一個不阻塞當前線程的定時器,定時器到期自動喚醒並執行之后的操作。
不同於Tcp/Udp/Uds
,mio
沒有提供對Timer
的封裝。
通常來說,對定時器的處理要么是時間輪,要么堆,要么紅黑樹(時間復雜度更為平均O(log n)
)。時間輪比較典型的案例就是在Kafka
中的使用了,Go Runtime
用的則是堆,紅黑樹和堆的實現大致相同。
- 時間輪算法可以想象做鍾表,每一格存儲了到期的定時器,因此時間輪的最小精度為每一格所代表的時間(因此時間輪算法不適合用於對精度要求高的場景)。如果定時器的時間超過時間輪所能表示的時間怎么辦呢,也簡單,可以通過兩種方式來優化。
- 多級時間輪來優化,可以想象,在鍾表上,秒針每走一圈,分針走一格,同理分針走一圈,時針走一格,因此多級時間輪中,第一級的時間最為精確,第二級次之,第三級再次之..., 超過某一級時間輪所能表示的事件后,將定時器放到下一級時間輪中。
- 超過時間輪所能表示的時間范圍后,對時間取余,插入到余數所在的格子中,這樣一來,每個格子中存放的定時器需要加入輪數的記錄,用來表明還差多少輪才能執行。每個格子中在插入新的定時器時,可以使用堆來堆定時器進行排序。
- 堆定時器(紅黑樹定時器)
使用最小堆來維護所有的定時器。一個工作線程不斷的從堆里面尋找最近的定時器,如果定時器的時間比當前時間小,則喚醒該定時器對應的task,如果未達到設定的時間,則進行Thread::park(deadline-now)
操作,讓出當前cpu一段時間。
目前futures-timer的實現為全劇唯一的一個堆。存在可優化空間, 比如
Go 1.14
的實現,把定時器提交到當前worker thread的本地堆里面,用來避免鎖競爭,提高性能。
組合子
上面定義了實現異步的最基本概念,Future
, Stream
以及Sink
。
但是很多情況下,我們直接使用它們來構建我們的應用是非常困難的,例如:多個互為競爭關系的Future
,我們只需其中任意一個Future
返回即可,能想到的做法是,我們不斷的遍歷所有的Future
,直到某一個返回Ready
:
loop { for f in futures { if f.is_ready() { return f.output(); } } }
我們可以把上面的邏輯給包裝一下,提供一個名為select!(futures...)
的宏,select
便可作為一個組合子而存在。類似的組合子還有很多,比如join(futures...)
,等待所有Future
完成。
更多的可以參考futures-util
.
Async/Await
上面所有的概念共同組成了Rust
的異步生態,那么現在想象一下,如何獲取一個Future
運行的結果呢。一個可能的做法如下:
loop { match f::poll(cx) { Poll::Ready(x) => return x; Poll::Pending => {} } }
如果每次都要用戶這么做的話,將會是多么痛苦的一件事兒呀,還不如用注冊回調函數來實現異步呢!
有沒有更精煉的方式來獲取Future
的值呢,這就是async/await
出現的原因了。本質上來說,async/await
就是上面代碼段的一個語法糖,是用戶使用起來更加的自然。上面的代碼可以替換成:
let x = f.await;
是不是有非常大的簡化呢!
總結
雖然上面提到了各種各樣的概念,但是仔細捋一下,便會發現整個異步可以分為三層:
Future/Stream/Sink
,Reactor/Executor
直接作用於前面的三種類型。此層是為底層,一般用戶很少接觸,庫的開發者接觸較多。- 組合子層,為了提供更為復雜的操作,誕生了一系列的異步組合子,使得異步變得更利於使用,用戶會使用這些組合子來完成各種各樣的邏輯。
async/await
,准確的說,這層遠沒有上面兩層來的重要,但是依然不可或缺,這層使得異步的開發變得輕而易舉。
注意的地方
- 不要在任何異步函數中執行任何阻塞操作,不僅僅是
thread::sleep
, 還有標准庫的Tcp/Udp
, 以及sync
中的channel
,Mutex
,RWLock
都不應該繼續使用,除非你知道你在干什么!替換為async-std
與futures
中實現的版本。 - 如非必要,不要自己嘗試去實現
Future
,自己實現的沒有觸發wake
操作的話,將永遠不會喚醒,取而代之,用已經實現好的Future
進行組合。 - 使用
async/await
代替所有需要異步等待的點,這將會極大的簡化你的代碼。