Coroutine及其實現


 

    線程是內核對外提供的服務,應用程序可以通過系統調用讓內核啟動線程,由內核來負責線程調度和切換。線程在等待IO操作時線程變為unrunnable狀態會觸發上下文切換。現代操作系統一般都采用搶占式調度,上下文切換一般發生在時鍾中斷和系統調用返回前,調度器計算當前線程的時間片,如果需要切換就從運行隊列中選出一個目標線程,保存當前線程的環境,並且恢復目標線程的運行環境,最典型的就是切換ESP指向目標線程內核堆棧,將EIP指向目標線程上次被調度出時的指令地址。

    協程也叫用戶態線程,協程之間的切換發生在用戶態。在用戶態沒有時鍾中斷,系統調用等機制,那么協程切換由什么觸發?調度器將控制權交給某個協程后,控制權什么時候回到調度器,從而調度另外一個協程運行? 實際上,這需要協程主動放棄CPU,控制權回到調度器,從而調度另外一個協程運行。所謂協作式線程(cooperative),需要協程之間互相協作,不需要使用CPU時將CPU主動讓出。

    協程切換和內核線程的上下文切換相同,也需要有機制來保存當前上下文,恢復目標上下文。在POSIX系統上,getcontext/makecontext/swapcontext等可以用來做這件事。

    協程帶來的最大的好處就是可以用同步的方式來寫異步的程序。比如協程A,B:A是工作協程,B是網絡IO協程(這種模型下,實際工作協程會比網絡IO協程多),A發送一個包時只需要將包push到A和B之間的一個channel,然后就可以主動放棄CPU,讓出CPU給其它協程運行,B從channel中pop出將要發送的包,接收到包響應后,將結果放到A能拿到的地方,然后將A標識為ready狀態,放入可運行隊列等待調度,A下次被調度器調度就可以拿到結果繼續做后面的事情。如果是基於線程的模型,A和B都是線程,通常基於回調的方式,1. A阻塞在某個隊列Q上,B接受到響應包回調A傳給B的回調函數f,回調函數f將響應包push到Q中,A可以取到響應包繼續干活,如果阻塞基於cond等機制,則會被OS調度出去,如果忙等,則耗費CPU。2. A可以不阻塞在Q上,而是繼續做別的事情,可以定期過來取結果。 這種情況下,線程模型業務邏輯其實被打亂了,發包和取包響應的過程被隔離開了。

   實現協程庫的基本思路很簡單,每個線程一個調度器,就是一個循環,不斷的從可運行隊列中取出協程,並且利用swapcontext恢復協程的上下文從而繼續執行協程。當一個協程放棄CPU時,通過swapcontext恢復調度器上下文從而將控制權歸還給調度器,調度器從可運行隊列選擇下一個協程。每個協程初始化通過getcontext和makecontext,需要的棧空間從堆上分配即可。

   以下分析一個簡單的協程庫libtask,由golang team成員之一的Russ cox在加入golang team之前開發。只支持單線程,簡單包裝了一下read/write等同步IO。

   在libtask中,一個協程用一個struct Task來表示:

struct Task        
{ 
  char  name[256];  // offset known to acid
  char  state[256];
  Task  *next; //通過這兩個指針將task串起來
  Task  *prev;
  Task  *allnext;
  Task  *allprev;
  Context context;// 當前協程上下文
  uvlong  alarmtime;
  uint  id;
  uchar *stk; // 當前協程可以使用的堆棧,初始化為棧頂地址
  uint  stksize;// 當前協程可以使用的堆棧大小
  int exiting;
  int alltaskslot;
  int system;
  int ready;
  void  (*startfn)(void*);//當前協程的執行入口函數
  void  *startarg;//參數
  void  *udata;
};

 下面看看新增一個協程的過程:

static Task*
taskalloc(void (*fn)(void*), void *arg, uint stack)
{                                                                                                                                                                    
  Task *t;
  sigset_t zero;
  uint x, y;
  ulong z;

  /* allocate the task and stack together */
  t = malloc(sizeof *t+stack);     //在堆上為這個協程分配結構體和協程所使用的堆棧
  if(t == nil){
    fprint(2, "taskalloc malloc: %r\n");
    abort();
  }
  memset(t, 0, sizeof *t);
  t->stk = (uchar*)(t+1);
  t->stksize = stack;
  t->id = ++taskidgen;
  t->startfn = fn;                // 協程入口函數
  t->startarg = arg;              // 協程入口函數參數

  /* do a reasonable initialization */
  memset(&t->context.uc, 0, sizeof t->context.uc);
  sigemptyset(&zero);
  sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);

  /* must initialize with current context */
  if(getcontext(&t->context.uc) < 0){              // 初始化當前協程上下文
    fprint(2, "getcontext: %r\n");
    abort();
  }

  /* call makecontext to do the real work. */
  /* leave a few words open on both ends */
  t->context.uc.uc_stack.ss_sp = t->stk+8;          //ss_sp成員為棧頂地址,后續makecontext會將ss_sp往高地址移動ss_size個字節,從這里開始壓棧
  t->context.uc.uc_stack.ss_size = t->stksize-64;   //ss_size成員為棧大小
#if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE)   /* sigh */
#warning "doing sun thing"
  /* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */
  t->context.uc.uc_stack.ss_sp = 
    (char*)t->context.uc.uc_stack.ss_sp
    +t->context.uc.uc_stack.ss_size;
#endif
  /*
   * All this magic is because you have to pass makecontext a
   * function that takes some number of word-sized variables,
   * and on 64-bit machines pointers are bigger than words.
   */
//print("make %p\n", t);
  z = (ulong)t;
  y = z;
  z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */
  x = z>>16;
  makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);       // 協程入口函數為taskstart,y,x兩個參數會被壓到t->context.uc.uc_stack棧底
  return t;
}

 

 然后調用taskready將這個協程放入可運行隊列中:

void
taskready(Task *t)
{
  t->ready = 1; //
  addtask(&taskrunqueue, t);   //將協程放入到可運行隊列中,后續調度器就可以從taskrunqueue中拿到它了。taskrunqueue就是一個全局變量,libtask只支持單線程從這里也可以看出來
}

現在可以看看調度器:

static void
taskscheduler(void)
{
  int i;
  Task *t;

  taskdebug("scheduler enter");
  for(;;){                          //無限循環
    if(taskcount == 0)
      exit(taskexitval);
    t = taskrunqueue.head;          //從可運行隊列頭部取出下一個運行的協程
    if(t == nil){
      fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);
      exit(1);
    }
    deltask(&taskrunqueue, t);      //從可運行隊列中將它刪除
    t->ready = 0;
    taskrunning = t;                //將t設置為當前正在運行的協程,taskrunning是一個全局變量
    tasknswitch++;                  //統計值,協程一共執行了多少次
    taskdebug("run %d (%s)", t->id, t->name);
    contextswitch(&taskschedcontext, &t->context);    // 通過swapcontext切換到目標協程,並且將調度器上下文保存在全局變量taskschedcontext中
//print("back in scheduler\n"); taskrunning = nil; if(t->exiting){ if(!t->system) taskcount--; i = t->alltaskslot; alltask[i] = alltask[--nalltask]; alltask[i]->alltaskslot = i; free(t); } } }

協程主動放棄CPU調用taskyield:

int
taskyield(void)         
{
  int n;
  n = tasknswitch;
  taskready(taskrunning); // 將自己設置為ready重新放回可運行隊列
  taskstate("yield");
  taskswitch();           //將控制權還給調度器
  return tasknswitch - n - 1;
}

看看taskswitch:

void
taskswitch(void)
{
  needstack(0);     // 檢查當前協程是否堆棧溢出,如果溢出,程序退出
  contextswitch(&taskrunning->context, &taskschedcontext);     // 切換到 taskschedcontext 上下文,從上面調度器循環可以看出,它就是調度器上下文
}

看看如何檢查協程堆棧溢出:

void
needstack(int n)
{
    Task *t;           
    t = taskrunning;                  // t是個棧變量,當前協程是taskrunning
    if((char*)&t <= (char*)t->stk     // t是taskrunning, stk是taskrunning這個協程的棧頂,棧的增長方向是從高到低,stk是低地址,顯然,t這個局部變量的地址小於stk時,棧溢出
    || (char*)&t - (char*)t->stk < 256+n){     // 如果離stk的地址小於256+n,則同樣說明溢出,為什么這里需要預留256+n,不太清楚。
        fprint(2, "task stack overflow: &t=%p tstk=%p n=%d\n", &t, t->stk, 256+n);
        abort();
    }
}

最后看看contextswitch:

static void
contextswitch(Context *from, Context *to)
{
  if(swapcontext(&from->uc, &to->uc) < 0){   //調用swapcontext切換到to->uc協程
    fprint(2, "swapcontext failed: %r\n");
    assert(0);
  }
}

taskswitch之后控制權回到調度器,調度器就繼續從可運行隊列中取出下一個協程運行了。

下面看看makecontext:

void
makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...)
{
    int *sp;

    sp = (int*)ucp->uc_stack.ss_sp+ucp->uc_stack.ss_size/4; // 將sp移動到分配的棧空間的最高地址
    sp -= argc;  // 往棧低地址方向留出argc個sizeof(int)空間用於后續壓argc個int參數進棧 
    sp = (void*)((uintptr_t)sp - (uintptr_t)sp%16);    /* 16-align for OS X */
    memmove(sp, &argc+1, argc*sizeof(int));    //將argc后面的int參數進棧

    *--sp = 0;        /* return address */     // 函數返回后執行的下一條指令,這個返回值沒用,因為協程是由外部調度器調度的。
    ucp->uc_mcontext.mc_eip = (long)func;      //設置IP
    ucp->uc_mcontext.mc_esp = (int)sp;        //設置當前棧頂,告訴func從哪里分配棧變量
}

由於函數調用返回,壓棧順序,棧幀的變化參看:http://www.cnblogs.com/foxmailed/archive/2013/01/29/2881402.html

以上就是協程相關的全部流程。

后續分析同步IO操作的封裝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM