從 Protothreads 和 libco 看 C/C++ 實現的協程庫


同步Synchronous,異步(Asynchronous),協程(coroutine)

同步的好處是邏輯流就是代碼的控制流,易於編寫。但是如果碰到阻塞請求,就會卡住,因此CPU利用率不高。當然操作系統可以進行進程/線程調度,但是又有一些上下文切換的開銷。
異步的好處是當線程可以不用一直阻塞在IO請求上,返回的邏輯可以寫在回調里。但是這樣有兩個問題,一個是邏輯流不等於控制流,coder需要去適應異步的思想;二是回調時也需要保證維持一些當初請求的狀態,這個時候比較繁瑣,並且容易出錯。
協程的主要目的是,要讓所有CPU都能跑滿,這樣可以最大化利用計算資源。並且協程編寫跟同步很相似,業務邏輯不會改變。協程相比於傳統的進程/線程,有一個優勢是調度在用戶態,因此開銷小(不用用戶態->內核態切換),同時協程的並發量主要受制於內存大小。

Protothreads

Protothreads 是一個短小精悍的協程庫,它對於每個協程都只需要2字節來保存狀態。主頁:http://dunkels.com/adam/pt/index.html (PS: 這個作者非常有名)

協程最重要需要解決的問題:

  1. 如何保存當前處理的狀態(寄存器,局部變量等。。)
  2. 如何保證下次可以在當前位置繼續執行。

Protothreads 解決了第二個問題,它對待第一個問題的方式是不允許用戶在函數里定義局部變量 : ),換句話說你定義的局部變量這些都沒用,所以是無狀態的。當然你可以用全局變量去維護一些狀態。
解決第二個問題的方式是使用 switch-case 語句跳轉,類似於 goto。那么怎么區分跳轉位置呢,采用 LABEL_行號 的方式解決(行號就是上面說的2字節),這樣你第一次進入函數的時候是正常執行的,但是當你后面進入函數時,通過 switch 可以直接跳轉到指定 label 繼續執行。
pt 的幾個宏定義:

// lc-switch.h
typedef unsigned short lc_t;

#define LC_INIT(s) s = 0;

#define LC_RESUME(s) switch(s) { case 0:

#define LC_SET(s) s = __LINE__; case __LINE__:

#define LC_END(s) }

/************************************daghlny 專有分割線***************************************/
// pt.h
#define PT_INIT(pt)   LC_INIT((pt)->lc)\

#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)

#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
                   PT_INIT(pt); return PT_ENDED; }

#define PT_WAIT_UNTIL(pt, condition)	        \
  do {						\
    LC_SET((pt)->lc);				\
    if(!(condition)) {				\
      return PT_WAITING;			\
    }						\
  } while(0)

#define PT_YIELD(pt)				\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if(PT_YIELD_FLAG == 0) {			\
      return PT_YIELDED;			\
    }						\
  } while(0)

上面這個是一種 LABEL 策略 (定義在 lc-switch.h),另一種是使用 C語言的 &&LABEL 語法糖(定義在lc-addrlabels.h),然后在 lc.h 文件中進行選擇使用。如果需要看到 pt 的具體執行,可以用 gcc -E 來打印出來宏展開之后的代碼。

libco

libco 是微信給出的一個協程庫,代碼量也不大,並且是經過微信的業務考驗的,當然文檔很差(基本沒有注釋),並且更新很慢(commit已經很久遠了),但是依然是一個比較健全的協程庫。主頁:https://github.com/Tencent/libco
libco 采用的是 hook 所有常用的 Unix 系統調用,包括 read, write, send, recv 等。然后在每次進行這些調用時,會先觀察是否啟用了 hook,然后觀察是否 fd 設置了 O_NONBLOCK,然后再觀察是否設置了 timeout == 0,最后才會采用自己定義的 poll 函數來注冊 fd,然后將控制權移交到其他協程中去。
下面是一個 hook read 的代碼:

ssize_t read( int fd, void *buf, size_t nbyte )
{
	HOOK_SYS_FUNC( read );
	
	if( !co_is_enable_sys_hook() )
	{
		return g_sys_read_func( fd,buf,nbyte );
	}
	rpchook_t *lp = get_by_fd( fd );

	if( !lp || ( O_NONBLOCK & lp->user_flag ) ) 
	{
		ssize_t ret = g_sys_read_func( fd,buf,nbyte );
		return ret;
	}
	int timeout = ( lp->read_timeout.tv_sec * 1000 ) 
				+ ( lp->read_timeout.tv_usec / 1000 );

	struct pollfd pf = { 0 };
	pf.fd = fd;
	pf.events = ( POLLIN | POLLERR | POLLHUP );

	int pollret = poll( &pf,1,timeout );

	ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );

	if( readret < 0 )
	{
		co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
					fd,readret,errno,pollret,timeout);
	}

	return readret;
	
}

但是,libco 如何保存當前線程的狀態呢,具體的邏輯在 co_routine.cpp/co_swap() 函數里,函數定義如下:

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
 	stCoRoutineEnv_t* env = co_get_curr_thread_env();

	//get curr stack sp
	char c;
	curr->stack_sp= &c;

	if (!pending_co->cIsShareStack)
	{
		env->pending_co = NULL;
		env->occupy_co = NULL;
	}
	else 
	{
		env->pending_co = pending_co;
		//get last occupy co on the same stack mem
		stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
		//set pending co to occupy thest stack mem;
		pending_co->stack_mem->occupy_co = pending_co;

		env->occupy_co = occupy_co;
		if (occupy_co && occupy_co != pending_co)
		{
			save_stack_buffer(occupy_co);
		}
	}

	//swap context
	coctx_swap(&(curr->ctx),&(pending_co->ctx) );

	//stack buffer may be overwrite, so get again;
	stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
	stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
	stCoRoutine_t* update_pending_co = curr_env->pending_co;
	
	if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
	{
		//resume stack buffer
		if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
		{
			memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
		}
	}
}

假設現在要從協程 A(就是參數 curr) 切換到協程 B(就是參數 pending_co),其中 save_stack_buffer() 函數是用來保存協程A棧的存儲內容的,而 coctx_swap() 是用來將 A 的寄存器內容保存到到自己的 stCoRoutine_t 結構中,然后將 B 的寄存器內容恢復回來。首先,stCoRoutine_tctx 字段是 coctx_t 結構,定義如下:

struct coctx_t
{
#if defined(__i386__)
	void *regs[ 8 ];
#else
	void *regs[ 14 ];
#endif
	size_t ss_size;
	char *ss_sp;
	
};

coctx_swap() 這個函數是用匯編寫的,代碼是:

#elif defined(__x86_64__)
	leaq 8(%rsp),%rax
	leaq 112(%rdi),%rsp
	pushq %rax
	pushq %rbx
	pushq %rcx
	pushq %rdx

	pushq -8(%rax) //ret func addr

	pushq %rsi
	pushq %rdi
	pushq %rbp
	pushq %r8
	pushq %r9
	pushq %r12
	pushq %r13
	pushq %r14
	pushq %r15
	
	movq %rsi, %rsp
	popq %r15
	popq %r14
	popq %r13
	popq %r12
	popq %r9
	popq %r8
	popq %rbp
	popq %rdi
	popq %rsi
	popq %rax //ret func addr
	popq %rdx
	popq %rcx
	popq %rbx
	popq %rsp
	pushq %rax
	
	xorl %eax, %eax
	ret
#endif

方便起見只我只截取了 x86_64 的。函數定義中,第一句leaq 8(%rsp),%rax ,實際上是將除了返回之外的協程A棧頂放到 %ras 寄存器中,leaq 112(%rdi),%rsp 是將 %rsp 指向上面 coctx_t 結構的 regs 數組的末尾(也就是地址最大的一端)。然后通過后續的 pushq,就可以把其他的寄存器放到 regs 中進行保存。 然后將 %rsp 指向 %rsi (根據之前的某篇博文,這個參數在64位下應該是第二個傳入參數),然后使用一系列 popq 指令再將協程B之前保存的寄存器狀態依次從 regs 放入每個寄存器中。
然后用 pushq %rax 來將返回地址重新壓棧,最后使用 ret 來保證 %rip 寄存器指向指定的指令位置,然后就恢復了 協程B 的上下文狀態,這樣就可以繼續執行了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM