https://zhuanlan.zhihu.com/p/30028047
Rust 是 Mozilla 推出的一門系統編程語言,非常看重內存安全,是一門非常優秀的語言。Mozilla 用它構建了其下一代的瀏覽器內核 servo,其工程能力毋庸置疑。
那么,Rust在服務端編程領域會有什么建樹呢?
我們從最簡單的服務端程序模型開始說:
工作上,我們經常會去寫一些服務,不管是經典的 http 服務,還是各種奇怪的中間件服務。但不論如何服務,其代碼上的大框架應該是下面這樣的:
pub fn run_unsleep() {
let worker = num_cpus::get();
let handlers: Vec<_> = (0..worker)
.into_iter()
.map(|_| thread::spawn(move || { loop { do_next(); } }))
.collect();
for h in handlers {
h.join().unwrap();
}
}
在上面的代碼里,我們開啟了足夠數量的工作線程,每一個線程獨占一顆 CPU(超線程)。然后,我們在線程內部,跑了一個循環。
這個循環其實要做的事情很簡單,就是去不斷的拿到下一個任務,然后去執行這個任務。
但是,這個事情似乎是不太完整的,比如我們將 do_next 聲明為如下:
fn do_next() { }
這其實就是一個典型的服務器端程序,只不過能做的事情有限(除了暖機還能干什么?)。
這個時候,你會發現你的 CPU 被占滿了,而且停不下來。
等,等等!
我們需要的是想辦法停下來。
需要知道,我們的服務器資源,尤其是 CPU 資源都是極度寶貴的資源。我們需要有辦法能主動讓出我們的 CPU,有什么辦法主動讓出 CPU 呢?當然不是讓你在外面 sigstop 和 sigcont 。而是需要讓我們的服務在沒有請求的情況下放棄 CPU 。一般情況下,我們會將我們的服務端循環等在一些系統調用上,最經典的當然是 accept(2) 了。
accept 是一個系統調用,我們通過這個調用接收新的 TCP 連接,每一個 TCP 連接都是一個 四元組 —— (源ip, 源端口, 目的ip, 目的端口)。當然了,其實在我們的網絡編程中一般都描述為五元組——畢竟世界上不止有 TCP 一種傳輸層協議。
不過我們的重點不在於四元組和五元組的區別,我們要知道一件事情,系統調用是可能會導致阻塞的,阻塞就一定會導致上下文切換。
進程狀態
我們在學習 OS 課的時候,老師都會給你們畫一個 進程的 三大狀態: 就緒,運行,掛起。三中狀態可以按照一個狀態轉換圖在某些條件下進行有序轉化。
而在 linux 下干脆更簡單的分為了 R (task_runnable/task_running) 和 非R狀態(包括 S、D、Z、X 等等)。這兩個狀態的進程之間相互轉化。
我們知道的,當一個 fd (file descriptor) 上的 某些操作不能被滿足 (比如read的時候沒數據,比如write的時候寫不進去)的時候,系統就會把一個 R 狀態的進程切換成 S 狀態的進程,並保留其現場(堆棧,寄存器等)以便恢復,直到這些特定條件被滿足了之后,由系統將這些 S 狀態的進程切換會R狀態。
我們知道,S 狀態的進程是不會消耗 CPU 時間的(當然 R 狀態的也不一定就在用CPU),而 accept 調用並阻塞了之后,當前線程便會被置為 S 狀態 ,恰恰不會使我們的CPU過頻。回想上面,我們發現 accept、read、write 都有這樣的功能。
於是我們得到了這樣的服務器程序:
pub fn run_accept_each() {
// 首先 bind and listen
let listener = Arc::new(TcpListener::bind("0.0.0.0:12345").unwrap());
let worker = num_cpus::get();
let mut ths = Vec::new();
for _ in 0..worker {
// 利用 Arc 共享資源
let listener = listener.clone();
let th = thread::spawn(move || {
loop {
// 在每個線程里去 accept
let (stream, _addr) = listener.accept().unwrap();
echo(stream);
}
});
ths.push(th);
}
join_all(ths);
}
fn join_all<T>(hs: Vec<thread::JoinHandle<T>>) {
for h in hs {
h.join().unwrap();
}
}
fn echo(mut stream: TcpStream) {
let mut buf = [0u8; 1024];
loop {
let rsize = match stream.read(&mut buf) {
Ok(size) => size,
Err(err) => panic!(err),
};
// EOF: read 0 size
if rsize == 0 {
break;
}
match stream.write_all(&buf[..rsize]) {
Ok(_) => {}
Err(err) => panic!(err),
}
}
}
在這里我們寫了一個最簡單的服務器端 echo 的程序。
然后這里我們就又遇到了一個問題,當內核接收到新的 tcp 請求的時候,是該如何去處理呢這些 accept 呢?在某些上古版本的鏈接里,我們知道了驚群問題。實際上驚群問題就是內核當時設計的時候沒有設計好。每次來一個 tcp 請求的時候會喚醒所有的 accept 調用,當然區別是有的線程拿到了這個 fd 有的沒有。但是,無意義的喚醒在系統層面上來說是極其浪費服務器資源的。不過呢,現在我們這個程序完全不需要擔心這件事兒,因 linux 早在 2.6 版本就改進過這個事情,每次只會在當前 accept 的調用列表里選一個去喚醒而不是頭很硬的去喚醒線程。
accept 與 reuseport
繼續上面的話題,我們知道,現在的內核里 accept 驚群其實是挺久遠的事情了。但是,不驚群並不代表着 accept 就完美無缺了。現在的互聯網服務,經常面臨着在短時間內接受了巨量的新連接的情況。不過別擔心,當然沒有那么多人閑着沒事兒用半開鏈接打你。但是,在巨量的短連接下,只能有一個 accept 響應,accept本身的速度就已經成為了瓶頸。那么怎么辦呢
於是我們有了 SO_REUSEPORT 這個選項。
reuseport 是干什么的呢?我們在學習算法的時候經常提到一個概念叫 ”分而治之“, reuseport 和這個概念很像。它在內核里維持了兩個巨大的 hash 表。第一層 hash 按照 dport 查表,第二層按照 (sip, sport) 查表。第二層 hash 表,將按照我們 reuseport 的 socket fd 數量分區。落到這區內的 tcp 連接將被單獨的進行 accept 。
這樣一來,一個 accept 變成了 N 個互不干擾的 accept,速度自然提升了 N 倍。
但是,這樣就可以了么?
我們知道的是,第二層 hash 表的分區將會隨着 socket fd 的數量進行變化,這一變化就出問題了。
假設我們現在有兩個 fd 互相 reuseport,將 (0, 100) 的 hash 區間平均分成了 (0,50) 和 (50, 100),那么現在,如果某個請求 A 被 hash 到了區間 45,半開連接已建立,但是這個時候突然又多了一個 socket fd 進來。內核為了負載均衡,將會改變這個 hash 環,現在為 (0, 33), (33, 66), (66, 100)。
這個時候,原先 45 區間上的鏈接實際上已經失去了對應的 socket,無法再被正確的路由,因為,server 會發送一個 reset 終止掉這個鏈接。但是,使用 reuseport 選項本身就是為了處理巨量請求的情況,大量的 connection reset 本身就是不可接受的。因此,我們建議使用 reuseport 的時候,只在服務初始化的時候初始化好固定數量的 fd,運行的時候就不要亂動了,就像我廠的 corvus 一樣。
現在嘛,因為 reuseport 的天然分治的特征,就有人動氣了心思:起足夠數量的 worker 線程,並且和 io 線程放到一起,那這樣不就不用寫多線程程序了么?
是。但是,這種模式其實並不是很好的,因為這種設計雖然在使用上天然避免了多線程競爭和復雜,但是如果單鏈接負載過高還是存在着單線程被直接打爆的風險。所以選型的時候,需要結合自己的業務特征來選定。
談談 epoll
上面我們說了 accept,其典型的工作模式就是來一個鏈接分配一個線程 ,然而我們的 CPU、內存 數量有限,不可能分配寶貴的資源給那么多的線程。而且線程多了其調度也是很大的問題,這就觸發了網絡編程里經典的 C10K 問題。當然,本文的重點不是來和大家探討線程調度和上下文切換的開銷,有喜歡的可以看一下 shell老師的[博客][1]。
工程師們為了解決 C10K 問題,長足發展了多路復用這門技術。簡單來說,就是一個線程上同時監聽多個 fd,從最開始的 select 到現在的 epoll,中間又是一個長長的故事,這里不展開講。
我們需要討論的是,accept 不會有驚群,那么 epoll 會有么?
答案是,現在大部分情況下會。
內核在新版本里加入了 EPOLLEXCLUSIVE 的選項(kernel 4.5),但是其實我們線上的服務的大部分內核版本還是很低的,沒有這個兼容就。。。
想修復很簡單,只需要更新內核(有的服務器可能還需要升級下系統),然后重啟下服務器就好了。
(當然了,這想想也是不可能的事兒)
那么如何避免呢?我們翻翻 nginx 的源碼,發現為了避免驚群效應, nginx 用了一個鎖,每次 epoll_wait 之前,都先進行搶鎖,那么這個時候,多個 nginx worker 之間互相等在了 lock 而不是 epoll_wait,這樣。實際上,每次就只有一個進程在進行 epoll_wait,實際上也就不存在什么驚群問題了 —— 群都沒有驚個啥。
mio
我們現在有了一系列的多路復用手段,但是麻煩事兒又來了:我們手段太多了。
尤其是當你寫的程序需要跨平台兼容的時候,問題就更大了。linux 上有 epoll,Unix 上有 kqueue, Windows 上有 iocp。各種技術需要一個統一的抽象層,於是 mio 應運而生。
mio 是 Rust 的一個多路復用+非阻塞調用的網絡庫,用比較小的額外開銷統一了全平台上的多路復用庫。當然了,它不是第一個做這個的庫,很多庫都做過這方面的嘗試,比如用力過猛的 boost::aio,比如被用在 chrome 上的 libevent。當然了,我估計很多寫 C 的人都是用的自己寫的兼容代碼,比如著名的redis。
mio 本身就是一層抽象,抽象的層級比較低,下面是一段典型的 mio 代碼(摘自mio自己的doc example):
use mio::*;
use mio::tcp::{TcpListener, TcpStream};
// Setup some tokens to allow us to identify which event is
// for which socket.
const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);
let addr = "127.0.0.1:13265".parse().unwrap();
// Setup the server socket
let server = TcpListener::bind(&addr).unwrap();
// Create a poll instance
let poll = Poll::new().unwrap();
// Start listening for incoming connections
poll.register(&server, SERVER, Ready::readable(),
PollOpt::edge()).unwrap();
// Setup the client socket
let sock = TcpStream::connect(&addr).unwrap();
// Register the socket
poll.register(&sock, CLIENT, Ready::readable(),
PollOpt::edge()).unwrap();
// Create storage for events
let mut events = Events::with_capacity(1024);
loop {
poll.poll(&mut events, None).unwrap();
for event in events.iter() {
match event.token() {
SERVER => {
// Accept and drop the socket immediately, this will close
// the socket and notify the client of the EOF.
let _ = server.accept();
}
CLIENT => {
// The server just shuts down the socket, let's just exit
// from our event loop.
return;
}
_ => unreachable!(),
}
}
}
我們可以看到,在這段代碼里,mio 的核心便是 Poll, 而且代碼超級像 epoll 的使用方式。我們將 tcp socket fd 注冊為 Token(0) 的 fd,然后接受並直接關閉鏈接。
let _ = server.accept();
Token 即 fd 到 mio 的映射,我們會在mio的高速分配器(slab::Slab) 里存儲下這些 fd ,並進行合適的路由。每次觸發的一個 event,比如可讀可寫事件,都會拿到其對應的 token 。
這樣,由 fd 到 token 再對應上 event 的關系已經建立了,利用 mio 的 poll (會被轉換成對應的 syscall 如 epoll_wait 等),我們上面的暖機程序終於可以正確的用上 多路復用了。
高興不?
你高興的太早了。
mio 有個問題,它太接近底層了, 為了寫好一個正確的 io 庫,你需要一個寫一個復雜並且巨大的狀態機來處理 mio 的各種情況,它有着太多的不足。比如缺少 buffer 管理,比如 IO和業務邏輯耦合。
稍微一個不注意,mio 教你做人不只是說說而已的。
那么有沒有更好寫 mio 的方式呢?當然有了,就是 tokio 。
tokio 是基於 mio 和 futures 庫的更高一層抽象, 事實上,現階段,整個 mio 庫的存在就是完全為 tokio 准備的。
簡單來說,就是我們在寫 mio 的時候需要寫一個超大的超復雜的狀態機,futures 提供了一種讓你簡單寫狀態機的方式,二者結合,生了個娃娃就是 tokio。
那么,首先我們從 futures 開始。
futures
很多語言里都內置了(或者利用庫)支持了 future 、promise模式,Rust也不例外 —— futures 庫就提供了這種支持。
Future 是什么 ? 理論上,我們寫的一切程序動作都可以被視為一個 Future ,打開一個連接、從一個 FD 讀取一些數據、等待一個超時等等。
Future 的定義大概長這樣:
pub trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
其中,Poll是一個類型別名:
pub type Poll<T, E> = Result<Async<T>, E>;
表示一個 future 的結果。
我們可以預想到,一個 future 的結果大概有三種情況:
- 執行失敗:則 poll 返回 Result::Error<Self::Error> 。
- 執行還未完成但是還沒失敗:則 poll 返回 Result::Ok(Async::NotReady) 。
- 執行完成並且返回了數據: 則 poll 返回 Result::Ok(Async::Ready<Self::Item>) 。
並且,我們還需要保證的一點是:每次 poll 調用都必須是可重入的,因為你也不知道最后你的 poll 會被調用幾次。
那么怎么保證可重入呢?常見的辦法是狀態機。
這像是什么呢,像是你拿着一個記事本,將你的經歷都寫了下來,每次重入的都翻一下你的小本本,做到哪里了,然后接着上一次去做。
當然了,光有着一個一個獨立的 Future 是不行的,Future 還需要的是組合。
一個經典的 Future 組合是可以寫成如下代碼(示例代碼,不代表真的有這些 API):
let task = TcpResolver::resolve("192.168.123.123:12345")
.and_then(|tcp_addr| TcpConnection::connect(tcp_addr))
.and_then(|(conn, _remote_addr)| Io::read_to_string(conn))
.and_then(|data| serde_json::from_str(&data))
.and_then(|jobj| check_json(jobj))
.map_err(|err| log_err(err));
task.wait().unwrap();
這里我們頻繁調用 and_then 其實是只表達一個含義:如果調用無錯則繼續進行。
正確與否,當然是靠每個閉包的返回值是 Ok 還是 Err 來判定的了。
那么事情完了么?當然還沒完。
我們現在有了一系列的 Future 組合,但是,請看最下面: 我們調用了 task.wait() 。這個函數通過 executor模塊 spawn 並且循環等待任務完成(wait_future):
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
loop {
match self.poll_future_notify(&unpark, 0)? {
Async::NotReady => unpark.park(),
Async::Ready(e) => return Ok(e),
}
}
協程
這里有人會問了, executor 模塊是個什么玩意兒?
答:這是一個假裝自己是協程的模塊。
注意了,敲黑板,這個 executor 本質上就是一系列的狀態機組合,並沒有開任何的啥程來。不過,這里並不妨礙我們來分析下協程,因為 Rust 已經開始試驗並合並進了自己的 coroutine 實現。
協程是什么?協程在概念上可以認為是非搶占式的用戶態線程。當然有人要說協程是輕量級的線程我是不認的,因為你照樣可以把一個協程寫的和一個線程一樣甚至更重量級(內存,調度開銷)。
不過,rust 即將實現的這個協程和我們現在所熟知的 go 里面的 goroutine 還是有着本質上的不同:因為這個協程本身是 stackless 的。簡單來說就是,這個的協程並不會為單獨的協程開辟一個單獨的棧,當然也就沒有棧擴容,縮容,分裂等一系列東西了,性能也會相對的更好一點(代價就是損失了 M:N 運行的可能)。與之相對的,go 的 goroutine 就是 stackfull 的。關於 goroutine 的棧分析可以看我們往期的[文章][2]這里不展開分析。
剛才說了,協程是非搶占式的。為什么要這么設計?
這個我們就不得不需要從協程的前輩——線程說起。
我們知道,系統調度的最小單位是線程。因此,線程無論設計的如何搶占式,也不會因為兩個線程搶奪CPU造成其他線程餓死。但是協程不行,協程本身是建立在用戶態的,我們沒有辦法在系統級別強制的讓出當前的時間片給其他的協程用。因為一旦你讓出了時間片,系統就會自動把你所依附的線程改成非 R 的狀態,而讓其他沒肉吃的線程吃一口。
事情的真相就是:根本搶不到啊!
協程里,沒有 kernel 這個大佬鎮場子怎么辦?我們只能謙讓一點,誰在調用的時候發現可能會被卡住,就自動放棄掉當前的CPU好了。 比如,我要進行一個 io 操作,讀取一些數據。讀到了還則罷了,讀不到的時候,我們一般會返回一個 EAGAIN (std::io::Error::WouldBlock) 。告訴你,哎呀沒讀到,再試一次看看?這個時候就需要我們的程序主動將自己的標記為阻塞狀態,並啟動用戶態調度程序。
注意這里,從整個線程上來看,它沒有放棄任何CPU,但是我們執行的代碼實際上從一個協程切換到了下一個協程里。那么,如果我們讓一個協程里面調用某些 API 的時候(比如在協程里讀一個文件),使用阻塞 API ,那么一旦這個文件的沒有 ready ,整個線程很可能會被 kernel 強制給調度走。
畢竟吧,你內核大爺還是你大爺。
futures 這個庫吧,我承認它已經裝的非常像了,但是它缺少了一項至關重要的技能:保存現場。
前面我說了,Future 必須保證是可重入的,因為實際上每次 futures 調用的時候都是陷入調用,無法保存現場,無法保存現場也就導致了無法恢復現場,將保護現場的工作,直接交給了用戶。
mio 和 tokio
futures ,單獨的看這個庫其實是不夠完整的。為了要使用這個庫,你會發現你最終寫出來了一個特大號的調度器來,就為了單純的決定該喚醒誰,該阻塞誰。為了這個事兒, alex 同學親自寫了一個 CpuPool 的庫來實現了一個調度器。但是這個東西並不是我們的重點,我們的重點是另一個調度器:tokio。
上面說,協程需要自己將自己標記成阻塞狀態,並且啟動用戶態調度程序。
事實上,調度程序也是有一定要求的,最簡單的就是,將當前需要操作的 fd和需要被調度的協程關聯起來。當然,fd 這個概念,在 Rust 的 mio 庫里已經被抽象成了 Token。
tokio 通過將 socket fd 取值為 Token(fd2),然后將對應的futures任務隊列的 Token 標記為 Token(fd2+1) 的方式,把 Future 和 focket fd 對應起來。當 mio 收集到 epoll_wait 的事件的時候,會自動去尋找其 Token 對應的協程隊列,然后去執行。
當其調用陷入阻塞的時候,則會由用戶自己主動返回 Async::NotReady,調度器將會自動的將當前 fd 關聯的任務的運行狀態改掉阻止其再次進入。
這遍是 tokio 的整個運行模式。
不過這里又有一些問題,我們的 socket fd 上發生的事件可能不是不止一個的。那么,如何表示連續發生的事件呢? Future ?顯然不行。 於是,我們現在有了一種辦法能自由的將多個同類型的 Future 組合,並接受其順序發生,就組成了一個新的組合類型: Stream
Stream
Stream 是什么?
它表示的是一系列 Future 事件依次發生,構成一個流。我們可以不斷的從這個流中取出正確的執行結果,或者暫停這個流,直到終結。
其類型簽名大概如下:
pub trait Stream {
type Item;
type Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
和 Future 類似,只不過將返回值類型改成了 Option ,因為 Stream 比 Future 多了一個狀態: 流終止。
和我們標注庫里表示 EOF 的辦法一樣——讀0——這里如果返回值是 Async::Ready(None) ,則表示整個流是終止的,后面不會再有值返回了。
那么 Stream 可以來表示什么呢?
一個 socket 上不斷來的連接,一個連接上不斷傳遞來的數據,一個定時器不斷觸發的 tick 等等,只要是連續發生的事件其實都可以表示成 Stream 的。
Stream 和 Framed
我們在寫網絡程序的時候,不光需要處理讀取,還需要正確的處理寫入。Futures 為了解決這個事情,專門引入了一個 trait :Sink 。
Stream 可以理解為上游數據來源, Sink 則是下游輸出。這樣,輸入和輸出都有了。
怎么組合呢?直接在寫么?當然不需要這么麻煩,tokio 為我們提供了兩個 Codec trait, tokio_io::Codec表示TCP 和 tokio_core::net::UdpCodec 表示UDP 。
比如我在 statsd-rs 里的實現:
pub struct RecvCodec;
impl UdpCodec for RecvCodec {
type In = Packet;
type Out = ();
fn decode(&mut self, _addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
debug!("get a new packet");
Ok(Packet::from(buf.to_vec()))
}
fn encode(&mut self, _out: Self::Out, _into: &mut Vec<u8>) -> SocketAddr {
unreachable!("never send back !");
}
}
當然這里只實現了 decode ,因為我這個服務是只接收而不回發的。實現了 UdpCodec ,我們可以簡單的通過 tokio 的 framed api 來將一個 TcpStream 的byte流,轉換成我們需要的 Packet 流。
pub fn run(ring: HashRing, bufs: Arc<Vec<MergeBuffer>>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = Self::build_socket(&*CONFIG.bind.clone(), &handle, true);
info!("worker: bind at {:?}", &CONFIG.bind);
let service = socket.framed(RecvCodec {}).flatten().for_each(|item| {
let pos = ring.position(&item.metric);
Ok(bufs[pos].push(item))
});
core.run(service).unwrap();
}
flatten 即我們在函數式語言里常見的 flatMap,這里其實就是將流展開然后一個一個的塞進后端處理隊列。
更高層次的組合
tokio_core ,這個庫從名字來看就知道了,這是一個非常基礎的庫。tokio 在這個基礎上,對 request/response 一一對應的這種特殊的網絡模式提供了更高級的兩個抽象庫,tokio-proto 和 tokio-service。
tokio-proto 規定了如何解析tcp數據流到基本的數據結構,當然了,用的就是 Codec 這個 trait。並且對 streaming 和 multiple 的請求流做了分解和兼容。 tokio-service 則利用 proto 的請求抽象出了服務接口。
具體的例子可以參考這里: https://tokio.rs/docs/getting-started/simple-server/
不過嘛,口號雖然喊得挺好的,但是實際上 tokio-proto 的局限性太大,靈活度不高,筆者我還是喜歡使用更底層的 tokio_core 庫。相對於高度抽象的上層世界來說,我還是喜歡底層的現充生活,程序員的控制欲?:-D。
例子
我這里根據 tokio 寫了幾個非常簡單的協議轉換工具,權當練手。
https://github.com/wayslog/lin-rs https://github.com/wayslog/statsd-rs
[1]: http://shell909090.org/blog/archives/2703/ 《上下文切換技術》
[2]: https://zhuanlan.zhihu.com/p/28409657 《聊一聊goroutine stack》
作者簡介
趙雪松,2016年5月份加入餓了么。人稱wayslog;Python3,雙引號,大括號不換行;Rust鐵粉,聯合著有《RustPrimer》。