同步Synchronous,異步(Asynchronous),協程(coroutine)
同步的好處是邏輯流就是代碼的控制流,易於編寫。但是如果碰到阻塞請求,就會卡住,因此CPU利用率不高。當然操作系統可以進行進程/線程調度,但是又有一些上下文切換的開銷。
異步的好處是當線程可以不用一直阻塞在IO請求上,返回的邏輯可以寫在回調里。但是這樣有兩個問題,一個是邏輯流不等於控制流,coder需要去適應異步的思想;二是回調時也需要保證維持一些當初請求的狀態,這個時候比較繁瑣,並且容易出錯。
協程的主要目的是,要讓所有CPU都能跑滿,這樣可以最大化利用計算資源。並且協程編寫跟同步很相似,業務邏輯不會改變。協程相比於傳統的進程/線程,有一個優勢是調度在用戶態,因此開銷小(不用用戶態->內核態切換),同時協程的並發量主要受制於內存大小。
Protothreads
Protothreads 是一個短小精悍的協程庫,它對於每個協程都只需要2字節來保存狀態。主頁:http://dunkels.com/adam/pt/index.html (PS: 這個作者非常有名)
協程最重要需要解決的問題:
- 如何保存當前處理的狀態(寄存器,局部變量等。。)
- 如何保證下次可以在當前位置繼續執行。
Protothreads 解決了第二個問題,它對待第一個問題的方式是不允許用戶在函數里定義局部變量 : ),換句話說你定義的局部變量這些都沒用,所以是無狀態的。當然你可以用全局變量去維護一些狀態。
解決第二個問題的方式是使用 switch-case 語句跳轉,類似於 goto。那么怎么區分跳轉位置呢,采用 LABEL_行號
的方式解決(行號就是上面說的2字節),這樣你第一次進入函數的時候是正常執行的,但是當你后面進入函數時,通過 switch 可以直接跳轉到指定 label 繼續執行。
pt 的幾個宏定義:
// lc-switch.h
typedef unsigned short lc_t;
#define LC_INIT(s) s = 0;
#define LC_RESUME(s) switch(s) { case 0:
#define LC_SET(s) s = __LINE__; case __LINE__:
#define LC_END(s) }
/************************************daghlny 專有分割線***************************************/
// pt.h
#define PT_INIT(pt) LC_INIT((pt)->lc)\
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
PT_INIT(pt); return PT_ENDED; }
#define PT_WAIT_UNTIL(pt, condition) \
do { \
LC_SET((pt)->lc); \
if(!(condition)) { \
return PT_WAITING; \
} \
} while(0)
#define PT_YIELD(pt) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if(PT_YIELD_FLAG == 0) { \
return PT_YIELDED; \
} \
} while(0)
上面這個是一種 LABEL 策略 (定義在 lc-switch.h),另一種是使用 C語言的 &&LABEL 語法糖(定義在lc-addrlabels.h),然后在 lc.h 文件中進行選擇使用。如果需要看到 pt 的具體執行,可以用 gcc -E
來打印出來宏展開之后的代碼。
libco
libco 是微信給出的一個協程庫,代碼量也不大,並且是經過微信的業務考驗的,當然文檔很差(基本沒有注釋),並且更新很慢(commit已經很久遠了),但是依然是一個比較健全的協程庫。主頁:https://github.com/Tencent/libco
libco 采用的是 hook 所有常用的 Unix 系統調用,包括 read, write, send, recv
等。然后在每次進行這些調用時,會先觀察是否啟用了 hook,然后觀察是否 fd 設置了 O_NONBLOCK
,然后再觀察是否設置了 timeout == 0
,最后才會采用自己定義的 poll 函數來注冊 fd,然后將控制權移交到其他協程中去。
下面是一個 hook read 的代碼:
ssize_t read( int fd, void *buf, size_t nbyte )
{
HOOK_SYS_FUNC( read );
if( !co_is_enable_sys_hook() )
{
return g_sys_read_func( fd,buf,nbyte );
}
rpchook_t *lp = get_by_fd( fd );
if( !lp || ( O_NONBLOCK & lp->user_flag ) )
{
ssize_t ret = g_sys_read_func( fd,buf,nbyte );
return ret;
}
int timeout = ( lp->read_timeout.tv_sec * 1000 )
+ ( lp->read_timeout.tv_usec / 1000 );
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = ( POLLIN | POLLERR | POLLHUP );
int pollret = poll( &pf,1,timeout );
ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );
if( readret < 0 )
{
co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
fd,readret,errno,pollret,timeout);
}
return readret;
}
但是,libco 如何保存當前線程的狀態呢,具體的邏輯在 co_routine.cpp/co_swap()
函數里,函數定義如下:
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
char c;
curr->stack_sp= &c;
if (!pending_co->cIsShareStack)
{
env->pending_co = NULL;
env->occupy_co = NULL;
}
else
{
env->pending_co = pending_co;
//get last occupy co on the same stack mem
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
//set pending co to occupy thest stack mem;
pending_co->stack_mem->occupy_co = pending_co;
env->occupy_co = occupy_co;
if (occupy_co && occupy_co != pending_co)
{
save_stack_buffer(occupy_co);
}
}
//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co = curr_env->occupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;
if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
{
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}
假設現在要從協程 A(就是參數 curr) 切換到協程 B(就是參數 pending_co),其中 save_stack_buffer()
函數是用來保存協程A棧的存儲內容的,而 coctx_swap()
是用來將 A 的寄存器內容保存到到自己的 stCoRoutine_t
結構中,然后將 B 的寄存器內容恢復回來。首先,stCoRoutine_t
的 ctx
字段是 coctx_t
結構,定義如下:
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ];
#else
void *regs[ 14 ];
#endif
size_t ss_size;
char *ss_sp;
};
coctx_swap()
這個函數是用匯編寫的,代碼是:
#elif defined(__x86_64__)
leaq 8(%rsp),%rax
leaq 112(%rdi),%rsp
pushq %rax
pushq %rbx
pushq %rcx
pushq %rdx
pushq -8(%rax) //ret func addr
pushq %rsi
pushq %rdi
pushq %rbp
pushq %r8
pushq %r9
pushq %r12
pushq %r13
pushq %r14
pushq %r15
movq %rsi, %rsp
popq %r15
popq %r14
popq %r13
popq %r12
popq %r9
popq %r8
popq %rbp
popq %rdi
popq %rsi
popq %rax //ret func addr
popq %rdx
popq %rcx
popq %rbx
popq %rsp
pushq %rax
xorl %eax, %eax
ret
#endif
方便起見只我只截取了 x86_64 的。函數定義中,第一句leaq 8(%rsp),%rax
,實際上是將除了返回之外的協程A棧頂放到 %ras 寄存器中,leaq 112(%rdi),%rsp
是將 %rsp 指向上面 coctx_t
結構的 regs
數組的末尾(也就是地址最大的一端)。然后通過后續的 pushq,就可以把其他的寄存器放到 regs 中進行保存。 然后將 %rsp 指向 %rsi (根據之前的某篇博文,這個參數在64位下應該是第二個傳入參數),然后使用一系列 popq 指令再將協程B之前保存的寄存器狀態依次從 regs 放入每個寄存器中。
然后用 pushq %rax
來將返回地址重新壓棧,最后使用 ret
來保證 %rip 寄存器指向指定的指令位置,然后就恢復了 協程B 的上下文狀態,這樣就可以繼續執行了。