協程
什么是協程
wikipedia 的定義:
協程是一個無優先級的子程序調度組件,允許子程序在特點的地方掛起恢復。
線程包含於進程,協程包含於線程。只要內存足夠,一個線程中可以有任意多個協程,但某一時刻只能有一個協程在運行,多個協程分享該線程分配到的計算機資源。
為什么需要協程
簡單引入
就實際使用理解來講,協程允許我們寫同步代碼的邏輯,卻做着異步的事,避免了回調嵌套,使得代碼邏輯清晰。code like this:
co(function*(next){
let [err,data]=yield fs.readFile("./test.txt",next);//異步讀文件
[err]=yield fs.appendFile("./test2.txt",data,next);//異步寫文件
//....
})()
異步 指令執行之后,結果並不立即顯現的操作稱為異步操作。及其指令執行完成並不代表操作完成。
協程是追求極限性能和優美的代碼結構的產物。
一點歷史
起初人們喜歡同步編程,然后發現有一堆線程因為I/O卡在那里,並發上不去,資源嚴重浪費。
然后出了異步(select,epoll,kqueue,etc),將I/O操作交給內核線程,自己注冊一個回調函數處理最終結果。
然而項目大了之后代碼結構變得不清晰,下面是個小例子。
async_func1("hello world",func(){
async_func2("what's up?",func(){
async_func2("oh ,friend!",func(){
//todo something
})
})
})
於是發明了協程,寫同步的代碼,享受着異步帶來的性能優勢。
程序運行是需要的資源:
- cpu
- 內存
- I/O (文件、網絡,磁盤(內存訪問不在一個層級,忽略不計))
協程的實現原理(c++和node.js里面的實現)
libco 一個C++協程庫實現
libco 是騰訊開源的一個C++協程庫,作為微信后台的基礎庫,經受住了實際的檢驗。項目地址:https://github.com/Tencent/libco
個人源碼閱讀項目:https://github.com/yyrdl/libco-code-study (未完結)
libco源代碼文件一共11個,其中一個是匯編代碼,其余是C++,閱讀起來相對較容易。
在C++里面實現協程要解決的問題有如下幾個:
- 何時掛起協程?何時喚醒協程?
- 如何掛起、喚醒協程,如何保護協程運行時的上下文?
- 如何封裝異步操作?
前期知識准備
- 現代操作系統是分時操作系統,資源分配的基本單位是進程,CPU調度的基本單位是線程。
- C++程序運行時會有一個運行時棧,一次函數調用就會在棧上生成一個record
- 運行時內存空間分為全局變量區(存放函數,全局變量),棧區,堆區。棧區內存分配從高地址往低地址分配,堆區從低地址往高地址分配。
- 下一條指令地址存在於指令寄存器IP,ESP寄存值指向當前棧頂地址,EBP指向當前活動棧幀的基地址。
- 發生函數調用時操作為:將參數從右往左一次壓棧,將返回地址壓棧,將當前EBP寄存器的值壓棧,在棧區分配當前函數局部變量所需的空間,表現為修改ESP寄存器的值。
- 協程的上下文包含屬於他的棧區和寄存器里面存放的值。
何時掛起,喚醒協程?
如開始介紹時所說,協程是為了使用異步的優勢,異步操作是為了避免IO操作阻塞線程。那么協程掛起的時刻應該是當前協程發起異步操作的時候,而喚醒應該在其他協程退出,並且他的異步操作完成時。
如何掛起、喚醒協程,如何保護協程運行時的上下文?
協程發起異步操作的時刻是該掛起協程的時刻,為了保證喚醒時能正常運行,需要正確保存並恢復其運行時的上下文。
所以這里的操作步驟為:
- 保存當前協程的上下文(運行棧,返回地址,寄存器狀態)
- 設置將要喚醒的協程的入口指令地址到IP寄存器
- 恢復將要喚醒的協程的上下文
這部分操作相應的源代碼:
.globl coctx_swap//定義該部分匯編代碼對外暴露的函數命
#if !defined( __APPLE__ )
.type coctx_swap, @function
#endif
coctx_swap:
#if defined(__i386__)
leal 4(%esp), %eax //sp R[eax]=R[esp]+4 R[eax]的值應該為coctx_swap的第一個參數在棧中的地址
movl 4(%esp), %esp // R[esp]=Mem[R[esp]+4] 將esp指向 &(curr->ctx) 當前routine 上下文的內存地址,ctx在堆區,現在esp應指向reg[0]
leal 32(%esp), %esp //parm a : ®s[7] + sizeof(void*) push 操作是以esp的值為基准,push一個值,則esp的值減一個單位(因為是按棧區的操作邏輯,從高位往低位分配地址),但ctx是在堆區,所以應將esp指向reg[7],然后從eax到-4(%eax)push
//保存寄存器值到棧中,實際對應coctx_t->regs 數組在棧中的位置(參見coctx.h 中coctx_t的定義)
pushl %eax //esp ->parm a
pushl %ebp
pushl %esi
pushl %edi
pushl %edx
pushl %ecx
pushl %ebx
pushl -4(%eax) //將函數返回地址壓棧,即coctx_swap 之后的指令地址,保存返回地址,保存到coctx_t->regs[0]
//恢復運行目標routine時的環境(各個寄存器的值和棧狀態)
movl 4(%eax), %esp //parm b -> ®s[0] //切換esp到目標 routine ctx在棧中的起始地址,這個地址正好對應regs[0],pop一次 esp會加一個單位的值
popl %eax //ret func addr regs[0] 暫存返回地址到 EAX
//恢復當時的寄存器狀態
popl %ebx // regs[1]
popl %ecx // regs[2]
popl %edx // regs[3]
popl %edi // regs[4]
popl %esi // regs[5]
popl %ebp // regs[6]
popl %esp // regs[7]
//將返回地址壓棧
pushl %eax //set ret func addr
//將 eax清零
xorl %eax, %eax
//返回,這里返回之后就切換到目標routine了,C++代碼中調用coctx_swap的地方之后的代碼將得不到立即執行
ret
#elif
這部分代碼只是做了寄存器部分的操作。依賴的結構體定義,見文件coctx.h中:
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ];//32位機,依次為:ret,ebx,ecx,edx,edi,esi,ebp,eax
#else
void *regs[ 14 ];//64位機的情況
#endif
size_t ss_size;//空間大小
char *ss_sp;//ESP
};
調用coctx_swap 函數只在文件co_routine.cpp中的co_swap函數。
保存運行棧的操作見co_swap函數中調用coctx_swap之前的部分。具體步驟為取當前棧頂地址 (代碼:char c; esp=&c
),若不是共享棧模型則清理下env,若不是則判斷共享棧區有沒有被占用,被占用則從堆區申請內存保存,然后再分配共享棧。
需要注意的是,libco運行時的棧區不在是傳統意義上的棧區,其空間實際來自於堆區。
如何封裝異步操作?
這部分代碼見:
- co_hook_sys_call.cpp
- co_routine.cpp
- co_epoll.cpp
- co_epoll.h
核心思想是hook系統本來的I/O接口,比如socket()
函數,和epoll(kqueue)結合,采用一個co_eventloop
來統一管理,當發現一個協程發起異步操作時,就將其掛起放入等待隊列,喚醒其他異步操作已經完成的協程。可以聯系libevent里面的event_loop,區別在在於一個是操作棧區和寄存器恢復協程,一個是調用綁定的回調函數。
node.js里面協程
node.js 的優勢:
- node.js天生異步(下面是libuv)
- javascript的閉包特性完成了上下文的保存工作
需要我們做的:
- 實現同步編程
附上 文章開始時的代碼:
const fs=require("fs");
const co=require("zco");
co(function*(next){
let [err,data]=yield fs.readFile("./test.txt",next);//異步讀文件
[err]=yield fs.appendFile("./test2.txt",data,next);//異步寫文件
//....
})()
JS 中的Generator
Generator是一個迭代器生成器,也是js中實現協程的關鍵。
let gen=function *() {
console.log("ok1");
var a=yield 1;
console.log("a:"+a);
var b=yield 2;
console.log("b:"+b);
}
var iterator=gen();
console.log("ok2");
console.log(iterator.next(100));
console.log(iterator.next(101));
console.log(iterator.next(102));
輸出:
ok2
ok1
{ value: 1, done: false }
a:101
{ value: 2, done: false }
b:102
{ value: undefined, done: true }
從這里我們可以看到其執行順序,以及各個值的變化。iterator.next() 返回的值即yield 之后的表達式的返回值,yield之前的變量的值即iterator.next方法傳入的值。通過這個特性,合理包裝即可實現coroutine.
以下是zco
模塊源碼,項目地址:https://github.com/yyrdl/zco:
/**
* Created by yyrdl on 2017/3/14.
*/
var slice = Array.prototype.slice;
var co = function (gen) {
var iterator,
callback = null,
hasReturn = false;
var _end = function (e, v) {
callback && callback(e, v); //I shoudn't catch the error throwed by user's callback
if(callback==null&&e){//the error should be throwed if no handler instead of catching silently
throw e;
}
}
var run=function(arg){
try {
var v = iterator.next(arg);
hasReturn = true;
v.done && _end(undefined, v.value);
} catch (e) {
_end(e);
}
}
var nextSlave = function (arg) {
hasReturn = false;
run(arg);
}
var next = function () {
var arg = slice.call(arguments);
if (!hasReturn) {//support fake async operation,avoid error: "Generator is already running"
setTimeout(nextSlave, 0, arg);
} else {
nextSlave(arg);
}
}
if ("[object GeneratorFunction]" === Object.prototype.toString.call(gen)) {//todo: support other Generator implements
iterator = gen(next);
} else {
throw new TypeError("the arg of co must be generator function")
}
var future = function (cb) {
if ("function" == typeof cb) {
callback = cb;
}
run();
}
return future;
}
module.exports = co;