微信 libco 協程庫原理剖析 https://mp.weixin.qq.com/s/sy26w9XVvQsQoVhbQoN3vQ
微信 libco 協程庫原理剖析
作者:alexzmzheng
同 Go 語言一樣,libco 也是提供了同步風格編程模式,同時還能保證系統的高並發能力,本文主要剖析 libco 中的協程原理。
簡介
- libco 是微信后台大規模使用的 c/c++協程庫,2013 年至今穩定運行在微信后台的數萬台機器上。
- libco 通過僅有的幾個函數接口 co_create/co_resume/co_yield 再配合 co_poll,可以支持同步或者異步的寫法,如線程庫一樣輕松。同時庫里面提供了 socket 族函數的 hook,使得后台邏輯服務幾乎不用修改邏輯代碼就可以完成異步化改造。
- 開源地址:https://github.com/Tencent/libco
准備知識
協程是什么
- 協程本質上就是用戶態線程,又名纖程,將調度的代碼在用戶態重新實現。有極高的執行效率,因為子程序切換不是線程切換而是由程序自身控制,沒有線程切換的開銷。協程通常是純軟件實現的多任務,與 CPU 和操作系統通常沒有關系,跨平台,跨體系架構。
- 協程在執行過程中,可以調用別的協程自己則中途退出執行,之后又從調用別的協程的地方恢復執行。這有點像操作系統的線程,執行過程中可能被掛起,讓位於別的線程執行,稍后又從掛起的地方恢復執行。
- 對於線程而言,其上下文切換流程如下,需要兩次權限等級切換和三次棧切換。上下文存儲在內核棧上。線程的上下文切換必須先進入內核態並切換上下文, 這就造成了嚴重的調度開銷。線程的結構體存在於內核中,在 pthread_create 時需要進入內核態,頻繁創建開銷大。
Linux 程序內存布局
Linux 使用虛擬地址空間,大大增加了進程的尋址空間,由低地址到高地址分別為:
- 只讀段/代碼段:只能讀,不可寫;可執行代碼、字符串字面值、只讀變量
- 數據段:已初始化且初值非 0 全局變量、靜態變量的空間
- BSS 段:未初始化或初值為 0 的全局變量和靜態局部變量
- 堆 :就是平時所說的動態內存, malloc/new 大部分都來源於此。
- 文件映射區域 :如動態庫、共享內存等映射物理空間的內存,一般是 mmap 函數所分配的虛擬地址空間。
- 棧:用於維護函數調用的上下文空間;局部變量、函數參數、返回地址等
- 內核虛擬空間:用戶代碼不可見的內存區域,由內核管理(頁表就存放在內核虛擬空間)。
其中需要注意的是:棧和堆的這兩種不同的地址增長方向,棧從高到低地址增長。堆從低到高增長,后面協程切換中就涉及到該布局的不同。
棧幀
棧幀是從棧上分配的一段內存,每次函數調用時,用於存儲自動變量。從物理介質角度看,棧幀是位於 esp(棧指針)及 ebp(基指針)之間的一塊區域。每個棧幀對應着一個未運行完的函數。棧幀中保存了該函數的函數參數、返回地址和局部變量等數據。局部變量等分配均在棧幀上分配,函數結束自動釋放。
- ESP:棧指針寄存器,指向當前棧幀的棧頂。
- EBP:基址指針寄存器,指向當前棧幀的底部。
C 函數調用,調用者將一些參數放在棧上,調用函數,然后彈出棧上存放的參數。這里涉及調用約定,調用約定涉及參數的入棧順序(從左到右還是從右到左)、參數入棧和清理的是調用者(caller)還是被調用者(callee),函數名的處理。
- 采用__cdecl 調用約定的調用者會將參數從右到左的入棧,最后將返回地址入棧。這個返回地址是指,函數調用結束后的下一行執行的代碼地址。(__cdecl is the default calling convention for C and C++ programs. Because the stack is cleaned up by the caller, it can do vararg functions. The __cdecl calling convention creates larger executables than __stdcall, because it requires each function call to include stack cleanup code. The following list shows the implementation of this calling convention. The __cdecl modifier is Microsoft-specific.)
關鍵數據結構
libco 的協程控制塊 stCoRoutine_t:
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn;
void *arg;
coctx_t ctx;
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
void *pvEnv;
//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
//save stack buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;
stCoSpec_t aSpec[1024];
};
- env:即協程執行的環境,libco 協程一旦創建便跟對應線程綁定了,不支持在不同線程間遷移,這里 env 即同屬於一個線程所有協程的執行環境,包括了當前運行協程、嵌套調用的協程棧,和一個 epoll 的封裝結構。這個結構是跟運行的線程綁定了的,運行在同一個線程上的各協程是共享該結構的,是個全局性的資源。
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;
stCoEpoll_t *pEpoll;
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
- pfn:實際等待執行的協程函數
- arg:上面協程函數的參數
- ctx:上下文,即 ESP、EBP、EIP 和其他通用寄存器的值
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ];
#else
void *regs[ 14 ];
#endif
size_t ss_size;
char *ss_sp;
};
- cStart、cEnd、cIsMain、cEnableSysHook、cIsShareStack:一些狀態和標志變量,后面會細說
- pvEnv:保存程序系統環境變量的指針
- stack_mem:協程運行時的棧內存,這個棧內存是固定的 128KB 的大小。
struct stStackMem_t
{
stCoRoutine_t* occupy_co;
int stack_size;
char* stack_bp; //stack_buffer + stack_size
char* stack_buffer;
};
stack_sp、save_size、save_buffer:這里要提到實現 stackful 協程(與之相對的還有一種 stackless 協程)的兩種技術:Separate coroutine stacks 和 Copying the stack(又叫共享棧)。這三個變量就是用來實現這兩種技術的。
實現細節上,前者為每一個協程分配一個單獨的、固定大小的棧;而后者則僅為正在運行的協程分配棧內存,當協程被調度切換出去時,就把它實際占用的棧內存 copy 保存到一個單獨分配的緩沖區;當被切出去的協程再次調度執行時,再一次 copy 將原來保存的棧內存恢復到那個共享的、固定大小的棧內存空間。
如果是獨享棧模式,分配在堆中的一塊作為當前協程棧幀的內存 stack_mem,這塊內存的默認大小為 128K。
如果是共享棧模式,協程切換的時候,用來拷貝存儲當前共享棧內容的 save_buffer,長度為實際的共享棧使用長度。
通常情況下,一個協程實際占用的(從 esp 到棧底)棧空間,相比預分配的這個棧大小(比如 libco 的 128KB)會小得多;這樣一來, copying stack 的實現方案所占用的內存便會少很多。當然,協程切換時拷貝內存的開銷有些場景下也是很大的。因此兩種方案各有利弊,而 libco 則同時實現了兩種方案,默認使用前者,也允許用戶在創建協程時指定使用共享棧。
生命周期
創建協程 Create coroutine
調用 co_create 將協程創建出來后,這時候它還沒有啟動,也即是說我們傳遞的 routine 函數還沒有被調用。實質上,這個函數內部僅僅是分配並初始化 stCoRoutine_t 結構體、設置任務函數指針、分配一段“棧”內存,以及分配和初始化 coctx_t。
- ppco:輸出參數,co_create 內部為新協程分配一個協程控制塊,ppco 將指向這個分配的協程控制塊。
- attr:指定要創建協程的屬性(棧大小、指向共享棧的指針(使用共享棧模式))
- pfn:協程的任務(業務邏輯)函數
- arg:傳遞給任務函數的參數
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
啟動協程 Resume coroutine
在調用 co_create 創建協程返回成功后,便可以調用 co_resume 函數將它啟動了。
取當前協程控制塊指針,將待啟動的協程壓入 pCallStack 棧,然后 co_swap 切換到指向的新協程上取執行,co_swap 不會就此返回,而是要等當前執行的協程主動讓出 cpu 時才會讓新的協程切換上下文來執行自己的內容。
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co;
co_swap( lpCurrRoutine, co );
}
掛起協程 Yield coroutine
在非對稱協程理論,yield 與 resume 是個相對的操作。A 協程 resume 啟動了 B 協程,那么只有當 B 協程執行 yield 操作時才會返回到 A 協程。在上一節剖析協程啟動函數 co_resume() 時,也提到了該函數內部 co_swap() 會執行被調協程的代碼。只有被調協程 yield 讓出 CPU,調用者協程的 co_swap() 函數才能返回到原點,即返回到原來 co_resume() 內的位置。
在被調協程要讓出 CPU 時,會將它的 stCoRoutine_t 從 pCallStack 彈出,“棧指針” iCallStackSize 減 1,然后 co_swap() 切換 CPU 上下文到原來被掛起的調用者協程恢復執行。這里“被掛起的調用者協程”,即是調用者 co_resume() 中切換 CPU 上下文被掛起的那個協程。
void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}
void co_yield_ct()
{
co_yield_env( co_get_curr_thread_env() );
}
void co_yield( stCoRoutine_t *co )
{
co_yield_env( co->env );
}
- 同一個線程上所有協程是共享一個 stCoRoutineEnv_t 結構的,因此任意協程的 co->env 指向的結構都相同。
切換協程 Switch coroutine
上面的啟動協程和掛起協程都設計協程的切換,本質是上下文的切換,發生在 co_swap()中。
- 如果是獨享棧模式:將當前協程的上下文存好,讀取下一協程的上下文。
- 如果是共享棧模式:libco 對共享棧做了個優化,可以申請多個共享棧循環使用,當目標協程所記錄的共享棧沒有被其它協程占用的時候,整個切換過程和獨享棧模式一致。否則就是:將協程的棧空間內容從共享棧拷貝到自己的 save_buffer 中,將下一協程的 save_buffer 中的棧內容拷貝到共享棧中,將當前協程的上下文存好,讀取下一協程上下文。
協程的本質是,使用 ContextSwap,來代替匯編中函數 call 調用,在保存寄存器上下文后,把需要執行的協程入口 push 到棧上。
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
char c;
curr->stack_sp= &c;
if (!pending_co->cIsShareStack)
{
env->pending_co = NULL;
env->occupy_co = NULL;
}
else
{
env->pending_co = pending_co;
//get last occupy co on the same stack mem
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
//set pending co to occupy thest stack mem;
pending_co->stack_mem->occupy_co = pending_co;
env->occupy_co = occupy_co;
if (occupy_co && occupy_co != pending_co)
{
save_stack_buffer(occupy_co);
}
}
//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co = curr_env->occupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;
if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
{
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}
這里起寄存器拷貝切換作用的 coctx_swap 函數,是用匯編來實現的。
coctx_swap 接受兩個參數,第一個是當前協程的 coctx_t 指針,第二個參數是待切入的協程的 coctx_t 指針。該函數調用前還處於第一個協程的環境,調用之后就變成另一個協程的環境了。
extern "C"
{
extern void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");
};
.globl coctx_swap
#if !defined( __APPLE__ )
.type coctx_swap, @function
#endif
coctx_swap:
#if defined(__i386__)
movl 4(%esp), %eax
movl %esp, 28(%eax)
movl %ebp, 24(%eax)
movl %esi, 20(%eax)
movl %edi, 16(%eax)
movl %edx, 12(%eax)
movl %ecx, 8(%eax)
movl %ebx, 4(%eax)
movl 8(%esp), %eax
movl 4(%eax), %ebx
movl 8(%eax), %ecx
movl 12(%eax), %edx
movl 16(%eax), %edi
movl 20(%eax), %esi
movl 24(%eax), %ebp
movl 28(%eax), %esp
ret
#elif defined(__x86_64__)
leaq (%rsp),%rax
movq %rax, 104(%rdi)
movq %rbx, 96(%rdi)
movq %rcx, 88(%rdi)
movq %rdx, 80(%rdi)
movq 0(%rax), %rax
movq %rax, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
xorq %rax, %rax
movq 48(%rsi), %rbp
movq 104(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 56(%rsi), %rdi
movq 80(%rsi), %rdx
movq 88(%rsi), %rcx
movq 96(%rsi), %rbx
leaq 8(%rsp), %rsp
pushq 72(%rsi)
movq 64(%rsi), %rsi
ret
#endif
退出協程
同協程掛起一樣,協程退出時也應將 CPU 控制權交給它的調用者,這也是調用 co_yield_env() 函數來完成的。
我們調用 co_create()、co_resume() 啟動協程執行一次性任務,當任務結束后要記得調用 co_free()或 co_release() 銷毀這個臨時性的協程,否則將引起內存泄漏。
void co_free( stCoRoutine_t *co )
{
if (!co->cIsShareStack)
{
free(co->stack_mem->stack_buffer);
free(co->stack_mem);
}
//walkerdu fix at 2018-01-20
//存在內存泄漏
else
{
if(co->save_buffer)
free(co->save_buffer);
if(co->stack_mem->occupy_co == co)
co->stack_mem->occupy_co = NULL;
}
free( co );
}
void co_release( stCoRoutine_t *co )
{
co_free( co );
}
補充
協程的調度
co_eventloop() 即“調度器”的核心所在。這里講的“調度器”,嚴格意義上算不上真正的調度器,只是為了表述的方便。libco 的協程機制是非對稱的,沒有什么調度算法。在執行 yield 時,當前協程只能將控制權交給調用者協程,沒有任何可調度的余地。Resume 靈活性稍強一點,不過也還算不得調度。如果非要說有什么“調度算法”的話,那就只能說是“基於 epoll/kqueue 事件驅動”的調度算法。“調度器”就是 epoll/kqueue 的事件循環。
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
if( !ctx->result )
{
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
for(int i=0;i<ret;i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare )
{
item->pfnPrepare( item,result->events[i],active );
}
else
{
AddTail( active,item );
}
}
unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout,now,timeout );
stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
lp = active->head;
while( lp )
{
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if (lp->bTimeout && now < lp->ullExpireTime)
{
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret)
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if( lp->pfnProcess )
{
lp->pfnProcess( lp );
}
lp = active->head;
}
if( pfn )
{
if( -1 == pfn( arg ) )
{
break;
}
}
}
}
在關鍵數據結構 stCoRoutineEnv_t 中,有一個變量 stCoEpoll_t 類型的指針,即與 epoll 事件循環相關。
- iEpollFd:epoll 實例的文件描述符
- _EPOLL_SIZE:一次 epoll_wait 最多返回的就緒事件個數
- pTimeout:時間輪定時器
- pstTimeoutList:存放超時事件
- pstActiveList:存放就緒事件/超時事件
- result:epoll_wait 得到的結果集
struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = 1024 * 10;
struct stTimeout_t *pTimeout;
struct stTimeoutItemLink_t *pstTimeoutList;
struct stTimeoutItemLink_t *pstActiveList;
co_epoll_res *result;
};
一般而言,使用定時功能時,我們首先向定時器中注冊一個定時事件(Timer Event),在注冊定時事件時需要指定這個事件在未來的觸發時間。在到了觸發時間點后,我們會收到定時器的通知。
網絡框架里的定時器可以看做由兩部分組成:
- 第一部分是保存已注冊 timer events 的數據結構,第二部分則是定時通知機制。保存已注冊的 timer events ,一般選用紅黑樹,比如 nginx;另外一種常見的數據結構便是時間輪,libco 就使用了這種結構。
- 第二部分是高精度的定時(精確到微秒級)通知機制,一般使用 getitimer/setitimer 這類接口,用 epoll/kqueue 這樣的系統調用來完成定時通知。
何時掛起何時恢復
libco 中有 3 種調用 yield 的場景:
- 用戶程序中主動調用 co_yield_ct();
- 程序調用了 epoll() 或 co_cond_timedwait() 陷入“阻塞”等待;
- 程序調用了 connect(), read(), write(), recv(), send() 等系統調用陷入“阻塞”等待。
resume 啟動一個協程有 3 種情況:
- 對應用戶程序主動 yield 的情況,這種情況也有賴於用戶程序主動將協程 co_resume() 起來;
- epoll() 的目標文件描述符事件就緒或超時,co_cond_timedwait() 等到了其他協程的 co_cond_signal() 通知信號或等待超時;
- read(), write() 等 I/O 接口成功讀到或寫入數據,或者讀寫超時。