注:為方便理解,本文貼出的代碼部分經過了縮減或展開,與實際skynet代碼可能會有所出入。
作為一個skynet actor,在啟動腳本被加載的過程中,總是要調用skynet.start和skynet.dispatch的,前者在skynet-os中做一些初始化工作,設置消息的Lua回調,后者則注冊針對某協議的解析回調。舉個例子:
1 local skynet = require "skynet" 2 3 local function hello() 4 skynet.ret(skynet.pack("Hello, World!")) 5 print("hello OK") 6 end 7 8 skynet.start(function() 9 skynet.dispatch("lua", function(session, address, cmd, ...) 10 assert(cmd == "hello") 11 hello() 12 end) 13 end)
先是調用skynet.start注冊初始化回調,在其中調用skynet.dispatch注冊針對"lua"協議的解析回調。skynet的基本使用這里我們就不多說了,具體見官方文檔。下面我們就從skynet.start(見skynet.lua)開始,逐一分析流程。
1 function skynet.start(start_func) 2 c.callback(skynet.dispatch_message) 3 skynet.timeout(0, function() 4 skynet.init_service(start_func) 5 end) 6 end
這里的c來自於require "skynet.core",它是在lua-skynet.c中注冊的,如下:
1 int luaopen_skynet_core(lua_State *L) { 2 luaL_checkversion(L); 3 4 luaL_Reg l[] = { 5 ... 6 { "callback", _callback }, 7 { NULL, NULL }, 8 }; 9 10 luaL_newlibtable(L, l); 11 12 lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context"); 13 struct skynet_context *ctx = lua_touserdata(L,-1); 14 if (ctx == NULL) { 15 return luaL_error(L, "Init skynet context first"); 16 } 17 18 luaL_setfuncs(L,l,1); 19 20 return 1; 21 }
可以看到,它注冊了幾個函數,並將skynet_context實例作為各函數的upvalue,方便調用時獲取。skynet.start中調用c.callback,對應的就是lua-skynet.c中的_callback函數,skynet.dispatch_message回調就是它的參數:
1 static int _callback(lua_State *L) { 2 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1)); 3 int forward = lua_toboolean(L, 2); 4 luaL_checktype(L,1,LUA_TFUNCTION); 5 lua_settop(L,1); 6 lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); 7 8 lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD); 9 lua_State *gL = lua_tothread(L,-1); 10 11 if (forward) { 12 skynet_callback(context, gL, forward_cb); 13 } else { 14 skynet_callback(context, gL, _cb); 15 } 16 17 return 0; 18 }
可以看到,其以函數_cb為key,LUA回調(skynet.dispatch_message)作為value被注冊到全局注冊表中。skynet_callback(在skynet_server.c中)則設置函數指針_cb為C層面的消息處理函數:
1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) { 2 context->cb = cb; 3 context->cb_ud = ud; 4 }
先不關注skynet-os內部的線程調度細節,只需要知道,skynet-context接收到消息后會轉發給context->cb處理,也就是_cb函數。在_cb中,從全局表中取到關聯的LUA回調,將type, msg, sz, session, source壓棧調用:
1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) { 2 lua_State *L = ud; 3 int trace = 1; 4 int r; 5 int top = lua_gettop(L); 6 if (top == 0) { 7 lua_pushcfunction(L, traceback); 8 lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); 9 } else { 10 assert(top == 2); 11 } 12 lua_pushvalue(L,2); 13 14 lua_pushinteger(L, type); 15 lua_pushlightuserdata(L, (void *)msg); 16 lua_pushinteger(L,sz); 17 lua_pushinteger(L, session); 18 lua_pushinteger(L, source); 19 20 r = lua_pcall(L, 5, 0 , trace); 21 22 if (r == LUA_OK) { 23 return 0; 24 } 25 }
此時調用流程正式轉到skynet.lua中的skynet.dispatch_message:
1 function skynet.dispatch_message(...) 2 local succ, err = pcall(raw_dispatch_message,...) 3 while true do 4 local key,co = next(fork_queue) 5 fork_queue[key] = nil 6 pcall(suspend,co,coroutine_resume(co)) 7 end 8 end
首先是將msg交由raw_dispatch_message作分發,然后開始處理fork_queue中緩存的fork協程:
pcall(suspend, co, coroutine_resume(co))
這行代碼是我們今天關注的重點。在繼續之前,我假設你對lua的協程有一定的了解,了解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua里的線程,它擁有自己的函數棧,但與我們平常接觸的大多數操作系統里的線程不同,是非搶占式的。skynet對lua的coroutine作了封裝(詳見lua-profile.c),主要是增加了starttime和totaltime的監測,最終還是交由lua的coroutine庫來處理的。既然這里分析到了fork_queue,那我們就先以skynet.fork為例,看看它作了什么:
1 function skynet.fork(func,...) 2 local args = table.pack(...) 3 local co = co_create(function() 4 func(table.unpack(args,1,args.n)) 5 end) 6 table.insert(fork_queue, co) 7 return co 8 end
skynet.fork做的事情很簡單,通過co_create創建一個coroutine並將其入隊fork_queue。看看co_create是如何創建協程的:
1 local function co_create(f) 2 local co = table.remove(coroutine_pool) 3 if co == nil then 4 co = coroutine.create(function(...) 5 f(...) 6 while true do 7 f = nil 8 coroutine_pool[#coroutine_pool+1] = co 9 f = coroutine_yield "EXIT" 10 f(coroutine_yield()) 11 end 12 end) 13 else 14 coroutine_resume(co, f) 15 end 16 return co 17 end
調用co_create時,如果coroutine_pool為空,它會創建一個新的co。co在第一次被resume時,會執行f,接着便進入一個使用和回收的無限循環。在這個循環中,先是收回co到coroutine_pool中,接着便yield "EXIT"到上一次的resume點A。當下一次被resume在點B喚醒時,會先將函數f傳遞過來,接着再次yield到點B,等待下一次在點D被resume喚醒時,傳遞需要的參數過來加以執行,完畢后回收,如此反復。這樣看來,co的執行似乎相當簡單。但是實際上要復雜一些,因為在執行f的過程中,可以再反復地yield和resume。下面我們舉個簡單的例子:
1 skynet.fork(function() 2 print ("skynet fork: <1>") 3 skynet.sleep(10) 4 print ("skynet fork: <2>") 5 end)
我們把skynet.sleep展開:
1 skynet.fork(function() 2 print ("skynet fork: <1>") 3 coroutine_yield("SLEEP", COMMAND_TIMEOUT(10)) 4 print ("skynet fork: <2>") 5 end)
下面開始分析調用流程。fork-co入隊,主co在skynet.dispatch_message中分發消息后取出fork-co,調用resume開始進入fork-co的函數f執行,如下圖所示,如果fork-co是第一次執行,是走圈1,如果是復用,則走圈1*(如果是復用的話,調用co_create時,會先coroutine_resume(co,f)一次進入fork-co,將用戶函數傳遞給while循環中的coroutine_yield "EXIT"點之后,接着fork-co再次yield讓出,等待實際傳參的調用)。接着進入用戶函數,COMMAND_TIMEOUT會先向skynet-kernal發送TIMEOUT命令,如圈2所示。然后yield "SLEEP"到主co的resume點1之后繼續執行,如圈3所示,按圈4的指向,調用suspend進入"SLEEP"分支,記錄下TIMEOUT-session與fork-co的映射關系。此時主co回到skynet.dispatch_message中繼續下一個fork-co的處理。當TIMEOUT消息回來時,會由主co再次進入skynet.dispatch_message並調用raw_dispatch_message分發,這時通過session拿到之前映射的fork-co,再次resume,按照圈5的指向,會跳轉到fork-co的yield "SLEEP"點之后繼續向下處理。用戶函數處理完畢后,回到上層調用,即圈6所指,回收fork-co,接着yield "EXIT"到主co所在raw_dispatch_message中的resume點之后,如圈7所示。進入suspend后,無額外命令,raw_dispatch_message處理結束,繼續主co的消息處理流程。
由以上分析可以看到,實際的協程跳轉過程是比較復雜的,也更顯得小小的LUA在skynet中的精巧運用。為方便理解,順便貼出suspend的代碼(只列出了我們關注的幾個命令,並做了刪減):
1 function suspend(co, result, command, param, size) 2 if command == "CALL" then 3 session_id_coroutine[param] = co 4 elseif command == "SLEEP" then 5 session_id_coroutine[param] = co 6 sleep_session[co] = param 7 elseif command == "RETURN" then 8 local co_session = session_coroutine_id[co] 9 local co_address = session_coroutine_address[co] 10 session_response[co] = true 11 c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) 12 return suspend(co, coroutine_resume(co, ret)) 13 elseif command == "EXIT" then 14 -- coroutine exit 15 local address = session_coroutine_address[co] 16 session_coroutine_id[co] = nil 17 session_coroutine_address[co] = nil 18 session_response[co] = nil 19 elseif command == nil then 20 -- debug trace 21 return 22 end 23 end
看完skynet.fork,我們再回過頭來,看一看skynet.dispatch_message中消息分發raw_dispatch_message的具體細節:
1 local function raw_dispatch_message(prototype, msg, sz, session, source) 2 -- skynet.PTYPE_RESPONSE = 1, read skynet.h 3 if prototype == 1 then 4 local co = session_id_coroutine[session] 5 session_id_coroutine[session] = nil 6 suspend(co, coroutine_resume(co, true, msg, sz)) 7 else 8 local p = proto[prototype] 9 local f = p.dispatch 10 local co = co_create(f) 11 session_coroutine_id[co] = session 12 session_coroutine_address[co] = source 13 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) 14 end 15 end
RESPONSE的處理先就不說了,在敘述skynet.sleep時已經有所討論。消息會根據它的prototype查找proto,接着調用co_create取得協程user-co,將message解包后resume到user-co進入用戶函數f。而后續的流程則與上文我們討論的fork是一樣的了。比如,在f中調用skynet.call時,會向目標發送消息,接着會yield到主co,回到這里的resume點,接着進入suspend的"CALL"分支,記錄session與user-co的映射關系。下一次response消息回來時,會查找到user-co並resume喚醒,在skynet.call后繼續執行,用戶函數f結束后進入上層調用,回收user-co,等待新的調用。
因為本篇我們關注的是協程調度模型,而非具體的處理細節,因此有空再對skynet.call,skynet.ret等作詳細的細節分析。