螞蟻集團 CeresDB 團隊 | Rust CPU Affinity 初探


https://rustmagazine.github.io/rust_magazine_2021/chapter_3/rust_cpu_affinity.html

Brief

在看 Apache Cassandra 的時候了解到 ScyllaDB 能在完全兼容它的情況下性能提升很多,通過進一步了解接觸到了 thread per core 這種架構,這篇文章從一個簡單的 cache 結構出發,實現了三個不同的方案,並對它們進行比較,最后給出了在這個過程中學習到的一些東西。

Thread Per Core 簡單來說就是將應用的每一個線程綁定到一個計算核心上,通過 sharding 的方式將計算拆解分配到對應的核上。這是一種 shared nothing 的方式,每個核單獨持有計算所需要的數據,獨立完成計算任務,從而避免掉多余的線程同步開銷。同時每個核心和工作線程一一對應,減少上下文切換的開銷。

在 waynexia/shard-affinity 中,我分別用普通的不做限制調度、local set 給計算任務分組以及 綁定任務、核心與線程三種方式實現同一個目的的 cache 結構。這三種實現分別對應 shard-affinity/load/src 目錄下的 threading-rslocal_set-rs 和 affinity-rs 三個文件。接下來將對這三種方法實現方法進行分析。下文提到的原始代碼都在這個倉庫里面,為了簡潔進行了部分省略。

Cache

假設我們有一個類似 Map<Id, Data> 的結構,它緩存了我們所需要的數據,請求分為對它進行 append() 或者 get(),通過讀寫鎖進行線程同步並提供內部可變性,對外暴露 &self 的接口。

 

pub struct CacheCell { items: RwLock<Map<Id, RwLock<Item>>>, } impl CacheCell { pub fn get(&self, id: Id, size: usize) -> Option<Bytes>{} pub fn append(&self, id: usize, bytes: Bytes) {} } 

首先為了能讓多個任務在同時操作 cache 的時候仍能得到符合預期的結果,我們可以使用 lock-free 的結構,或者對它加上一把鎖將並發的操作串行化。而我們發現對不同的 id 進行的操作並不會互相影響。所以可以將線程同步所影響的結構粒度變小,以這個 cache 所參考的 gorilla in-memory data structure 為例,將 id 分為進行分組,由對應的 cell 進行管理。將鎖的粒度變小,以支持更高的並發操作。

圖一,from Gorilla paper Fig.7: Gorilla in-memory data structure.

選這個結構作為實例有兩個原因,首先這是一個實際生產系統所使用的結構,比較有實際意義;並且它比較簡單易於實現,而且本身就已經對 id 進行了 sharding,方便進行后續的使用。

Threading

先來看比較常見的做法,拿若干個 cache 放一起合成一個 Vec<Cache>>,根據每次請求的 id 路由到對應的 cache 進行操作。

 

impl ThreadingLoad{ pub fn append(&self, id: Id, bytes: Bytes) { self.shards[route_id(id)].append(id, bytes); } } 

而在使用的時候,則是開一個多線程的 tokio runtime,向里面 spawn 不同 id 的請求。

 

let rt = Builder::new_multi_thread().build(); let load = ThreadingLoad::new(); rt.spawn(async move { let id = random::<usize>(); load.append(id, bytes); }) 

在這之后,由 tokio 去調度任務執行,完成之后給我們結果,我們不用關心這個任務具體是怎樣被調度的,產生的計算發生在哪個核上。而且我們底下的所有結構都付出了代價讓它們 Send 和 Sync,也不用去擔心一個對象同時被多個東西操作會出現奇怪的結果。

LocalSet

這里是使用 tokio 的 LocalSet 來實現的。它能將指定的任務綁在同一個線程上進行執行。這樣子帶來的好處就是我們可以使用 !Send 的東西了。

具體來說,由上面我們知道不同的 id 之間的操作不會互相影響,所以能夠將鎖粒度變小。同樣的,不同 id 的任務計算所需要用到的數據也不會重疊,也就是避免了一份數據可能被多個內核同時訪問的場景,從而不需要考慮我們的修改對其他內核的可見性。基於這一點,之前付出的性能代價來給數據實現 Send 和 Sync 也可以被節省下來。比如引用計數可以從 Arc 變成 Rc,或者說所有為了保證可見性所加的指令屏障都可以去掉。

從實現來看,在我的這台有十六個邏輯核心的設備上,將所有的 shards 分給15個線程進行管理,另外一個來進行任務的分發,任務分發線程與其余每個線程之間都有一個 channel 來進行任務的傳輸。這里分發的任務有兩種:

 

enum Task { Append(Id, Bytes, oneshot::Sender<()>), Get(Id, usize, oneshot::Sender<()>), } 

每個里面包含對應任務所需要的參數,以及一個用於通知任務完成的 channel。每次請求到來時,任務分發線程組裝好所需要的參數,根據 id 發送給對應的執行線程,之后等待執行結果。

 

pub async fn append(&self, id: Id, bytes: Bytes) { let (tx, rx) = oneshot::channel(); let task = Task::Append(id, bytes, tx); self.txs[route_id(id)].send(task).unwrap(); rx.await.unwrap() } 

Affinity

在上面的實現中,我們只是將一組任務所需要的數據和計算綁在了一起,避免線程同步的開銷。在運行中核心之間負載不均衡的時候,能夠觀察到明顯的操作系統調度的行為。這樣子只減少了開始提到的兩種開銷中的一種,上下文切換的開銷仍然還在。操作系統的調度很多時候並不能明白應用的行為,所以在第三種方法中我們將每個線程與核綁定起來,或者是說告訴操作系統要去如何調度我們的線程。

線程的分配和上面 LocalSet 方案一樣,將 shards 分配到除了一個分發線程之外的其余線程中,並每個線程綁一個核。通過 core_affinity crate 來設置 cpu affinity

 

let core_ids = core_affinity::get_core_ids().unwrap(); core_affinity::set_for_current(_); 
 

for core_id in core_ids { thread::spawn(move || { core_affinity::set_for_current(core_id); }); } 

除了設置了 cpu affinity 之外,還有其他地方與上一種方案不同。首先這里在 channels 中分發的是已經構造好的 future,而不是分發參數之后再構造;其次這里的 runtime 是一個簡單的 FIFO 隊列;最后每個線程的 caches 通過 thread local storage 的方式存儲。

 

self.runtime.spawn(route_id(id), async move { thread_local! (static SHARD:AffinityShard = AffinityShard::new() ); SHARD.with(|shard| { shard.append(id, bytes); }); tx.send(()).unwrap(); }); 

這些區別只是單純展現實現差異,並且由於 cache 內部的內存還是采用的默認分配器從堆上分配,這里的 TLS 實際上也沒有起到什么作用,后文會繼續提到這個點。

在這種情況下,每個計算線程可以在一定程度上簡化成一個單線程模型進行考慮,整個系統也變成了非搶占式、協作的調度,利用 rust 的 coroutine 由任務自己在需要等待資源的時候通過 await yield 出來。除了之前提到的那些方面之外相信還有許多其他可以開發的空間。

以及這種 affinity 的方案也是一個能很好的在應用側進行 NUMA 實踐的場景,結合前面提到的 TLS,另一種方法就是使用一個感知 NUMA 的內存分配器。不過我的設備並不支持 NUMA,所以沒有進行進一步的測試。

Test

在 shard_affinity/src 下有三個 binary 代碼文件,分別是對三種情況進行的一個簡單的測試。工作負載的參數可以在 shard_affinity/src/lib.rs 下看到。在我的環境下,三個方案以 128 並發分別進行 1024 次寫以及 4096 次讀 16KB 的數據耗時如下。為了讓數據集中,將 id 的范圍設置到了 0 至 1023.

圖二,本地進行測試結果。縱坐標為延時(毫秒),越低越好。

可以看到,local set 和 affinity 兩種方案的表現並不如 threading 的好。初步分析時在 local set 和 affinity 兩種方案下都是由一個線程做入口進行任務生成和分發,即多出了額外的任務路由開銷,在測試的時候能看到 cpu 的負載也是一高多底,而且由於模擬的任務單個執行時間都比較短,路由線程也會更先到達瓶頸。

在將工作線程數都調整為 8 (邏輯核心數量的一半)之后,可以看到 threading 和 affinity 的差別有所減小。對於目前仍然存在的 gap,通過 flamegraph 分析可能是 affinity 需要對每個任務收發請求和結果帶來的.

圖三,調整 worker 數量之后的結果。縱坐標為延時(毫秒),越低越好。

由於所有的內存數據,即狀態都被預先分散到各個核上,因此對 sharding 的方案也有要求。當 affinity 由於熱點等原因出現負載不均衡時,進行 re-balance 一般會是一個比較耗時的操作,靈活性這方面不如 threading 模式。此外計算的分布方法也很重要,比如目前由一個線程向其他線程分發的方式就在測試中出現了問題。考慮到實際的系統計算負載的組成更加復雜,如何很好的分散計算任務也是需要慎重決定的。

Others

在 affinity 的實現中,為了展示大部分組件都是手造的簡單模型。而 thread per core 其實已經有許多優秀的框架能夠簡化在這種架構下開發的難度,比如開頭提到的 scylladb 所使用的框架 seastar,這篇文章的寫作過程中也參考了它們的很多文檔。rust 也有類似的框架 glommio,這是一個比較新的庫,前不久剛放出第一個比較正式的 release。

在 thread per core 架構下,除了應用的邏輯需要發生變化,許多常用的組件也都要產生改動,為了一般多線程場景設計的那些向線程同步付出了代價的結構如使用了 Arc 的地方是不是可以換成 Rc 等,這些都是需要考慮的。也希望能圍繞這個發展出很好的生態。

Conclusion

在簡單的對比過不同方法的實現和性能之后,從我的觀點來看 thread per core 是一個非常值得嘗試的方法,它能夠在某種程度上簡化開發時所考慮的場景,也很適合目前動輒幾十上百核的服務器,而且也有 scylladb 這種成熟的實踐。不過這個對於已經基本成型的系統來說所需要作的改動比較大。我們期望 thread per core 帶來的提升是通過減小同步開銷以及提高的緩存命中率實現更低的延時以及更平穩的性能,而且這些改動所能帶來的提升與增加的復雜度,工作量和風險性相比則需要進行權衡。

關於我們

我們是螞蟻智能監控技術中台的時序存儲團隊,我們正在使用 Rust 構建高性能、低成本並具備實時分析能力的新一代時序數據庫,歡迎加入或者推薦,目前我們也正在尋找優秀的實習生,也歡迎廣大應屆同學來我們團隊實習,請聯系:jiachun.fjc@antgroup.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM