協程的原理及其在高並發服務中的應用


協程的原理和應用

 

協程的原理

協程(coroutine)跟具有操作系統概念的線程不一樣,實際上協程就是類函數一樣的程序組件,你可以在一個線程里面輕松創建數十萬個協程,就像數十萬次函數調用一樣。只不過函數只有一個調用入口起始點,返回之后就結束了,而協程入口既可以是起始點,又可以從上一個返回點繼續執行,也就是說協程之間可以通過 yield 方式轉移執行權,對稱(symmetric)、平級地調用對方,而不是像函數那樣上下級調用關系。當然 協程也可以模擬函數那樣實現上下級調用關系,這就叫非對稱協程(asymmetric coroutines)。

我們舉一個例子來看看一種對稱協程調用場景,大家最熟悉的“生產者-消費者”事件驅動模型,一個協程負責生產產品並將它們加入隊列,另一個負責從隊列中取出產品並使用它。為了提高效率,你想一次增加或刪除多個產品。偽代碼可以是這樣的:

# producer coroutine
loop
while queue is not full
  create some new items
  add the items to queue
yield to consumer
 
# consumer coroutine
loop
while queue is not empty
  remove some items from queue
  use the items
yield to producer

如果用多線程實現生產者-消費者模式,線程之間需要使用同步機制來避免產生全局資源的竟態,這就不可避免產生了休眠、調度、切換上下文一類的系統開銷,而且線程調度還會產生時序上的不確定性。

而對於協程來說,“掛起”的概念只不過是轉讓代碼執行權並調用另外的協程,待到轉讓的協程告一段落后重新得到調用並從掛起點“喚醒”,這種協程間的調用是邏輯上可控的,時序上確定的,可謂一切盡在掌握中。

當今一些具備協程語義的語言,比較重量級的如C#、erlang、golang,以及輕量級的python、lua、javascript、ruby,還有函數式的scala、scheme等。相比之下,作為原生態語言的 C 反而處於尷尬的地位,原因在於 C 依賴於一種叫做棧幀的例程調用,例程內部的狀態量和返回值都保留在堆棧上,這意味着生產者和消費者相互之間無法實現平級調用,當然你可以改寫成把生產者作為主例程然后將產品作為傳遞參數調用消費者例程,這樣的代碼寫起來費力不討好而且看起來會很難受,特別當協程數目達到十萬數量級,這種寫法就過於僵化了。

如果將每個協程的上下文(比如程序計數器)保存在其它地方而不是堆棧上,協程之間相互調用時,被調用的協程只要從堆棧以外的地方恢復上次出讓點之前的上下文即可,這有點類似於 CPU 的上下文切換,C 標准庫給我們提供了兩種協程調度原語:一種是 setjmp/longjmp,另一種是 ucontext 組件,它們內部(當然是用匯編語言)實現了協程的上下文切換,相較之下前者在應用上會產生相當的不確定性(比如不好封裝,具體說明參考聯機文檔),所以后者應用更廣泛一些,網上絕大多數 C 協程庫也是基於 ucontext 組件實現的。

我們知道 python 的 yield 語義功能類似於一種迭代生成器,函數會保留上次的調用狀態,並在下次調用時會從上個返回點繼續執行,例如:

def cols():
    for i in range(10):
        yield i

g=cols()
for k in g:
    print(k)

 

下面看看C語言的yiled語義是如何實現的:

int function(void) {
  static int i, state = 0;
  switch (state) {
    case 0: goto LABEL0;
    case 1: goto LABEL1;
  }
  LABEL0: /* start of function */
  for (i = 0; i < 10; i++) {
    state = 1; /* so we will come back to LABEL1 */
    return i;
    LABEL1:; /* resume control straight after the return */
  }
}

這是利用了static變量和goto跳轉來實現的,如果不用goto,而是直接利用switch的跳轉功能:

int function(void) {
  static int i, state = 0;
  switch (state) {
    case 0: /* start of function */
    for (i = 0; i < 10; i++) {
      state = 1; /* so we will come back to "case 1" */
      return i;
      case 1:; /* resume control straight after the return */
    }
  }
}

 

我們還可以用 __LINE__ 宏使其更加一般化:

int function(void) {
  static int i, state = 0;
  switch (state) {
    case 0: /* start of function */
    for (i = 0; i < 10; i++) {
      state = __LINE__ + 2; /* so we will come back to "case __LINE__" */
      return i;
      case __LINE__:; /* resume control straight after the return */
    }
  }
}

這樣一來我們可以用宏提煉出一種范式,封裝成組件:

#define Begin() static int state=0; switch(state) { case 0:
#define Yield(x) do { state=__LINE__; return x; case __LINE__:; } while (0)
#define End() }
int function(void) {
  static int i;
  Begin();
  for (i = 0; i < 10; i++)
    Yield(i);
  End();
}

這種協程實現方法有個使用上的局限,就是協程調度狀態的保存依賴於 static 變量,而不是堆棧上的局部變量,實際上也無法用局部變量(堆棧)來保存狀態,這就使得代碼不具備可重入性和多線程應用。如果將局部變量包裝成函數參數傳入的一個虛構的上下文結構體指針,然后用動態分配的堆來“模擬”堆棧,解決了線程可重入問題。但這樣一來反而有損代碼清晰,比如所有局部變量都要寫成對象成員的引用方式,特別是局部變量很多的時候很麻煩,再比如宏定義 malloc/free 的玩法過於托大,不易控制。

既然協程本身是一種單線程的方案,那么我們應該假定應用環境是單線程的,不存在代碼重入問題,所以我們可以大膽地使用 static 變量,維持代碼的簡潔和可讀性。事實上我們也不應該在多線程環境下考慮使用這么簡陋的協程,非要用的話,前面提到 glibc 的 ucontext 組件也是一種可行的替代方案,它提供了一種協程私有堆棧的上下文,當然這種用法在跨線程上也並非沒有限制,請仔細閱讀其文檔。

 

 

 


 

協程的並發應用

協程就是在單線程中使用同步編程思想來實現異步的處理流程,從而實現單線程能並發處理成百上千個請求,而且每個請求的處理過程是線性的,沒有使用晦澀難懂的callback機制來銜接處理流程。

 

基於事件驅動狀態機

傳統的網絡服務器(如nginx、squid等)都采用了 EDSM (event-driven state machine,事件驅動狀態機) 機制並發處理請求,這是一種異步處理的方式,通過使用callback 方法避免阻塞線程。

EDSM最常見的方式就是I/O事件的異步回調。基本上都會有一個叫做dispatcher的單線程主循環(又叫event loop),用戶通過向dispatcher注冊回調函數(又叫event handler)來實現異步通知,從而不必在原地空耗資源干等。在dispatcher主循環中通過select()/epoll()等系統調用來等待各種I/O事件的發生,當內核檢測到事件觸發並且數據可達或可用時,select()/epoll()會返回從而使dispatcher調用相應的回調函數來對處理用戶的請求。

整個過程都是單線程的。這種處理本質上就是將一堆相互獨立(disjoint)的回調實現同步控制,就像串聯在一個順序鏈表上。如下圖,黑色的雙箭頭表示I/O事件復用,回調是個筐,里面裝着對各種請求的處理(當然不是每個請求都有回調,一個請求也可以對應不同的回調),每個回調被串聯起來由dispatcher激活。這里請求等價於thread的概念(不是操作系統的線程),只不過“上下文切換”(context switch)發生在每個回調結束之時(假設不同請求對應不同回調),注冊下一個回調以待事件觸發時恢復其它請求的處理。至於dispatcher的執行狀態(execute state)可作為回調函數的參數保存和傳遞

 

 

 

異步回調的缺陷在於難以實現和擴展,雖然已經有libevent這樣的通用庫,以及其它actor/reacotor的設計模式及其框架,但正如Dean Gaudet(Apache開發者)所說:“其內在的復雜性——將線性思維分解成一堆回調的負擔(breaking up linear thought into a bucketload of callbacks)——仍然存在”。從上圖可見,回調之間請求例程不是連續的,比如回調之間的切換會打斷部分請求,又比如有新的請求需要重新注冊。

協程本質上仍然是基於EDSM模型,但旨在取代傳統的異步回調方式。協程將請求抽象為thread概念以更接近自然編程模式(所謂的linear thought吧,就像操作系統的線程之間切換那樣自然)。

 

 

 

下面介紹一種協程的實現方案:State Threads庫。

ST庫

ST (State Threads) 庫提供了一種高性能、可擴展服務器(比如web server、proxy server、mail agent等)的實現方案。

ST 庫簡化了multi-threading編程范式,每個請求對應一個線程,注意這里的線程其實是一種coroutine(協程),跟pthread那種內核線程不是一回事。

這里稍微解釋一下ST調度工作原理,ST運行環境維護了四種隊列,分別是IOQ(等待隊列)、RUNQ(運行隊列)、SLEEPQ(超時隊列)以及ZOMBIEQ。當每個thread處於不同隊列中對應不同的狀態(ST顧名思義所謂thread狀態機)。比如polling請求的時候,當前thread就加入IOQ表示等待事件(如果有timeout同時會被放到SLEEPQ中),當事件觸發時,thread就從IOQ(如果有timeout同時會從SLEEPQ)移除並轉移到RUNQ等待被調度,成為當前的running thread,相當於操作系統的就緒隊列,跟傳統EDSM對應起來就是注冊回調以及激活回調。再比如模擬同步控制wait/sleep/lock的時候,當前thread會被放入SLEEPQ,直到被喚醒或者超時再次進入RUNQ以待調度。

ST的調度具備性能與內存雙重優點:在性能上,ST實現自己的setjmp/longjmp來模擬調度,無任何系統開銷,並且context(就是jmp_buf)針對不同平台和架構用底層語言實現的,可移植性媲美libc。下面放一段代碼解釋一下調度實現:

/*
 * Switch away from the current thread context by saving its state 
 * and calling the thread scheduler
 */
#define _ST_SWITCH_CONTEXT(_thread)       \
    ST_BEGIN_MACRO                        \
    if (!MD_SETJMP((_thread)->context)) { \
      _st_vp_schedule();                  \
    }                                     \
    ST_END_MACRO
 
/*
 * Restore a thread context that was saved by _ST_SWITCH_CONTEXT 
 * or initialized by _ST_INIT_CONTEXT
 */
#define _ST_RESTORE_CONTEXT(_thread)   \
    ST_BEGIN_MACRO                     \
    _ST_SET_CURRENT_THREAD(_thread);   \
    MD_LONGJMP((_thread)->context, 1); \
    ST_END_MACRO
 
void _st_vp_schedule(void)
{
    _st_thread_t *thread;
 
    if (_ST_RUNQ.next != &_ST_RUNQ) {
        /* Pull thread off of the run queue */
        thread = _ST_THREAD_PTR(_ST_RUNQ.next);
        _ST_DEL_RUNQ(thread);
    } else {
        /* If there are no threads to run, switch to the idle thread */
        thread = _st_this_vp.idle_thread;
    }
    ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
 
    /* Resume the thread */
    thread->state = _ST_ST_RUNNING;
    _ST_RESTORE_CONTEXT(thread);
}

如果你熟悉setjmp/longjmp的用法,你就知道當前thread在調用MD_SETJMP將現場上下文保存在jmp_buf中並返回返回0,然后自己調用_st_vp_schedule()將自己調度出去。調度器先從RUNQ上找,如果隊列為空就找idle thread,這是在整個ST初始化時創建的一個特殊thread,然后將當前線程設為自己,再調用MD_LONGJMP切換到其上次調用MD_SETJMP的地方,從thread->context恢復現場並返回1,該thread就接着往下執行了。整個過程就同EDSM一樣發生在操作系統單線程下,所以沒有任何系統開銷與阻塞。

其實真正的阻塞是發生在等待I/O事件復用上,也就是select()/epoll(),這是整個ST唯一的系統調用。ST當前的狀態是,整個環境處於空閑狀態,所有threads的請求處理都已經完成,也就是RUNQ為空。這時在_st_idle_thread_start維護了一個主循環(類似於event loop),主要負責三種任務:1.對IOQ所有thread進行I/O復用檢測;2.對SLEEPQ進行超時檢查;3.將idle thread調度出去,代碼如下:

void *_st_idle_thread_start(void *arg)
{
    _st_thread_t *me = _ST_CURRENT_THREAD();
 
    while (_st_active_count > 0) {
        /* Idle vp till I/O is ready or the smallest timeout expired */
        _ST_VP_IDLE();
 
        /* Check sleep queue for expired threads */
        _st_vp_check_clock();
 
        me->state = _ST_ST_RUNNABLE;
        _ST_SWITCH_CONTEXT(me);
    }
 
    /* No more threads */
    exit(0);
 
    /* NOTREACHED */
    return NULL;
}

這里的me就是idle thread,因為_st_idle_thread_start就是創建idle thread的啟動點,每從上次_ST_SWITCH_CONTEXT()切換回來的時候,接着在_ST_VP_IDLE()里輪詢I/O事件的發生,一旦檢測到發生了別的thread事件或者SLEEPQ里面發生超時,再用_ST_SWITCH_CONTEXT()把自己切換出去,如果此時RUNQ中非空的話就切換到隊列第一個thread。這里主循環是不會退出的。

在內存方面,ST的執行狀態作為局部變量保存在棧上,而不是像回調需要動態分配,用戶可能分別這樣使用thread模式和callback模式:

/* thread land */
int foo()
{
    int local1;
    int local2;
    do_some_io();
}
 
/* callback land */
struct foo_data {
    int local1;
    int local2;
};
 
void foo_cb(void *arg)
{
    struct foo_data *locals = arg;
    ...
}
 
void foo()
{
    struct foo_data *locals = malloc(sizeof(struct foo_data));
    register(foo_cb, locals);
}

 

另外有兩點要注意,一是ST的thread是無優先級的非搶占式調度,也就是說ST基於EDSM的,每個thread都是事件或數據驅動,遲早會把自己調度出去,而且調度點是明確的,並非按時間片來的,從而簡化了thread管理;二是ST會忽略所有信號處理,在_st_io_init中會把sigact.sa_handler設為SIG_IGN,這樣做是因為將thread資源最小化,避免了signal mask及其系統調用(在ucontext上是避免不了的)。但這並不意味着ST就不能處理信號,實際上ST建議將信號寫入pipe的方式轉化為普通I/O事件處理,示例詳見這里

 

multi-threading編程范式

Posix Thread(以下簡稱PThread)是個通用的線程庫,它是將用戶級線程(thread)同內核執行對象(kernel execution entity,有些書又叫lightweight processes)做了1:1或m:n映射,從而實現multi-threading模式。例如,Apache服務器就是使用了PThread來實現並發請求的處理,每個線程處理一個請求,線程是以同步、阻塞的方式處理請求的,在線程的當前請求處理完成之前不會接受其它請求。

而ST是單線程(n:1映射),它的thread實際上就是協程(coroutine)。通常的網絡應用上,多線程范式繞不開操作系統,但在某些特定的服務器領域,線程間的共享資源會帶來額外復雜度,鎖、競態、並發、文件句柄、全局變量、管道、信號等,面對這些Pthread的靈活性會大打折扣。而ST的調度是精確的,它只會在明確的I/O和同步函數調用點上發生上下文切換,這正是協程的特性,如此一來ST就不需要互斥保護了,進而也可以放心使用任何靜態變量和不可重入庫函數了(這在同樣作為協程的Protothreads里是不允許的,因為那是stack-less的,無法保存上下文),極大的簡化了編程和調試同時增加了性能。

 

這里順便說一句,C語言實現的協程據我所知只有三種方式:

1、Protothread為代表利用switch-case語義跳轉;

2、以ST為代表不依賴libc的setjmp/longjmp上下文切換;

3、依賴glibc的ucontext接口(雲風的coroutine);

其中,Protothread最輕,但受限最大,ucontext耗資源性能慢,目前看來ST是最好使的。

 

 

 

 

 

總結

ST的核心思想就是利用multi-threading的簡單優雅范式勝過傳統異步回調的復雜晦澀實現,又利用EDSM的性能和解耦架構避免了multi-threading在系統上的開銷和暗礁。

ST的主要限制在於,應用程序所有I/O操作必須使用ST提供的API,因為只有這樣thread才能被調度器管理,並且避免阻塞。

其實最后在羅嗦一句,ngx_lua模塊也是利用coroutine簡化了Nginx流程的處理流程,每個請求對應一個lua coroutine,從而在coroutine內部完全使用線性的方式處理請求,避免了使用回調的異步寫法;

 

 

參考文檔:

http://state-threads.sourceforge.net/

http://state-threads.sourceforge.net/docs/faq.html

http://coolshell.cn/articles/12012.html

http://coolshell.cn/articles/10975.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM