基於ASIO的協程與網絡編程


協程

協程,即協作式程序,其思想是,一系列互相依賴的協程間依次使用CPU,每次只有一個協程工作,而其他協程處於休眠狀態。協程可以在運行期間的某個點上暫停執行,並在恢復運行時從暫停的點上繼續執行。 
協程已經被證明是一種非常有用的程序組件,不僅被python、lua、ruby等腳本語言廣泛采用,而且被新一代面向多核的編程語言如golang rust-lang等采用作為並發的基本單位。 
協程可以被認為是一種用戶空間線程,與傳統的線程相比,有2個主要的優點:

  • 與線程不同,協程是自己主動讓出CPU,並交付他期望的下一個協程運行,而不是在任何時候都有可能被系統調度打斷。因此協程的使用更加清晰易懂,並且多數情況下不需要鎖機制。
  • 與線程相比,協程的切換由程序控制,發生在用戶空間而非內核空間,因此切換的代價非常小。

網絡編程模型

首先來簡單回顧一下一些常用的網絡編程模型。網絡編程模型可以大體的分為同步模型和異步模型兩類。

  • 同步模型:

同步模型使用阻塞IO模式,在阻塞IO模式下調用read等IO函數時會阻塞線程直到IO完成或失敗。同步模型的典型代表是thread per connection模型,每當阻塞在主線程上的accept調用返回時則創建一個新的線程去服務於新的socket的讀/寫。這種模型的優點是程序簡潔,編寫簡單;缺點是可伸縮性收到線程數的限制,當連接越來越多時,線程也越來越多,頻繁的線程切換會嚴重拖累性能。

  • 異步模型:

異步模型一般使用非阻塞IO模式,並配合epoll/select/poll等多路復用機制。異步模型可以使一個線程同時服務於多個IO對象。異步模型的典型代表是reactor模型。在reactor模型中,我們將所有要處理的IO事件注冊到一個中心的IO多路復用器中(一般為epoll/select/poll),同時主線程阻塞在多路復用器上。一旦有IO事件到來或者就緒,多路復用器返回並將對應的IO事件分發到對應的處理器(即回調函數)中,最后處理器調用read/write函數來進行IO操作。異步模型的特點是性能和可伸縮性比同步模型要好很多,但是其結構復雜,不易於編寫和維護。在異步模型中,IO之前的代碼(IO任務的提交者)和IO之后的處理代碼(回調函數)是割裂開來的,雖然可以通過多線性來提供並發,但是又不得不通過安全隊列或加鎖的方式來保證邏輯層是單線程的。

協程與網絡編程

協程為克服同步模型和異步模型的缺點,並結合他們的優點提供了可能:

首先簡單認識一下調度器:一個線程運行一個調度器,可以在一個調度器上創建若干個協程。調度器負責調度這些協程。並且調度器在其內部維護了一個多路復用器(epoll/select/poll)。現在假設我們有3個協程A,B,C分別要進行數次IO操作。這3個協程運行在同一個調度器(線程)的上下文中,並依次使用CPU。協程A首先運行,當它執行到一個IO操作,但該IO操作並沒有立即就緒時,A將該IO事件注冊到調度器中,並主動放棄CPU。這時調度器將B切換到CPU上開始執行,同樣,當它碰到一個IO操作的時候將IO事件注冊到調度器中,並主動放棄CPU。調度器將C切換到cpu上開始執行。當所有協程都被“阻塞”后,調度器檢查注冊的IO事件是否發生或就緒。假設此時協程B注冊的IO事件已經就緒,調度器將恢復B的執行,B將從上次放棄CPU的地方接着向下運行。A和C同理。這樣,對於某個協程而言,我們采用的是同步的模型;但是對於整個調度器(線程)而言,實際上卻是異步的模型。好了,原理說完了,我們來看一個實際的例子,echo server。

echo server

在這個例子中,我們將使用orchid庫來編寫一個echo server。orchid庫是一個構建於boost基礎上的 協程/網絡IO 庫。

echo server首先必須要處理連接事件,我們創建一個協程來專門處理連接事件:

typedef boost::shared_ptr<orchid::socket> socket_ptr;
//處理ACCEPT事件的協程
void handle_accept(orchid::coroutine_handle co) {
    try {
        orchid::acceptor acceptor(co -> get_scheduler().get_io_service());//構建一個acceptor
        acceptor.bind_and_listen("5678",true);
        for(;;) {
            socket_ptr sock(new orchid::socket(co -> get_scheduler().get_io_service()));
            acceptor.accept(*sock,co);
            co -> get_scheduler().spawn(boost::bind(handle_io,_1,sock),orchid::minimum_stack_size());//在調度器上創建一個協程來服務新的socket。
                                            //第一個參數是要創建的協程的main函數,
                                            //第二個參數是要創建的協程的棧的大小。
        }
    }
    catch(boost::system::system_error& e) {
        cerr<<e.code()<<" "<<e.what()<<endl;
    }
}

在上面的代碼中,我們創建了一個acceptor,並讓它監聽5678端口,然后在"阻塞"等待連接到來,當連接事件到來時,創建一個新的協程來服務新的socket。處理套接字IO的協程如下:

在orchid中,協程的main函數必須滿足函數簽名void(orchid::coroutine handle),如handle accept所示,其中參數co是協程句柄,代表了當前函數所位於的協程。

//處理SOCKET IO事件的協程
void handle_io(orchid::coroutine_handle co,socket_ptr sock) {
    orchid::tcp_ostream out(*sock,co);
    orchid::tcp_istream in(*sock,co);
    for(std::string str;std::getline(in, str) && out;)
    {
        out<<str<<endl;
    }

}

IO處理協程首先在傳入的套接字上創建了一個輸入流和一個輸出流,分別代表了TCP的輸入和輸出。然后不斷地從輸入流中讀取一行,並輸出到輸出流當中。當socket上的TCP連接斷開時,輸入流和輸出流的eof標志為會被置位,因此循環結束,協程退出。

orchid可以使用戶以流的形式來操作套接字。輸入流和輸出流分別提供了std::istream和std::ostream的接口;輸入流和輸出流是帶緩沖的,如果用戶需要無緩沖的讀寫socket或者自建緩沖,可以直接調用orchid::socket的read和write函數。但是需要注意這兩個函數會拋出boost::system _ error異常來表示錯誤。

細心的讀者可能已經發現,handle io的函數簽名並不滿足void(orchid::coroutine handle),回到handle accept中,可以發現,實際上我們使用了boost.bind對handle io函數進行了適配,使之符合函數簽名的要求。

最后是main函數:

int main() {
    orchid::scheduler sche;
    sche.spawn(handle_accept,orchid::coroutine::minimum_stack_size());//創建協程
    sche.run();
}

在上面這個echo server的例子中,我們采用了一種 coroutine per connection 的編程模型,與傳統的 thread per connection 模型一樣的簡潔清晰,但是整個程序實際上運行在同一線程當中。

由於協程的切換開銷遠遠小於線程,因此我們可以輕易的同時啟動上千協程來同時服務上千連接,這是 thread per connection的模型很難做到的;由於整個底層的IO系統實際上是使用boost.asio這種高性能的異步io庫來實現的,而且協程切換的開銷非常小,所以能夠達到很高的性能;在伸縮性方面,可以使用scheduler per cpu的方式,即為每個CPU核心分配一個調度器,有多少個CPU核心就建立多少個調度器。

總結:通過協程,我們可以在保持同步IO模型簡潔性的同時,獲得近似於異步IO模型的高性能和伸縮性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM