線程是內核對外提供的服務,應用程序可以通過系統調用讓內核啟動線程,由內核來負責線程調度和切換。線程在等待IO操作時線程變為unrunnable狀態會觸發上下文切換。現代操作系統一般都采用搶占式調度,上下文切換一般發生在時鍾中斷和系統調用返回前,調度器計算當前線程的時間片,如果需要切換就從運行隊列中選出一個目標線程,保存當前線程的環境,並且恢復目標線程的運行環境,最典型的就是切換ESP指向目標線程內核堆棧,將EIP指向目標線程上次被調度出時的指令地址。
協程也叫用戶態線程,協程之間的切換發生在用戶態。在用戶態沒有時鍾中斷,系統調用等機制,那么協程切換由什么觸發?調度器將控制權交給某個協程后,控制權什么時候回到調度器,從而調度另外一個協程運行? 實際上,這需要協程主動放棄CPU,控制權回到調度器,從而調度另外一個協程運行。所謂協作式線程(cooperative),需要協程之間互相協作,不需要使用CPU時將CPU主動讓出。
協程切換和內核線程的上下文切換相同,也需要有機制來保存當前上下文,恢復目標上下文。在POSIX系統上,getcontext/makecontext/swapcontext等可以用來做這件事。
協程帶來的最大的好處就是可以用同步的方式來寫異步的程序。比如協程A,B:A是工作協程,B是網絡IO協程(這種模型下,實際工作協程會比網絡IO協程多),A發送一個包時只需要將包push到A和B之間的一個channel,然后就可以主動放棄CPU,讓出CPU給其它協程運行,B從channel中pop出將要發送的包,接收到包響應后,將結果放到A能拿到的地方,然后將A標識為ready狀態,放入可運行隊列等待調度,A下次被調度器調度就可以拿到結果繼續做后面的事情。如果是基於線程的模型,A和B都是線程,通常基於回調的方式,1. A阻塞在某個隊列Q上,B接受到響應包回調A傳給B的回調函數f,回調函數f將響應包push到Q中,A可以取到響應包繼續干活,如果阻塞基於cond等機制,則會被OS調度出去,如果忙等,則耗費CPU。2. A可以不阻塞在Q上,而是繼續做別的事情,可以定期過來取結果。 這種情況下,線程模型業務邏輯其實被打亂了,發包和取包響應的過程被隔離開了。
實現協程庫的基本思路很簡單,每個線程一個調度器,就是一個循環,不斷的從可運行隊列中取出協程,並且利用swapcontext恢復協程的上下文從而繼續執行協程。當一個協程放棄CPU時,通過swapcontext恢復調度器上下文從而將控制權歸還給調度器,調度器從可運行隊列選擇下一個協程。每個協程初始化通過getcontext和makecontext,需要的棧空間從堆上分配即可。
以下分析一個簡單的協程庫libtask,由golang team成員之一的Russ cox在加入golang team之前開發。只支持單線程,簡單包裝了一下read/write等同步IO。
在libtask中,一個協程用一個struct Task來表示:
struct Task { char name[256]; // offset known to acid char state[256]; Task *next; //通過這兩個指針將task串起來 Task *prev; Task *allnext; Task *allprev; Context context;// 當前協程上下文 uvlong alarmtime; uint id; uchar *stk; // 當前協程可以使用的堆棧,初始化為棧頂地址 uint stksize;// 當前協程可以使用的堆棧大小 int exiting; int alltaskslot; int system; int ready; void (*startfn)(void*);//當前協程的執行入口函數 void *startarg;//參數 void *udata; };
下面看看新增一個協程的過程:
static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) { Task *t; sigset_t zero; uint x, y; ulong z; /* allocate the task and stack together */ t = malloc(sizeof *t+stack); //在堆上為這個協程分配結構體和協程所使用的堆棧 if(t == nil){ fprint(2, "taskalloc malloc: %r\n"); abort(); } memset(t, 0, sizeof *t); t->stk = (uchar*)(t+1); t->stksize = stack; t->id = ++taskidgen; t->startfn = fn; // 協程入口函數 t->startarg = arg; // 協程入口函數參數 /* do a reasonable initialization */ memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); /* must initialize with current context */ if(getcontext(&t->context.uc) < 0){ // 初始化當前協程上下文 fprint(2, "getcontext: %r\n"); abort(); } /* call makecontext to do the real work. */ /* leave a few words open on both ends */ t->context.uc.uc_stack.ss_sp = t->stk+8; //ss_sp成員為棧頂地址,后續makecontext會將ss_sp往高地址移動ss_size個字節,從這里開始壓棧 t->context.uc.uc_stack.ss_size = t->stksize-64; //ss_size成員為棧大小 #if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE) /* sigh */ #warning "doing sun thing" /* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */ t->context.uc.uc_stack.ss_sp = (char*)t->context.uc.uc_stack.ss_sp +t->context.uc.uc_stack.ss_size; #endif /* * All this magic is because you have to pass makecontext a * function that takes some number of word-sized variables, * and on 64-bit machines pointers are bigger than words. */ //print("make %p\n", t); z = (ulong)t; y = z; z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */ x = z>>16; makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x); // 協程入口函數為taskstart,y,x兩個參數會被壓到t->context.uc.uc_stack棧底 return t; }
然后調用taskready將這個協程放入可運行隊列中:
void taskready(Task *t) { t->ready = 1; // addtask(&taskrunqueue, t); //將協程放入到可運行隊列中,后續調度器就可以從taskrunqueue中拿到它了。taskrunqueue就是一個全局變量,libtask只支持單線程從這里也可以看出來 }
現在可以看看調度器:
static void taskscheduler(void) { int i; Task *t; taskdebug("scheduler enter"); for(;;){ //無限循環 if(taskcount == 0) exit(taskexitval); t = taskrunqueue.head; //從可運行隊列頭部取出下一個運行的協程 if(t == nil){ fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount); exit(1); } deltask(&taskrunqueue, t); //從可運行隊列中將它刪除 t->ready = 0; taskrunning = t; //將t設置為當前正在運行的協程,taskrunning是一個全局變量 tasknswitch++; //統計值,協程一共執行了多少次 taskdebug("run %d (%s)", t->id, t->name); contextswitch(&taskschedcontext, &t->context); // 通過swapcontext切換到目標協程,並且將調度器上下文保存在全局變量taskschedcontext中 //print("back in scheduler\n"); taskrunning = nil; if(t->exiting){ if(!t->system) taskcount--; i = t->alltaskslot; alltask[i] = alltask[--nalltask]; alltask[i]->alltaskslot = i; free(t); } } }
協程主動放棄CPU調用taskyield:
int taskyield(void) { int n; n = tasknswitch; taskready(taskrunning); // 將自己設置為ready重新放回可運行隊列 taskstate("yield"); taskswitch(); //將控制權還給調度器 return tasknswitch - n - 1; }
看看taskswitch:
void taskswitch(void) { needstack(0); // 檢查當前協程是否堆棧溢出,如果溢出,程序退出 contextswitch(&taskrunning->context, &taskschedcontext); // 切換到 taskschedcontext 上下文,從上面調度器循環可以看出,它就是調度器上下文 }
看看如何檢查協程堆棧溢出:
void needstack(int n) { Task *t; t = taskrunning; // t是個棧變量,當前協程是taskrunning if((char*)&t <= (char*)t->stk // t是taskrunning, stk是taskrunning這個協程的棧頂,棧的增長方向是從高到低,stk是低地址,顯然,t這個局部變量的地址小於stk時,棧溢出 || (char*)&t - (char*)t->stk < 256+n){ // 如果離stk的地址小於256+n,則同樣說明溢出,為什么這里需要預留256+n,不太清楚。 fprint(2, "task stack overflow: &t=%p tstk=%p n=%d\n", &t, t->stk, 256+n); abort(); } }
最后看看contextswitch:
static void contextswitch(Context *from, Context *to) { if(swapcontext(&from->uc, &to->uc) < 0){ //調用swapcontext切換到to->uc協程 fprint(2, "swapcontext failed: %r\n"); assert(0); } }
taskswitch之后控制權回到調度器,調度器就繼續從可運行隊列中取出下一個協程運行了。
下面看看makecontext:
void makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...) { int *sp; sp = (int*)ucp->uc_stack.ss_sp+ucp->uc_stack.ss_size/4; // 將sp移動到分配的棧空間的最高地址 sp -= argc; // 往棧低地址方向留出argc個sizeof(int)空間用於后續壓argc個int參數進棧 sp = (void*)((uintptr_t)sp - (uintptr_t)sp%16); /* 16-align for OS X */ memmove(sp, &argc+1, argc*sizeof(int)); //將argc后面的int參數進棧 *--sp = 0; /* return address */ // 函數返回后執行的下一條指令,這個返回值沒用,因為協程是由外部調度器調度的。 ucp->uc_mcontext.mc_eip = (long)func; //設置IP ucp->uc_mcontext.mc_esp = (int)sp; //設置當前棧頂,告訴func從哪里分配棧變量 }
由於函數調用返回,壓棧順序,棧幀的變化參看:http://www.cnblogs.com/foxmailed/archive/2013/01/29/2881402.html
以上就是協程相關的全部流程。
后續分析同步IO操作的封裝。