c coroutine


今天看了下雲風寫的關於 c coroutine博客 (代碼), 發現 coroutine 實現原理其實還比較簡單,就用戶態棧切換,只需要幾十行匯編,特別輕量級。

具體實現

 1. 創建一個coroutine: 也就是創建一塊連續內存,用於存放棧空間,並設置好入口函數所需要的寄存器

  makecontext glibc c語言實現

 2. resume coroutine:  push保存當前執行上下文的寄存器到棧上,修改%rsp寄存器, jmp 到指定coroutine 執行指令位置,pop 恢復寄存器,開始執行

   swapcontext glibc 匯編實現

 3. yield coroutine: 同resume

   棧切換涉及寄存器操作,得用匯編實現, x86 8個通用寄存器,x64 16個,通過push 保存到棧,pop 恢復到寄存器;比較重要寄存器%rsp 棧頂指針,%rip 指令指針不能直接操作,通過call、jmp 跳轉新的Code執行位置。 

  在64匯編中,並不需要對16個寄存器都備份,其中%rax作為返回值、%r10 %r11 被調用方使用前會自己備份.

 參考: X86-64寄存器和棧幀

X86-64寄存器的變化,不僅體現在位數上,更加體現在寄存器數量上。新增加寄存器%r8到%r15。加上x86的原有8個,一共16個寄存器。
剛剛說到,寄存器集成在CPU上,存取速度比存儲器快好幾個數量級,寄存器多了,GCC就可以更多的使用寄存器,替換之前的存儲器堆棧使用,從而大大提升性能。
讓寄存器為己所用,就得了解它們的用途,這些用途都涉及函數調用,X86-64有16個64位寄存器,分別是:%rax,%rbx,%rcx,%rdx,%esi,%edi,%rbp,%rsp,%r8,%r9,%r10,%r11,%r12,%r13,%r14,%r15。其中:

  1. %rax 作為函數返回值使用。
  2. %rsp 棧指針寄存器,指向棧頂
  3. %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函數參數,依次對應第1參數,第2參數。。。
  4. %rbx,%rbp,%r12,%r13,%14,%15 用作數據存儲,遵循被調用者使用規則,簡單說就是隨便用,調用子函數之前要備份它,以防他被修改
  5. %r10,%r11 用作數據存儲,遵循調用者使用規則,簡單說就是使用之前要先保存原值

 

 cloudwu/coroutine 測試

 測試環境:R620 E5-2620 2.4G

 測試次數:1kw 次yeild操作

 結果: 

time ./main

real 0m7.886s
user 0m4.408s
sys 0m3.447s

分析:

- 單核心每秒1.27M/s yield,每次耗時約 2000 cpu周期
- 因sys占用近一半時間,strace統計 每次yield至少兩次 rt_sigprocmask 系統調用,glibc 還考慮到了sig 設置的切換,其實必要性不大
- 切換時棧需要memcpy,棧因為需要預先分配,一般都在1M左右,但實際使用很少超過10K,如果為每個coroutine 預先分配1M,內存消耗過大。
  雲風實現里面,只分配一個1M的棧,coroutine 切換時才將實際大小的棧memcpy出來。節省內存,但性能消耗也不可忽視
- 如果去掉syscall,性能會有很大提升
 

微信libco協程庫

在infoq一個關於微信后端存儲視頻提到:  https://github.com/starjiang/libco
相比:
- 沒有使用glibc,只支持linux,但總體和glibc 實現類似,優化掉了 rt_sigprocmask
- 為每個coroutine 預分配128K的棧
- 包含 epoll 網絡庫實現
- 單線程版本
- 超時控制
 
就看看socket 的read 實現:
ssize_t read( int fd, void *buf, size_t nbyte )
{
    ....... 
    int timeout = ( lp->read_timeout.tv_sec * 1000 )
                + ( lp->read_timeout.tv_usec / 1000 );

    struct pollfd pf = { 0 };
    pf.fd = fd;
    pf.events = ( POLLIN | POLLERR | POLLHUP );

    int pollret = poll( &pf,1,timeout );  // 此處上下文將yeild, 切換到到epoll_wait 直到fd可讀,當前協程才會被重新resume

    ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte ); // read系統調用

    if( readret < 0 )
    {
        co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
    }

    return readret;

}

int poll(struct pollfd fds[], nfds_t nfds, int timeout)
{
   ......
    return co_poll( co_get_epoll_ct(),fds,nfds,timeout );
}


int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout )
{
....
    for(nfds_t i=0;i<nfds;i++)
    {
        arg.pPollItems[i].pSelf = fds + i;
        arg.pPollItems[i].pPoll = &arg;

        arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
        struct epoll_event &ev = arg.pPollItems[i].stEvent;

        if( fds[i].fd > -1 )
        {
            ev.data.ptr = arg.pPollItems + i;
            ev.events = PollEvent2Epoll( fds[i].events );

            epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );  // 添加epoll監聽事件
        }
        //if fail,the timeout would work

    }

    co_yield_env( co_get_curr_thread_env() ); // yiled 協程,將被切換到epoll_wait 
....
}


void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
    epoll_event *result = (epoll_event*)calloc(1, sizeof(epoll_event) * stCoEpoll_t::_EPOLL_SIZE );
     
    for(;;)
    {
        int ret = epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
        // 超時時間1ms,而不是一直等待,方便做send timeout 處理
        ....  
        // resume 收到數據fd所在coroutine
}

 

總體上,讓accept、read、write 等操作網絡IO操作,用同步方式來寫但實際以NIO方式執行,不阻塞線程,減少代碼量同時邏輯更清晰。

 
 

其他語言

Java: 以前公司rpc有通過JavaFlow實現,但沒有正式用,貌似有性能和其他一些問題;虛機語言線程上下文較為復雜,不像c那么簡單切換棧。

C#: IEnumerator不怎么完善版本后,4.5 使用語法糖 await 編譯器技巧實現類似效果。

Erlang: 原生進程模型就是coroutine,相比上面實現就是玩具;多線程、跨線程任務遷移、私有堆和棧、線程相關內存分配器、消息箱、公平調度等。Erlang 進程棧因為解釋執行,棧空間不是由CPU自動管理,不需要連續的,可以動態擴展,沒上限可遞歸到OOM。

Golang:goroutine Erlang的低配版,夠用也實用;可惜沒有分布式支持,但關鍵golang執行性能比Erlang高至少一個數量級。

 
圖片來自coursera 北京大學的 計算機組成課程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM