關於協程coroutine前面的文章已經介紹過了,本文總結對qemu中coroutine機制的分析,qemu 協程coroutine基於:setcontext函數族以及函數間跳轉函數siglongjmp和sigsetjmp實現。使用setcontext函數族來實現用戶態進程棧的切換,使用函數間跳轉siglongjmp和sigsetjmp實現協程coroutine不退出以及多次進入,即使coroutine執行的任務已經完成,這實現了協程池的功能,避免大量協程創建和銷毀帶來的系統開銷。
qemu coroutine主要提供了5個接口,用於協程創建、協程進入、協程讓出,下面首次介紹qemu 實現協程使用的主要數據結構,然后將依次介紹qemu coroutine 這5個接口的實現。
1.qemu協程實現使用的主要數據結構 coroutine和CoroutineUContext:
/* 協程coroutine */ struct Coroutine { CoroutineEntry *entry; /* 協程入口函數 */ void *entry_arg; /* 協程入口函數的參數 */ Coroutine *caller; QSLIST_ENTRY(Coroutine) pool_next; /* 協程池掛鏈 */ /* Coroutines that should be woken up when we yield or terminate */ QTAILQ_HEAD(, Coroutine) co_queue_wakeup; QTAILQ_ENTRY(Coroutine) co_queue_next; /* co_queue_wakeup掛鏈 */ }; typedef struct { Coroutine base; /* 協程coroutine */ void *stack; /* 當前上下文的進程棧 */ sigjmp_buf env; #ifdef CONFIG_VALGRIND_H unsigned int valgrind_stack_id; #endif } CoroutineUContext; /* coroutine上下文 */
coroutine數據結構主要封裝協程,coroutineUContext封裝協程上下文,是對coroutine的進一步包裝。
2. qemu協程創建函數 qemu_coroutine_create,其實現如下:
1 Coroutine *qemu_coroutine_create(CoroutineEntry *entry) 2 { 3 Coroutine *co = NULL; 4 5 if (CONFIG_COROUTINE_POOL) { /* 判斷是否使用了coroutine池 */ 6 qemu_mutex_lock(&pool_lock); 7 co = QSLIST_FIRST(&pool); /* 從池子里取出第一個協程 */ 8 if (co) { 9 QSLIST_REMOVE_HEAD(&pool, pool_next); 10 pool_size--; 11 } 12 qemu_mutex_unlock(&pool_lock); 13 } 14 15 if (!co) { /* co為NULL,表示沒有使用coroutine池或者池子已空 */ 16 co = qemu_coroutine_new(); /* 創建一個新的coroutine,這里只是一個空的協程 */ 17 } 18 19 co->entry = entry; /* 設置協程的入口函數 */ 20 QTAILQ_INIT(&co->co_queue_wakeup); /* 初始化協程線性隊列 */ 21 return co; 22 }
qemu_coroutine_create首先嘗試從coroutine池中取出一個coroutine,如果沒有獲取到,則通過qemu_coroutine_new函數創建一個新的coroutine,qemu_coroutine_new的實現如下:
1 Coroutine *qemu_coroutine_new(void) 2 { 3 const size_t stack_size = 1 << 20; /* ucontext_t使用的棧大小 */ 4 CoroutineUContext *co; /* 協程上下文 */ 5 ucontext_t old_uc, uc; /* 進程執行上下文 */ 6 sigjmp_buf old_env; /* 函數間跳轉-環境 */ 7 union cc_arg arg = {0}; 8 9 /* The ucontext functions preserve signal masks which incurs a 10 * system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not 11 * preserve signal masks but only works on the current stack. 12 * Since we need a way to create and switch to a new stack, use 13 * the ucontext functions for that but sigsetjmp()/siglongjmp() for 14 * everything else. 15 */ 16 17 if (getcontext(&uc) == -1) { 18 abort(); 19 } 20 /* 協程上下文CoroutineUContext初始化 */ 21 co = g_malloc0(sizeof(*co)); 22 co->stack = g_malloc(stack_size); 23 co->base.entry_arg = &old_env; /* stash away our jmp_buf */ 24 25 /* 進程執行上下文ucontext_t初始化 */ 26 uc.uc_link = &old_uc; 27 uc.uc_stack.ss_sp = co->stack; 28 uc.uc_stack.ss_size = stack_size; 29 uc.uc_stack.ss_flags = 0; 30 31 #ifdef CONFIG_VALGRIND_H 32 co->valgrind_stack_id = 33 VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size); 34 #endif 35 /* co的傳遞為什么要以arg的方式?????? */ 36 arg.p = co; 37 /* 創建一個進程執行上下文uc,進程執行上下文的入口函數為coroutine_trampoline */ 38 makecontext(&uc, (void (*)(void))coroutine_trampoline, 39 2, arg.i[0], arg.i[1]); 40 41 /* swapcontext() in, siglongjmp() back out */ 42 if (!sigsetjmp(old_env, 0)) { /* 保存當前堆棧環境,sigsetjmp為一次調用多次返回的函數 */ 43 swapcontext(&old_uc, &uc);/* 進入uc進程執行上下文,並保存當前上下文到old_uc */ 44 } 45 return &co->base; /* 返回coroutine */ 46 }
qemu_coroutine_new的主要動作:
- 3-7行定義堆棧大小、進程上下文、協程上下文、函數間跳轉變量等。
- 20-23行初始化coroutine上下文。
- 25-29行初始化進程上下文。
- 38行創建一個新的進程上下文uc。
- 42-43行首先通過sigsetjmp保存當前棧環境,sigsetjmp是一種一次調用可以多次返回的函數,第一次返回值為0,之后的返回值取決於導致其返回的siglongjmp的參數,因此第一sigsetjmp返回時將執行43行,進入uc進程執行上下文,38行將uc的入口函數設置為coroutine_trampoline,因此43行將進入coroutine_trampoline函數的執行。
- 45行返回協程上下文中的coroutine。
上面的注釋提到了一個疑問:38行將協程上下文co作為參數傳遞給了新創建的協程uc,但是co的傳遞為什么要轉換成arg,並以兩個int變量的形式傳遞?cc_arg聯合體的定義給出了說明:
/* * va_args to makecontext() must be type 'int', so passing * the pointer we need may require several int args. This * union is a quick hack to let us do that */ union cc_arg { void *p; int i[2]; };
主要原因是makecontext的va_args參數只接受int類型,因此作為指針傳遞的協程上下文co等價於兩個int類型的變量,64位系統上int類型占用4個字節,指針類型占用8個字節。
上面qemu_coroutine_new函數43行的執行將導致進入coroutine_trampoline函數,下面分析coroutine_trampoline函數的實現:
1 /* 2 * qemu coroutine入口函數, 3 * 函數參數i0為協程上下文指針的低8位, 4 * i1為協程上下文指針的高八位。 5 */ 6 static void coroutine_trampoline(int i0, int i1) 7 { 8 union cc_arg arg; 9 CoroutineUContext *self; 10 Coroutine *co; 11 12 arg.i[0] = i0; 13 arg.i[1] = i1; 14 self = arg.p;/* 獲取協程上下文對象指針 */ 15 co = &self->base;/* 獲取協程上下文的協程對象指針 */ 16 17 /* Initialize longjmp environment and switch back the caller */ 18 if (!sigsetjmp(self->env, 0)) { /* 保存當前堆棧信息,為了再一次進入該協程上下文 */ 19 /* 函數間跳轉,跳轉到qemu_coroutine_new函數的42行 */ 20 siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); 21 } 22 23 while (true) { 24 /* 執行協程的入口函數 */ 25 co->entry(co->entry_arg); 26 /* 協程入口函數退出,協程退出到調用者 */ 27 qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); 28 } 29 }
coroutine_trampoline的主要動作:
- 取得協程上下文對象self,並通過協程上下文對象獲取相應的協程對象co,12-15行。
- 通過sigsetjmp保存當前堆棧到協程上下文的env中成員變量中,env作為協程再一次進入的點,18行。
- 第一次執行sigsetjmp時,sigsetjmp返回0,因此通過siglongjmp,跳出到qemu_coroutine_new的42行,進而導致qemu_coroutine_new返回,然后qemu_coroutine_create的返回,19行。
- 當再一次通過siglongjmp進入coroutine_trampoline函數,也即18行時,將進入while循環,在循環中調用協程入口函數開始執行,執行完成后通過qemu_coroutine_switch進行協程上下文切換,切換到協程調用的上下文中,23-28行。
注意這里的co->caller將在進入該協程時被賦值,上面即是qemu中創建一個協程對象的過程,從上面的分析可以看出qemu中每一協程coroutine對象對應一個協程上下文對象,通過makecontext創建一個新的進程執行上下文,可以看做協程的主體,協程上下文對象的env成員保存了進入執行上下文的點,通過siglongjmp跳出該執行上下文,qemu協程的創建也即創建了一個新的進程執行上下文,並且保存了再次進入該執行上下文的堆棧信息,下面將分析協程進入函數qemu_coroutine_enter。
3. qemu協程進入函數 qemu_coroutine_enter,其實現如下:
1 /* 功能:切換到co執行上下文,也即開始執行co的入口函數,opaque為入口函數的參數 */ 2 void qemu_coroutine_enter(Coroutine *co, void *opaque) 3 { 4 Coroutine *self = qemu_coroutine_self(); /* 獲取當前的進程執行上下文-當前協程 */ 5 6 trace_qemu_coroutine_enter(self, co, opaque); 7 8 if (co->caller) { /* qemu 協程不允許遞歸,也即協程內創建協程 */ 9 fprintf(stderr, "Co-routine re-entered recursively\n"); 10 abort(); 11 } 12 /* 調用co協程的協程,也即進入co上下文之前的進程上下文 */ 13 co->caller = self; 14 /* co協程入口函數的參數 */ 15 co->entry_arg = opaque; 16 /* 將進程上下文從self切換到co */ 17 coroutine_swap(self, co); 18 }
qemu_coroutine_enter函數的實現主要為:獲取當前進程執行上下文並保存到co->caller中,然后設置co入口函數的參數,之后做上下文切換coroutine_swap()。coroutine_swap的實現如下:
1 /* 協程切換:從from切換到to */ 2 static void coroutine_swap(Coroutine *from, Coroutine *to) 3 { 4 CoroutineAction ret; 5 /* 協程切換,切換到to */ 6 ret = qemu_coroutine_switch(from, to, COROUTINE_YIELD); 7 /* to協程讓出,依次喚醒co->co_queue_wakeup列表中排隊的協程 */ 8 qemu_co_queue_run_restart(to); 9 /* 根據返回值,決定是否刪除協程co還是僅僅退出 */ 10 switch (ret) { 11 case COROUTINE_YIELD: 12 return; 13 case COROUTINE_TERMINATE: 14 trace_qemu_coroutine_terminate(to); 15 coroutine_delete(to); 16 return; 17 default: 18 abort(); 19 } 20 }
coroutine_swap的實現主要:首先切換到to協程上下文執行,當to協程讓出后依次喚醒排隊的協程,之后根據to協程退出的返回值來決定是否刪除to,下面是qemu_coroutine_switch函數的實現:
1 CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, 2 CoroutineAction action) 3 { 4 CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); 5 CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); 6 CoroutineThreadState *s = coroutine_get_thread_state(); 7 int ret; 8 9 s->current = to_; /* s在這里起什么作用呢? */ 10 11 ret = sigsetjmp(from->env, 0); /* 保存當前堆棧到from->env,用於協程的讓出 */ 12 if (ret == 0) { 13 siglongjmp(to->env, action);/* 跳轉到coroutine_trampoline中第18行 */ 14 } 15 return ret; 16 }
qemu_coroutine_switch值得注意的兩個地方:
- 首先11行保存了當前堆棧到from->env, to協程的讓出時的返回點,前面的coroutine_trampoline函數25行-當前協程執行完成時,執行27行將導致執行上下文切換到此處。
- 其次是13行執行函數間跳轉,在創建協程時在coroutine_trampoline函數的18行我們保存了堆棧信息到所創建協程的env成員中,因此13行的跳轉導致直接切換到coroutine_trampoline的18行執行,在coroutine_trampoline中執行co->entry開始執行協程的入口函數,也即開始了協程上下文的執行。
有兩種方式可以退出當前協程:協程入口函數返回、協程上下文主動執行qemu_coroutine_yield函數,前面已經說明了在coroutine_trampoline函數中協程入口函數返回時,將通過siglongjmp的方式來退出當前協程的執行上下文,下面介紹qemu_coroutine_yield的實現。
4. qemu協程讓出函數 qemu_coroutine_yield,其實現如下