閱讀雲大的博客以及網上關於 skynet 的文章,總是會談服務與消息。不怎么看得懂代碼,光讀這些文字真的很空洞,不明白說啥。網絡的力量是偉大的,相信總能找到一些解決自己疑惑的文章。然后找到了這篇講解 skynet 消息隊列的文章(最新的 skynet 消息隊列代碼已經有更新,變得更簡潔易讀)。了解了 skynet 消息是如何派發的,就想知道消息被派發出去到一個服務后,如何調用服務的 callback 函數,從而處理此消息。碰巧博主寫了這篇講解 skynet 如何注冊回調函數的文章,於是 skynet 的概念“服務與消息”便在代碼中得到了定位,便可以此為入口點探究 skynet 實現。
消息派發
這里雲大已經很詳細的介紹了,我就僅僅在這里略提一下。skynet 把消息分為不同的類別,不同類別的消息有不同的編碼方式,若編寫一個服務,你需要為此服務關注的消息類型注冊 dispatch 函數用來接收此類別的消息。skynet 注冊類別消息的 dispatch 函數有兩種方式。
- 調用 skynet.register_protocol 注冊。函數的參數是一個 table ,以"lua"類消息為例,里面有若干字段含義如下:
{ name = "lua", -- 消息組的字符串名稱 id = skynet.PTYPE_LUA, -- 消息組的數字 id pack = skynet.pack, -- 打包消息 unpack = skynet.unpack, -- 解包消息 dispatch = function(session, source, cmd, ...) ... end -- 消息回調/分發函數 }
指定了 table 中的 dispatch 字段,以后"lua"類消息到達時便會調用此函數。
- 調用 skynet.dispatch 函數注冊。為此,雲大給出了一個慣用寫法,以"lua"類消息為例,如下:
local CMD = {} skynet.dispatch("lua", function(session, source, cmd, ...) local f = assert(CMD[cmd]) f(...) end)
兩種方式可以根據喜好選擇,畢竟一個服務可能需要處理多種類型的消息,需要注冊多個 dispatch 函數。
在 skynet 中用 Lua 編寫一個服務必須調用 skynet.start 啟動函數啟動此服務。
function skynet.start(start_func) c.callback(dispatch_message) skynet.timeout(0, function() init_service(start_func) end) end
skynet.start 其中在一個作用是調用 c.callback 函數把 skynet 框架的消息派發與你自定義的 dispatch 函數聯系起來,這個聯系的紐帶就是 dispatch_message(skynet.lua) 函數。當服務的消息隊列有消息到達時,框架從消息隊列中取出消息經過一些轉換調用到 dispatch_message 函數,然后 dispatch_message 函數根據協議類型調用相應的 dispatch 函數,最終到具體某條消息的處理函數。
消息執行
skynet 是基於服務的,服務間通過消息進行通信。實現方面 skynet 為每個服務創建一個 lua_State ,不同的服務 lua_State 是不同的,因此服務是相互獨立互不影響的。對於消息,"skynet 的 lua 層會為每個請求創建一個獨立的 coroutine"。經過上面一節,了解到消息會到達我們自定義的 dispatch 函數,此時進入了業務相關的代碼邏輯中,我們只關注業務的邏輯而不關注底層消息如何到達這兒的。於是猜測應該是在 dispatch_message 函數中 skynet 會創建 coroutine 來具體處理某個消息。然后,我們猜想消息執行流程大概應該是這樣的:
- 一條消息到達,服務的主線程創建 coroutine 處理此消息,處理完后執行權回到主線程,繼續下一條消息處理。
- 一條消息到達,服務的主線程創建 coroutine 處理此消息,假設此服務是 A ,此時創建的 coroutine 是 coA。A 向另一個服務 B 發送一條消息並等待 B 的返回結果,A 才繼續執行。這時最好的方式是對 coA 做出標記讓出執行,主線程繼續處理其他消息,並根據標記判斷接收的消息是不是派發到 coA 的,若是則再喚醒 coA 繼續執行。
對於單個服務來說,弄清楚一條消息執行流程是這篇筆記的主要內容。
此外由於每條消息都運行在一個 coroutine 中,雲大根據反饋對 coroutine 進行了回收再利用以此提升效率。
skynet 接口有非阻塞 API (如 skynet.ret)也有阻塞 API (如 skynet.call)。阻塞 API 也僅僅是阻塞調用此 API 的 coroutine ,服務本身並沒有阻塞。這兩個 API 剛好與上面猜測的消息執行流程相呼應,接下來以這兩個 API 為例子來說明。順便提一點,調用阻塞 API 時要防止一些問題。
繞來繞去的 coroutine
上面提到 dispatch_message 會創建 coroutine 把消息派發到我們的自定義 dispatch 函數中。實際上完成任務是在函數 raw_dispatch_message 函數中。下面是簡化版的函數實現:
1 local function raw_dispatch_message(prototype, msg, sz, session, source, ...) 2 -- skynet.PTYPE_RESPONSE = 1, read skynet.h 3 if prototype == 1 then -- “response” 類型消息,skynet 已自動處理 4 local co = session_id_coroutine[session] 5 session_id_coroutine[session] = nil 6 suspend(co, coroutine.resume(co, true, msg, sz)) 7 else -- 其他類型消息派發到相應的 dispatch 函數 8 local p = assert(proto[prototype], prototype) 9 local f = p.dispatch -- 我們自定義的 dispatch 函數 10 if f then 11 local co = co_create(f) -- 創建 coroutine 12 session_coroutine_id[co] = session 13 session_coroutine_address[co] = source 14 suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...))) 15 end 16 end 17 end
下面以 skynet 自帶的例子 agent.lua 和 simpledb.lua 為例來進行說明,以 agent 服務 和 simpledb 服務分別指代這兩個服務。agent 服務通過"client"類型協議處理客戶端發送過來的請求,然后 agent 服務和 simpledb 服務通信獲得結果,最后把結果發送到客戶端。simpledb 服務最簡單,接收消息計算結果並返回結果。
先以 simpledb 服務為例進行說明。
1 local skynet = require "skynet" 2 local db = {} 3 4 local command = {} 5 6 function command.GET(key) 7 return db[key] 8 end 9 10 function command.SET(key, value) 11 local last = db[key] 12 db[key] = value 13 return last 14 end 15 16 skynet.start(function() 17 skynet.dispatch("lua", function(session, address, cmd, ...) 18 local f = command[string.upper(cmd)] 19 if f then 20 skynet.ret(skynet.pack(f(...))) 21 else 22 error(string.format("Unknown command %s", tostring(cmd))) 23 end 24 end) 25 skynet.register "SIMPLEDB" -- 注冊名稱,其他服務可以直接向此名稱發送協議 26 end)
simpledb(line 17) 調用 skynet.dispatch 注冊"lua"類型消息的 dispatch 函數,假設這個匿名函數叫 db_dispatch 。
假設 simpledb 接收到 agent 發送過來的"SET"消息。框架從 simpledb 消息隊列中取出消息,經過一些調用代碼執行到 raw_dispatch_message 函數。在 raw_dispatch_message(line 3) 進行 if 條件判斷,這條"SET"消息的消息類型是"lua",因此 prototype 是 10 ,代碼這時執行到 else 分支,目的是為了創建 coroutine 調用 db_dispatch 函數。代碼走到 raw_dispatch_message(line 11) 調用 co_create 函數,在能回收 coroutine 的情況下創建一個 coroutine ,讓我們看看 co_create 實現。
1 local coroutine_pool = {} -- 存放 coroutine 對象的數組 2 local coroutine_yield = coroutine.yield -- 讓出函數 3 4 local function co_create(f) 5 local co = table.remove(coroutine_pool) -- 先從數組中取出 coroutine ,從數組中刪除是禁止此 coroutine 被其他消息使用 6 if co == nil then 7 co = coroutine.create(function(...) 8 f(...) -- 執行我們傳入的函數 9 while true do 10 -- 執行完后回收 coroutine 11 f = nil 12 coroutine_pool[#coroutine_pool+1] = co 13 -- 讓出執行,通知 main_thread 做些清理工作 14 -- coroutine 被喚醒后,代碼會從下面的調用中返回並賦值 f 為我們需要執行的函數,然后繼續執行 15 f = coroutine_yield "EXIT" 16 f(coroutine_yield()) -- 這里再次調用讓出函數,是為了接收參數傳遞給 f 17 end 18 end) 19 else 20 coroutine.resume(co, f) -- 喚醒一個 coroutine ,並傳入參數 f ,f 是我們想要執行的函數 21 end 22 return co 23 end
從使用的理念上,調用函數創建一個 coroutine 對象后,再調用 resume 函數,coroutine 便會執行,調用者無需關注這個 coroutine 是新創建的還是回收利用之前已經創建的。代碼繼續執行,走到 raw_dispatch_message(line 14) ,正如預想的那樣,代碼先調用 coroutine.resume 啟動 coroutine ,於是 dispatch 函數變得以執行。由於 coroutine 是回收利用的,實際在 raw_dispatch_message(line 14) 調用 coroutine.resume 時,coroutine 是分兩種情況執行的,讓我們回到 co_create 函數實現。
- 當調用 co_create ,coroutine_pool 沒有 coroutine 時(此時有可能是服務剛啟動數組中還沒有 coroutine ,也有可能創建的 coroutine 已經被用完了)此時會走到 co_create(line 7) ,創建一個新的 coroutine 。然后調用 coroutine.resume 時,co_create(line 8) 的代碼會被執行,函數執行完后就要回收這個新創建的 co ,然后調用 coroutine_yield "EXIT" 讓出執行,此時 raw_dispatch_message(line 14) 調用的 coroutine.resume 函數返回,代碼回到主線程,調用 suspend 函數處理"EXIT"命令,suspend 函數執行完后,raw_dispatch_message 函數也執行完畢,本次消息也就執行完畢。
- 當調用 co_create ,coroutine_pool 中有剩余的 coroutine 時,此時便會利用這個 coroutine 。代碼執行到 co_create(line 20) ,這里調用 coroutine.resume 喚醒這個之前已經讓出執行的 coroutine ,然后在 co_create(line 15) 對 coroutine_yield 的調用會返回,並賦值 f ,這樣做的目的是為了傳入我們要執行的函數 f 。然后執行到 line 16 再次調用 coroutine_yield ,這次目的是為了接收函數參數。最后在 raw_dispatch_message(line 14) 調用 coroutine.resume 時,coroutine 第二次被喚醒,在 co_create(line 16) coroutine_yield 會返回並返回 resume 傳入的參數,這樣我們想要執行的函數便得到執行。執行后這是一個 while 死循環,代碼走到 co_create(11) 開始回收這個 coroutine ,然后調用 coroutine_yield "EXIT" 讓出執行(接下來的執行同上),消息執行完畢。
分析了 co_create 函數,讓我們回到正題。此時是 simpledb 服務,代碼執行 raw_dispatch_message(line 14) ,coroutine 被執行,db_dispatch 函數被調用,此時代碼走到 simpledb(line 18) 然后 command.SET 函數被調用,緊接着調用 skynet.ret 返回結果。skynet.ret 實現如下:
1 function skynet.ret(msg, sz) 2 msg = msg or "" 3 return coroutine_yield("RETURN", msg, sz) 4 end
在 skynet.ret 函數中會調用 coroutine_yield ,此時 coroutine 會讓出執行,執行權回到主線程 main_thread 。不要暈,千萬不要暈:)現在代碼再次回到 raw_dispatch_message(line 14) ,此時 coroutine.resume 函數返回並返回了 4 個參數:true, "RETURN", msg, sz ,其中 msg, sz 是要發送回去的消息。接着便調用 suspend 函數處理"RETURN"命令。下面看一下簡化版的 suspend 代碼。
1 function suspend(co, result, command, param, size) 2 if command == "CALL" then 3 session_id_coroutine[param] = co -- 記錄下此 coroutine ,接收到"response"消息時獲取 4 elseif command == "RETURN" then 5 local co_session = session_coroutine_id[co] 6 local co_address = session_coroutine_address[co] 7 ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) 8 return suspend(co, coroutine.resume(co, ret)) 9 elseif command == "EXIT" then 10 -- coroutine exit 11 local address = session_coroutine_address[co] 12 release_watching(address) 13 session_coroutine_id[co] = nil 14 session_coroutine_address[co] = nil 15 session_response[co] = nil 16 end 17 end
再次強調一下,此時代碼走到 suspend ,這是在主線程執行的,然后處理"RETURN"命令,發送消息到 agent 服務。這里發現原來調用 skynet.ret 返回消息時實際的消息發送是在主線程執行的。緊接着代碼走到 suspend(line 8) ,再次調用 coroutine.resume ,此時執行權回到 coroutine ,回到 skynet.ret 函數中,在 skynet.ret(line 3) coroutine_yield 返回后,skynet.ret 函數也已經返回,執行權還是在 coroutine ,代碼此時走到 simpledb(line 20) skynet.ret 的返回,db_dispatch 函數也已經執行完並返回,此時 simpledb 已經對"SET"消息處理完畢,這時就相當於 co_create 中的 f 函數執行完畢,下面就是 coroutine 的回收,參考 co_create 說明。OK ,到了這里 simpledb 處理"SET"消息,我們已經分析完畢,看起來很繞,其實也蠻清晰的。我們來總結一下 simpledb 處理"SET"消息在主線程和 coroutine 經歷了哪些切換(忽略 co_create 利用回收的 cocoutine 時做的切換):
(raw_dispatch_message 函數) 主線程 -> (db_dispatch 函數) coroutine: skynet.ret 調用 coroutine_yield"RETURN" 讓出執行 -> (suspend 函數,在 raw_dispatch_message(line 14) 被調用) 主線程: 處理"RETURN",並再次 resume -> coroutine: skynet.ret 返回,db_dispatch 函數返回,調用coroutine_yield"EXIT" 讓出執行 -> (suspend 函數,在suspend(line 8) 被調用) 主線程: 處理"EXIT",suspend 函數返回,raw_dispatch_message 函數返回 -> 消息執行完畢。
以 agent 服務為例說明 skynet.call 調用。
上面解釋了 simpledb 處理"SET"消息的流程,這條消息實際上是 agent 服務發送過去的,agent 也是接收到"client"類型的"set"。agent 簡化版代碼如下:
1 local skynet = require "skynet" 2 local netpack = require "netpack" 3 local socket = require "socket" 4 local sproto = require "sproto" 5 local sprotoloader = require "sprotoloader" 6 7 local host 8 local send_request 9 10 local CMD = {} 11 local REQUEST = {} 12 local client_fd 13 14 function REQUEST:set() 15 print("set", self.what, self.value) 16 local r = skynet.call("SIMPLEDB", "lua", "set", self.what, self.value) 17 end 18 19 local function request(name, args, response) 20 local f = assert(REQUEST[name]) 21 local r = f(args) 22 if response then 23 return response(r) 24 end 25 end 26 27 local function send_package(pack) 28 local package = string.pack(">s2", pack) 29 socket.write(client_fd, package) 30 end 31 32 skynet.register_protocol { 33 name = "client", 34 id = skynet.PTYPE_CLIENT, 35 unpack = function (msg, sz) 36 return host:dispatch(msg, sz) 37 end, 38 dispatch = function (_, _, type, ...) -- "client" 類型消息 dispatch 函數 39 if type == "REQUEST" then 40 local ok, result = pcall(request, ...) 41 if ok then 42 if result then 43 send_package(result) 44 end 45 else 46 skynet.error(result) 47 end 48 else 49 assert(type == "RESPONSE") 50 error "This example doesn't support request client" 51 end 52 end 53 } 54 55 skynet.start(function() 56 skynet.dispatch("lua", function(_,_, command, ...) 57 local f = CMD[command] 58 skynet.ret(skynet.pack(f(...))) 59 end) 60 end)
看見代碼發現 agent 服務處理兩種類型的消息:"lua"和"client"。這里我們關注的是"client"消息,"client"消息的 dispatch 函數是調用 skynet.register_protocol 設置的,賦值給 dispatch 一個匿名函數,假設這個匿名函數叫 ag_client_dispatch 。當接收到客戶端發送來的"set"消息(這里先不管那些不懂的函數,我們此時只關注執行流程),便會調用 REQUEST:set 函數,然后調用 skynet.call 向 simpledb 發送"set"消息,閱讀 skynet 文檔說 skynet.call 是阻塞的(阻塞調用 skynet.call 的 coroutine),我們來看一下是如何阻塞的。先看一下簡化的 skynet.call 代碼:
1 local function yield_call(service, session) 2 local succ, msg, sz = coroutine_yield("CALL", session) 3 return msg,sz 4 end 5 6 function skynet.call(addr, typename, ...) 7 local p = proto[typename] 8 local session = c.send(addr, p.id , nil , p.pack(...)) 9 if session == nil then 10 error("call to invalid address " .. skynet.address(addr)) 11 end 12 return p.unpack(yield_call(addr, session)) 13 end
閱讀發現 skynet.call 和 skynet.ret 有一些相似,不同的是 skynet.call 調用 coroutine_yield 傳入的是"CALL",然后執行權回到主線程 suspend 函數,閱讀 suspend 函數(千萬別暈)代碼發現此時僅僅是記錄了 coroutine ,然后就返回了。 神馬?神馬?神馬?suspend 函數沒有做其他的事情就返回了,我們的 agent 服務對"set"消息的處理追蹤定格在了 skynet.call(line 2) 行,當前這個 coroutine 未被回收,而是被標記了,然后本次 agent 對"set"消息的處理也就完畢了。
當 simpledb 接收到"set"消息並處理完,然后調用 skynet.ret 返回結果時,閱讀 suspend(line 7) 此時給 agent 服務發送了一個類型為 1 的"lua"類型的消息。之后 agent 服務接收到此消息時,agent 服務主線程執行到函數 raw_dispatch_message ,由於 prototype 為 1 ,此時走到了 raw_dispatch_message(line 6) ,找到了上次標記的 coroutine ,並調用 resume 喚醒這個 coroutine 並傳入了接收到的 msg 和 sz(這實際是 simpledb 服務發送來的),接着代碼執行權來到 coroutine ,來到 skynet.call(line 2) ,coroutine_yield 函數返回並返回了接收到的消息。然后 skynet.call 函數執行完畢,執行權依舊是在 coroutine 中,然后回到 agent(line 16) ,接着繼續執行,ag_client_dispatch 執行完畢,然后進行 coroutine 的回收,調用 coroutine_yield"EXIT" ,coroutine 也就執行完畢,執行權回到主線程,raw_dispatch_message(line 6) 行繼續調用suspend 並傳入"EXIT"命令,suspend 執行完后,raw_dispatch_message 也就執行完畢了,agent 對"set"消息的處理也終於結束了。總結一下:調用 skynet.call 導致 coroutine 被中間執行中斷,等結果到達時(框架從 agent 服務消息隊列取得相應的消息)才會從中斷處繼續執行。流程是這樣的:
1) 第一次。
(raw_dispatch_message 函數) 主線程 -> (ag_client_dispatch 函數) coroutine: skynet.call 調用 coroutine_yield"CALL" 讓出執行 -> (suspend 函數,在 raw_dispatch_message(line 14) 被調用) 主線程: 處理"CALL",suspend 函數返回,raw_dispatch_message 函數返回 -> 消息執行完畢。
2)第二次。
(raw_dispatch_message 函數) 主線程 -> coroutine: skynet.call(line 2) ,skynet.call 函數返回,ag_client_dispatch 函數返回,調用coroutine_yield"EXIT" 讓出執行 -> (suspend 函數,在 raw_dispatch_message(line 6) 被調用) 主線程: 處理"EXIT",suspend 函數返回,raw_dispatch_message 函數返回 -> 消息執行完畢。
總結:大體上 coroutine 的執行流程就是這樣的。我們始終保持一個理念:skynet 為每個服務創建一個 lua_State ,skynet 為每個消息的執行創建一個 coroutine ,阻塞 API 阻塞的是當前 coroutine ,服務本身不會被阻塞,可以繼續處理其他消息。
代碼取自skynet-v1.0.0-alpha,因為代碼以后有可能變動,這里是以 1.0-alpha 為基准分析的。一次性打字描述好多函數調用有可能會描述錯誤(而且打字並沒有那么直觀:)),有錯誤的話,歡迎評論指出,我來修改。