協程概念,原理及實現(c++和node.js實現)


協程

什么是協程

wikipedia 的定義:
協程是一個無優先級的子程序調度組件,允許子程序在特點的地方掛起恢復。

線程包含於進程,協程包含於線程。只要內存足夠,一個線程中可以有任意多個協程,但某一時刻只能有一個協程在運行,多個協程分享該線程分配到的計算機資源。

為什么需要協程

簡單引入

就實際使用理解來講,協程允許我們寫同步代碼的邏輯,卻做着異步的事,避免了回調嵌套,使得代碼邏輯清晰。code like this:

   co(function*(next){
     let [err,data]=yield fs.readFile("./test.txt",next);//異步讀文件
     [err]=yield fs.appendFile("./test2.txt",data,next);//異步寫文件
     //....
   })()

異步 指令執行之后,結果並不立即顯現的操作稱為異步操作。及其指令執行完成並不代表操作完成。

協程是追求極限性能和優美的代碼結構的產物。

一點歷史

起初人們喜歡同步編程,然后發現有一堆線程因為I/O卡在那里,並發上不去,資源嚴重浪費。

然后出了異步(select,epoll,kqueue,etc),將I/O操作交給內核線程,自己注冊一個回調函數處理最終結果。

然而項目大了之后代碼結構變得不清晰,下面是個小例子。

  async_func1("hello world",func(){
     async_func2("what's up?",func(){
       async_func2("oh ,friend!",func(){ 
         //todo something
       })
     })
  })

於是發明了協程,寫同步的代碼,享受着異步帶來的性能優勢。

程序運行是需要的資源

  • cpu
  • 內存
  • I/O (文件、網絡,磁盤(內存訪問不在一個層級,忽略不計))

協程的實現原理(c++和node.js里面的實現)

libco 一個C++協程庫實現

libco 是騰訊開源的一個C++協程庫,作為微信后台的基礎庫,經受住了實際的檢驗。項目地址:https://github.com/Tencent/libco

個人源碼閱讀項目:https://github.com/yyrdl/libco-code-study (未完結)

libco源代碼文件一共11個,其中一個是匯編代碼,其余是C++,閱讀起來相對較容易。

在C++里面實現協程要解決的問題有如下幾個:

  • 何時掛起協程?何時喚醒協程?
  • 如何掛起、喚醒協程,如何保護協程運行時的上下文?
  • 如何封裝異步操作?

前期知識准備

  1. 現代操作系統是分時操作系統,資源分配的基本單位是進程,CPU調度的基本單位是線程。
  2. C++程序運行時會有一個運行時棧,一次函數調用就會在棧上生成一個record
  3. 運行時內存空間分為全局變量區(存放函數,全局變量),棧區,堆區。棧區內存分配從高地址往低地址分配,堆區從低地址往高地址分配。
  4. 下一條指令地址存在於指令寄存器IP,ESP寄存值指向當前棧頂地址,EBP指向當前活動棧幀的基地址。
  5. 發生函數調用時操作為:將參數從右往左一次壓棧,將返回地址壓棧,將當前EBP寄存器的值壓棧,在棧區分配當前函數局部變量所需的空間,表現為修改ESP寄存器的值。
  6. 協程的上下文包含屬於他的棧區和寄存器里面存放的值。

何時掛起,喚醒協程?

如開始介紹時所說,協程是為了使用異步的優勢,異步操作是為了避免IO操作阻塞線程。那么協程掛起的時刻應該是當前協程發起異步操作的時候,而喚醒應該在其他協程退出,並且他的異步操作完成時。

如何掛起、喚醒協程,如何保護協程運行時的上下文?

協程發起異步操作的時刻是該掛起協程的時刻,為了保證喚醒時能正常運行,需要正確保存並恢復其運行時的上下文。

所以這里的操作步驟為:

  • 保存當前協程的上下文(運行棧,返回地址,寄存器狀態)
  • 設置將要喚醒的協程的入口指令地址到IP寄存器
  • 恢復將要喚醒的協程的上下文

這部分操作相應的源代碼:

.globl coctx_swap//定義該部分匯編代碼對外暴露的函數命
#if !defined( __APPLE__ )
.type  coctx_swap, @function
#endif
coctx_swap:

#if defined(__i386__)
	leal 4(%esp), %eax //sp   R[eax]=R[esp]+4 R[eax]的值應該為coctx_swap的第一個參數在棧中的地址
	movl 4(%esp), %esp  //    R[esp]=Mem[R[esp]+4] 將esp指向 &(curr->ctx) 當前routine 上下文的內存地址,ctx在堆區,現在esp應指向reg[0]
	leal 32(%esp), %esp //parm a : &regs[7] + sizeof(void*)   push 操作是以esp的值為基准,push一個值,則esp的值減一個單位(因為是按棧區的操作邏輯,從高位往低位分配地址),但ctx是在堆區,所以應將esp指向reg[7],然后從eax到-4(%eax)push
    //保存寄存器值到棧中,實際對應coctx_t->regs 數組在棧中的位置(參見coctx.h 中coctx_t的定義)
	pushl %eax //esp ->parm a

	pushl %ebp
	pushl %esi
	pushl %edi
	pushl %edx
	pushl %ecx
	pushl %ebx
	pushl -4(%eax) //將函數返回地址壓棧,即coctx_swap 之后的指令地址,保存返回地址,保存到coctx_t->regs[0]

    //恢復運行目標routine時的環境(各個寄存器的值和棧狀態)
	movl 4(%eax), %esp //parm b -> &regs[0] //切換esp到目標 routine  ctx在棧中的起始地址,這個地址正好對應regs[0],pop一次 esp會加一個單位的值

	popl %eax  //ret func addr regs[0] 暫存返回地址到 EAX
	//恢復當時的寄存器狀態
	popl %ebx  // regs[1]
	popl %ecx  // regs[2]
	popl %edx  // regs[3]
	popl %edi  // regs[4]
	popl %esi  // regs[5]
	popl %ebp  // regs[6]
	popl %esp  // regs[7]
	//將返回地址壓棧
	pushl %eax //set ret func addr
    //將 eax清零
	xorl %eax, %eax
	//返回,這里返回之后就切換到目標routine了,C++代碼中調用coctx_swap的地方之后的代碼將得不到立即執行
	ret

#elif

這部分代碼只是做了寄存器部分的操作。依賴的結構體定義,見文件coctx.h中:

struct coctx_t
{
#if defined(__i386__)
	void *regs[ 8 ];//32位機,依次為:ret,ebx,ecx,edx,edi,esi,ebp,eax
#else
	void *regs[ 14 ];//64位機的情況
#endif
	size_t ss_size;//空間大小
	char *ss_sp;//ESP
	
};

調用coctx_swap 函數只在文件co_routine.cpp中的co_swap函數。

保存運行棧的操作見co_swap函數中調用coctx_swap之前的部分。具體步驟為取當前棧頂地址 (代碼:char c; esp=&c),若不是共享棧模型則清理下env,若不是則判斷共享棧區有沒有被占用,被占用則從堆區申請內存保存,然后再分配共享棧。

需要注意的是,libco運行時的棧區不在是傳統意義上的棧區,其空間實際來自於堆區。

如何封裝異步操作?

這部分代碼見:

  • co_hook_sys_call.cpp
  • co_routine.cpp
  • co_epoll.cpp
  • co_epoll.h

核心思想是hook系統本來的I/O接口,比如socket()函數,和epoll(kqueue)結合,采用一個co_eventloop來統一管理,當發現一個協程發起異步操作時,就將其掛起放入等待隊列,喚醒其他異步操作已經完成的協程。可以聯系libevent里面的event_loop,區別在在於一個是操作棧區和寄存器恢復協程,一個是調用綁定的回調函數。

node.js里面協程

node.js 的優勢:

  • node.js天生異步(下面是libuv)
  • javascript的閉包特性完成了上下文的保存工作

需要我們做的:

  • 實現同步編程

附上 文章開始時的代碼:

   const fs=require("fs");
   const co=require("zco");
   
   co(function*(next){
     let [err,data]=yield fs.readFile("./test.txt",next);//異步讀文件
     [err]=yield fs.appendFile("./test2.txt",data,next);//異步寫文件
     //....
   })()

JS 中的Generator

Generator是一個迭代器生成器,也是js中實現協程的關鍵。


let gen=function *() {
    console.log("ok1");
    var a=yield 1;
    console.log("a:"+a);
    var b=yield 2;
    console.log("b:"+b);
}


var iterator=gen();
console.log("ok2");

console.log(iterator.next(100));
console.log(iterator.next(101));
console.log(iterator.next(102));

輸出:

ok2
ok1
{ value: 1, done: false }
a:101
{ value: 2, done: false }
b:102
{ value: undefined, done: true }

從這里我們可以看到其執行順序,以及各個值的變化。iterator.next() 返回的值即yield 之后的表達式的返回值,yield之前的變量的值即iterator.next方法傳入的值。通過這個特性,合理包裝即可實現coroutine.

以下是zco模塊源碼,項目地址:https://github.com/yyrdl/zco:

/**
 * Created by yyrdl on 2017/3/14.
 */
var slice = Array.prototype.slice;

var co = function (gen) {

	var iterator,
	callback = null,
	hasReturn = false;

	var _end = function (e, v) {
		callback && callback(e, v); //I shoudn't catch the error throwed by user's callback
		if(callback==null&&e){//the error should be throwed if no handler instead of  catching silently
			throw e;
		}
	}
	var run=function(arg){
		try {
			var v = iterator.next(arg);
			hasReturn = true;
			v.done && _end(undefined, v.value);
		} catch (e) {
			_end(e);
		}
	}
	var nextSlave = function (arg) {
		hasReturn = false;
		run(arg);
	}
	
	var next = function () {
		var arg = slice.call(arguments);
		if (!hasReturn) {//support fake async operation,avoid error: "Generator is already running"
			setTimeout(nextSlave, 0, arg);
		} else {
			nextSlave(arg);
		}
	}
	
	if ("[object GeneratorFunction]" === Object.prototype.toString.call(gen)) {//todo: support other Generator implements 
		iterator = gen(next);
	} else {
		throw new TypeError("the arg of co must be generator function")
	}

	var future = function (cb) {
		if ("function" == typeof cb) {
			callback = cb;
		}
		run();
	}

	return future;
}

module.exports = co;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM