微信 libco 協程庫原理剖析


微信 libco 協程庫原理剖析 https://mp.weixin.qq.com/s/sy26w9XVvQsQoVhbQoN3vQ

微信 libco 協程庫原理剖析

圖片

 

作者:alexzmzheng

同 Go 語言一樣,libco 也是提供了同步風格編程模式,同時還能保證系統的高並發能力,本文主要剖析 libco 中的協程原理。

簡介

  • libco 是微信后台大規模使用的 c/c++協程庫,2013 年至今穩定運行在微信后台的數萬台機器上。
  • libco 通過僅有的幾個函數接口 co_create/co_resume/co_yield 再配合 co_poll,可以支持同步或者異步的寫法,如線程庫一樣輕松。同時庫里面提供了 socket 族函數的 hook,使得后台邏輯服務幾乎不用修改邏輯代碼就可以完成異步化改造。
  • 開源地址:https://github.com/Tencent/libco

准備知識

協程是什么

  • 協程本質上就是用戶態線程,又名纖程,將調度的代碼在用戶態重新實現。有極高的執行效率,因為子程序切換不是線程切換而是由程序自身控制,沒有線程切換的開銷。協程通常是純軟件實現的多任務,與 CPU 和操作系統通常沒有關系,跨平台,跨體系架構。
  • 協程在執行過程中,可以調用別的協程自己則中途退出執行,之后又從調用別的協程的地方恢復執行。這有點像操作系統的線程,執行過程中可能被掛起,讓位於別的線程執行,稍后又從掛起的地方恢復執行。
  • 對於線程而言,其上下文切換流程如下,需要兩次權限等級切換和三次棧切換。上下文存儲在內核棧上。線程的上下文切換必須先進入內核態並切換上下文, 這就造成了嚴重的調度開銷。線程的結構體存在於內核中,在 pthread_create 時需要進入內核態,頻繁創建開銷大。

Linux 程序內存布局

Linux 使用虛擬地址空間,大大增加了進程的尋址空間,由低地址到高地址分別為:

  • 只讀段/代碼段:只能讀,不可寫;可執行代碼、字符串字面值、只讀變量
  • 數據段:已初始化且初值非 0 全局變量、靜態變量的空間
  • BSS 段:未初始化或初值為 0 的全局變量和靜態局部變量
  • 堆 :就是平時所說的動態內存, malloc/new 大部分都來源於此。
  • 文件映射區域 :如動態庫、共享內存等映射物理空間的內存,一般是 mmap 函數所分配的虛擬地址空間。
  • 棧:用於維護函數調用的上下文空間;局部變量、函數參數、返回地址等
  • 內核虛擬空間:用戶代碼不可見的內存區域,由內核管理(頁表就存放在內核虛擬空間)。

其中需要注意的是:棧和堆的這兩種不同的地址增長方向,棧從高到低地址增長。堆從低到高增長,后面協程切換中就涉及到該布局的不同。

棧幀

棧幀是從棧上分配的一段內存,每次函數調用時,用於存儲自動變量。從物理介質角度看,棧幀是位於 esp(棧指針)及 ebp(基指針)之間的一塊區域。每個棧幀對應着一個未運行完的函數。棧幀中保存了該函數的函數參數、返回地址和局部變量等數據。局部變量等分配均在棧幀上分配,函數結束自動釋放。

  • ESP:棧指針寄存器,指向當前棧幀的棧頂。
  • EBP:基址指針寄存器,指向當前棧幀的底部。

C 函數調用,調用者將一些參數放在棧上,調用函數,然后彈出棧上存放的參數。這里涉及調用約定,調用約定涉及參數的入棧順序(從左到右還是從右到左)、參數入棧和清理的是調用者(caller)還是被調用者(callee),函數名的處理。

  • 采用__cdecl 調用約定的調用者會將參數從右到左的入棧,最后將返回地址入棧。這個返回地址是指,函數調用結束后的下一行執行的代碼地址。(__cdecl is the default calling convention for C and C++ programs. Because the stack is cleaned up by the caller, it can do vararg functions. The __cdecl calling convention creates larger executables than __stdcall, because it requires each function call to include stack cleanup code. The following list shows the implementation of this calling convention. The __cdecl modifier is Microsoft-specific.)

關鍵數據結構

libco 的協程控制塊 stCoRoutine_t:

struct stCoRoutine_t
{
 stCoRoutineEnv_t *env;
 pfn_co_routine_t pfn;
 void *arg;
 coctx_t ctx;

 char cStart;
 char cEnd;
 char cIsMain;
 char cEnableSysHook;
 char cIsShareStack;

 void *pvEnv;

 //char sRunStack[ 1024 * 128 ];
 stStackMem_t* stack_mem;


 //save stack buffer while confilct on same stack_buffer;
 char* stack_sp;
 unsigned int save_size;
 char* save_buffer;

 stCoSpec_t aSpec[1024];
};
  • env:即協程執行的環境,libco 協程一旦創建便跟對應線程綁定了,不支持在不同線程間遷移,這里 env 即同屬於一個線程所有協程的執行環境,包括了當前運行協程、嵌套調用的協程棧,和一個 epoll 的封裝結構。這個結構是跟運行的線程綁定了的,運行在同一個線程上的各協程是共享該結構的,是個全局性的資源。
struct stCoRoutineEnv_t
{
 stCoRoutine_t *pCallStack[ 128 ];
 int iCallStackSize;
 stCoEpoll_t *pEpoll;

 //for copy stack log lastco and nextco
 stCoRoutine_t* pending_co;
 stCoRoutine_t* occupy_co;
};
  • pfn:實際等待執行的協程函數
  • arg:上面協程函數的參數
  • ctx:上下文,即 ESP、EBP、EIP 和其他通用寄存器的值
struct coctx_t
{
#if defined(__i386__)
 void *regs[ 8 ];
#else
 void *regs[ 14 ];
#endif
 size_t ss_size;
 char *ss_sp;
};
  • cStart、cEnd、cIsMain、cEnableSysHook、cIsShareStack:一些狀態和標志變量,后面會細說
  • pvEnv:保存程序系統環境變量的指針
  • stack_mem:協程運行時的棧內存,這個棧內存是固定的 128KB 的大小。
struct stStackMem_t
{
 stCoRoutine_t* occupy_co;
 int stack_size;
 char* stack_bp; //stack_buffer + stack_size
 char* stack_buffer;
};

stack_sp、save_size、save_buffer:這里要提到實現 stackful 協程(與之相對的還有一種 stackless 協程)的兩種技術:Separate coroutine stacks 和 Copying the stack(又叫共享棧)。這三個變量就是用來實現這兩種技術的。

實現細節上,前者為每一個協程分配一個單獨的、固定大小的棧;而后者則僅為正在運行的協程分配棧內存,當協程被調度切換出去時,就把它實際占用的棧內存 copy 保存到一個單獨分配的緩沖區;當被切出去的協程再次調度執行時,再一次 copy 將原來保存的棧內存恢復到那個共享的、固定大小的棧內存空間。

如果是獨享棧模式,分配在堆中的一塊作為當前協程棧幀的內存 stack_mem,這塊內存的默認大小為 128K。

如果是共享棧模式,協程切換的時候,用來拷貝存儲當前共享棧內容的 save_buffer,長度為實際的共享棧使用長度。

通常情況下,一個協程實際占用的(從 esp 到棧底)棧空間,相比預分配的這個棧大小(比如 libco 的 128KB)會小得多;這樣一來, copying stack 的實現方案所占用的內存便會少很多。當然,協程切換時拷貝內存的開銷有些場景下也是很大的。因此兩種方案各有利弊,而 libco 則同時實現了兩種方案,默認使用前者,也允許用戶在創建協程時指定使用共享棧。

生命周期

創建協程 Create coroutine

調用 co_create 將協程創建出來后,這時候它還沒有啟動,也即是說我們傳遞的 routine 函數還沒有被調用。實質上,這個函數內部僅僅是分配並初始化 stCoRoutine_t 結構體、設置任務函數指針、分配一段“棧”內存,以及分配和初始化 coctx_t。

  • ppco:輸出參數,co_create 內部為新協程分配一個協程控制塊,ppco 將指向這個分配的協程控制塊。
  • attr:指定要創建協程的屬性(棧大小、指向共享棧的指針(使用共享棧模式))
  • pfn:協程的任務(業務邏輯)函數
  • arg:傳遞給任務函數的參數
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
 if( !co_get_curr_thread_env() )
 {
  co_init_curr_thread_env();
 }
 stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
 *ppco = co;
 return 0;
}

啟動協程 Resume coroutine

在調用 co_create 創建協程返回成功后,便可以調用 co_resume 函數將它啟動了。

取當前協程控制塊指針,將待啟動的協程壓入 pCallStack 棧,然后 co_swap 切換到指向的新協程上取執行,co_swap 不會就此返回,而是要等當前執行的協程主動讓出 cpu 時才會讓新的協程切換上下文來執行自己的內容。

void co_resume( stCoRoutine_t *co )
{
 stCoRoutineEnv_t *env = co->env;
 stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
 if( !co->cStart )
 {
  coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
  co->cStart = 1;
 }
 env->pCallStack[ env->iCallStackSize++ ] = co;
 co_swap( lpCurrRoutine, co );
}

掛起協程 Yield coroutine

在非對稱協程理論,yield 與 resume 是個相對的操作。A 協程 resume 啟動了 B 協程,那么只有當 B 協程執行 yield 操作時才會返回到 A 協程。在上一節剖析協程啟動函數 co_resume() 時,也提到了該函數內部 co_swap() 會執行被調協程的代碼。只有被調協程 yield 讓出 CPU,調用者協程的 co_swap() 函數才能返回到原點,即返回到原來 co_resume() 內的位置。

在被調協程要讓出 CPU 時,會將它的 stCoRoutine_t 從 pCallStack 彈出,“棧指針” iCallStackSize 減 1,然后 co_swap() 切換 CPU 上下文到原來被掛起的調用者協程恢復執行。這里“被掛起的調用者協程”,即是調用者 co_resume() 中切換 CPU 上下文被掛起的那個協程。

void co_yield_env( stCoRoutineEnv_t *env )
{
 stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
 stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
 env->iCallStackSize--;
 co_swap( curr, last);
}
void co_yield_ct()
{
 co_yield_env( co_get_curr_thread_env() );
}
void co_yield( stCoRoutine_t *co )
{
 co_yield_env( co->env );
}
  • 同一個線程上所有協程是共享一個 stCoRoutineEnv_t 結構的,因此任意協程的 co->env 指向的結構都相同。

切換協程 Switch coroutine

上面的啟動協程和掛起協程都設計協程的切換,本質是上下文的切換,發生在 co_swap()中。

  • 如果是獨享棧模式:將當前協程的上下文存好,讀取下一協程的上下文。
  • 如果是共享棧模式:libco 對共享棧做了個優化,可以申請多個共享棧循環使用,當目標協程所記錄的共享棧沒有被其它協程占用的時候,整個切換過程和獨享棧模式一致。否則就是:將協程的棧空間內容從共享棧拷貝到自己的 save_buffer 中,將下一協程的 save_buffer 中的棧內容拷貝到共享棧中,將當前協程的上下文存好,讀取下一協程上下文。

協程的本質是,使用 ContextSwap,來代替匯編中函數 call 調用,在保存寄存器上下文后,把需要執行的協程入口 push 到棧上。

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
  stCoRoutineEnv_t* env = co_get_curr_thread_env();

 //get curr stack sp
 char c;
 curr->stack_sp= &c;

 if (!pending_co->cIsShareStack)
 {
  env->pending_co = NULL;
  env->occupy_co = NULL;
 }
 else
 {
  env->pending_co = pending_co;
  //get last occupy co on the same stack mem
  stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
  //set pending co to occupy thest stack mem;
  pending_co->stack_mem->occupy_co = pending_co;

  env->occupy_co = occupy_co;
  if (occupy_co && occupy_co != pending_co)
  {
   save_stack_buffer(occupy_co);
  }
 }

 //swap context
 coctx_swap(&(curr->ctx),&(pending_co->ctx) );

 //stack buffer may be overwrite, so get again;
 stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
 stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
 stCoRoutine_t* update_pending_co = curr_env->pending_co;

 if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
 {
  //resume stack buffer
  if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
  {
   memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
  }
 }
}

這里起寄存器拷貝切換作用的 coctx_swap 函數,是用匯編來實現的。

coctx_swap 接受兩個參數,第一個是當前協程的 coctx_t 指針,第二個參數是待切入的協程的 coctx_t 指針。該函數調用前還處於第一個協程的環境,調用之后就變成另一個協程的環境了。

extern "C"
{
 extern void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");
};
.globl coctx_swap
#if !defined( __APPLE__ )
.type  coctx_swap, @function
#endif
coctx_swap:

#if defined(__i386__)
    movl 4(%esp), %eax
    movl %esp,  28(%eax)
    movl %ebp, 24(%eax)
    movl %esi, 20(%eax)
    movl %edi, 16(%eax)
    movl %edx, 12(%eax)
    movl %ecx, 8(%eax)
    movl %ebx, 4(%eax)


    movl 8(%esp), %eax
    movl 4(%eax), %ebx
    movl 8(%eax), %ecx
    movl 12(%eax), %edx
    movl 16(%eax), %edi
    movl 20(%eax), %esi
    movl 24(%eax), %ebp
    movl 28(%eax), %esp

 ret

#elif defined(__x86_64__)
 leaq (%rsp),%rax
    movq %rax, 104(%rdi)
    movq %rbx, 96(%rdi)
    movq %rcx, 88(%rdi)
    movq %rdx, 80(%rdi)
 movq 0(%rax), %rax
 movq %rax, 72(%rdi)
    movq %rsi, 64(%rdi)
 movq %rdi, 56(%rdi)
    movq %rbp, 48(%rdi)
    movq %r8, 40(%rdi)
    movq %r9, 32(%rdi)
    movq %r12, 24(%rdi)
    movq %r13, 16(%rdi)
    movq %r14, 8(%rdi)
    movq %r15, (%rdi)
 xorq %rax, %rax

    movq 48(%rsi), %rbp
    movq 104(%rsi), %rsp
    movq (%rsi), %r15
    movq 8(%rsi), %r14
    movq 16(%rsi), %r13
    movq 24(%rsi), %r12
    movq 32(%rsi), %r9
    movq 40(%rsi), %r8
    movq 56(%rsi), %rdi
    movq 80(%rsi), %rdx
    movq 88(%rsi), %rcx
    movq 96(%rsi), %rbx
 leaq 8(%rsp), %rsp
 pushq 72(%rsi)

    movq 64(%rsi), %rsi
 ret
#endif

退出協程

同協程掛起一樣,協程退出時也應將 CPU 控制權交給它的調用者,這也是調用 co_yield_env() 函數來完成的。

我們調用 co_create()、co_resume() 啟動協程執行一次性任務,當任務結束后要記得調用 co_free()或 co_release() 銷毀這個臨時性的協程,否則將引起內存泄漏。

void co_free( stCoRoutine_t *co )
{
    if (!co->cIsShareStack)
    {
        free(co->stack_mem->stack_buffer);
        free(co->stack_mem);
    }
    //walkerdu fix at 2018-01-20
    //存在內存泄漏
    else
    {
        if(co->save_buffer)
            free(co->save_buffer);

        if(co->stack_mem->occupy_co == co)
            co->stack_mem->occupy_co = NULL;
    }

    free( co );
}
void co_release( stCoRoutine_t *co )
{
    co_free( co );
}

補充

協程的調度

co_eventloop() 即“調度器”的核心所在。這里講的“調度器”,嚴格意義上算不上真正的調度器,只是為了表述的方便。libco 的協程機制是非對稱的,沒有什么調度算法。在執行 yield 時,當前協程只能將控制權交給調用者協程,沒有任何可調度的余地。Resume 靈活性稍強一點,不過也還算不得調度。如果非要說有什么“調度算法”的話,那就只能說是“基於 epoll/kqueue 事件驅動”的調度算法。“調度器”就是 epoll/kqueue 的事件循環。

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
 if( !ctx->result )
 {
  ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
 }
 co_epoll_res *result = ctx->result;


 for(;;)
 {
  int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );

  stTimeoutItemLink_t *active = (ctx->pstActiveList);
  stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);

  memset( timeout,0,sizeof(stTimeoutItemLink_t) );

  for(int i=0;i<ret;i++)
  {
   stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
   if( item->pfnPrepare )
   {
    item->pfnPrepare( item,result->events[i],active );
   }
   else
   {
    AddTail( active,item );
   }
  }


  unsigned long long now = GetTickMS();
  TakeAllTimeout( ctx->pTimeout,now,timeout );

  stTimeoutItem_t *lp = timeout->head;
  while( lp )
  {
   //printf("raise timeout %p\n",lp);
   lp->bTimeout = true;
   lp = lp->pNext;
  }

  Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

  lp = active->head;
  while( lp )
  {

   PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
            if (lp->bTimeout && now < lp->ullExpireTime)
   {
    int ret = AddTimeout(ctx->pTimeout, lp, now);
    if (!ret)
    {
     lp->bTimeout = false;
     lp = active->head;
     continue;
    }
   }
   if( lp->pfnProcess )
   {
    lp->pfnProcess( lp );
   }

   lp = active->head;
  }
  if( pfn )
  {
   if( -1 == pfn( arg ) )
   {
    break;
   }
  }

 }
}

在關鍵數據結構 stCoRoutineEnv_t 中,有一個變量 stCoEpoll_t 類型的指針,即與 epoll 事件循環相關。

  • iEpollFd:epoll 實例的文件描述符
  • _EPOLL_SIZE:一次 epoll_wait 最多返回的就緒事件個數
  • pTimeout:時間輪定時器
  • pstTimeoutList:存放超時事件
  • pstActiveList:存放就緒事件/超時事件
  • result:epoll_wait 得到的結果集
struct stCoEpoll_t
{
 int iEpollFd;
 static const int _EPOLL_SIZE = 1024 * 10;
 struct stTimeout_t *pTimeout;
 struct stTimeoutItemLink_t *pstTimeoutList;
 struct stTimeoutItemLink_t *pstActiveList;
 co_epoll_res *result;
};

一般而言,使用定時功能時,我們首先向定時器中注冊一個定時事件(Timer Event),在注冊定時事件時需要指定這個事件在未來的觸發時間。在到了觸發時間點后,我們會收到定時器的通知。

網絡框架里的定時器可以看做由兩部分組成:

  • 第一部分是保存已注冊 timer events 的數據結構,第二部分則是定時通知機制。保存已注冊的 timer events ,一般選用紅黑樹,比如 nginx;另外一種常見的數據結構便是時間輪,libco 就使用了這種結構。
  • 第二部分是高精度的定時(精確到微秒級)通知機制,一般使用 getitimer/setitimer 這類接口,用 epoll/kqueue 這樣的系統調用來完成定時通知。

何時掛起何時恢復

libco 中有 3 種調用 yield 的場景:

  • 用戶程序中主動調用 co_yield_ct();
  • 程序調用了 epoll() 或 co_cond_timedwait() 陷入“阻塞”等待;
  • 程序調用了 connect(), read(), write(), recv(), send() 等系統調用陷入“阻塞”等待。

resume 啟動一個協程有 3 種情況:

  • 對應用戶程序主動 yield 的情況,這種情況也有賴於用戶程序主動將協程 co_resume() 起來;
  • epoll() 的目標文件描述符事件就緒或超時,co_cond_timedwait() 等到了其他協程的 co_cond_signal() 通知信號或等待超時;
  • read(), write() 等 I/O 接口成功讀到或寫入數據,或者讀寫超時。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM