skynet coroutine 運行筆記


閱讀雲大的博客以及網上關於 skynet 的文章,總是會談服務與消息。不怎么看得懂代碼,光讀這些文字真的很空洞,不明白說啥。網絡的力量是偉大的,相信總能找到一些解決自己疑惑的文章。然后找到了這篇講解 skynet 消息隊列的文章(最新的 skynet 消息隊列代碼已經有更新,變得更簡潔易讀)。了解了 skynet 消息是如何派發的,就想知道消息被派發出去到一個服務后,如何調用服務的 callback 函數,從而處理此消息。碰巧博主寫了這篇講解 skynet 如何注冊回調函數的文章,於是 skynet 的概念“服務與消息”便在代碼中得到了定位,便可以此為入口點探究 skynet 實現。

消息派發

這里雲大已經很詳細的介紹了,我就僅僅在這里略提一下。skynet 把消息分為不同的類別,不同類別的消息有不同的編碼方式,若編寫一個服務,你需要為此服務關注的消息類型注冊 dispatch 函數用來接收此類別的消息。skynet 注冊類別消息的 dispatch 函數有兩種方式。

  • 調用 skynet.register_protocol 注冊。函數的參數是一個 table ,以"lua"類消息為例,里面有若干字段含義如下:
    {
         name = "lua", -- 消息組的字符串名稱
         id = skynet.PTYPE_LUA, -- 消息組的數字 id
         pack = skynet.pack, -- 打包消息
         unpack = skynet.unpack, -- 解包消息
         dispatch = function(session, source, cmd, ...) ... end -- 消息回調/分發函數
    }

    指定了 table 中的 dispatch 字段,以后"lua"類消息到達時便會調用此函數。

  • 調用 skynet.dispatch 函數注冊。為此,雲大給出了一個慣用寫法,以"lua"類消息為例,如下:
    local CMD = {}
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        f(...)
    end)

兩種方式可以根據喜好選擇,畢竟一個服務可能需要處理多種類型的消息,需要注冊多個 dispatch 函數。

在 skynet 中用 Lua 編寫一個服務必須調用 skynet.start 啟動函數啟動此服務。

function skynet.start(start_func)
    c.callback(dispatch_message)
    skynet.timeout(0, function()
        init_service(start_func)
    end)
end

skynet.start 其中在一個作用是調用 c.callback 函數把 skynet 框架的消息派發與你自定義的 dispatch 函數聯系起來,這個聯系的紐帶就是 dispatch_message(skynet.lua) 函數。當服務的消息隊列有消息到達時,框架從消息隊列中取出消息經過一些轉換調用到 dispatch_message 函數,然后 dispatch_message 函數根據協議類型調用相應的 dispatch 函數,最終到具體某條消息的處理函數。

消息執行

skynet 是基於服務的,服務間通過消息進行通信。實現方面 skynet 為每個服務創建一個 lua_State ,不同的服務 lua_State 是不同的,因此服務是相互獨立互不影響的。對於消息,"skynet 的 lua 層會為每個請求創建一個獨立的 coroutine"。經過上面一節,了解到消息會到達我們自定義的 dispatch 函數,此時進入了業務相關的代碼邏輯中,我們只關注業務的邏輯而不關注底層消息如何到達這兒的。於是猜測應該是在 dispatch_message 函數中 skynet 會創建 coroutine 來具體處理某個消息。然后,我們猜想消息執行流程大概應該是這樣的:

  • 一條消息到達,服務的主線程創建 coroutine 處理此消息,處理完后執行權回到主線程,繼續下一條消息處理。
  • 一條消息到達,服務的主線程創建 coroutine 處理此消息,假設此服務是 A ,此時創建的 coroutine 是 coA。A 向另一個服務 B 發送一條消息並等待 B 的返回結果,A 才繼續執行。這時最好的方式是對 coA 做出標記讓出執行,主線程繼續處理其他消息,並根據標記判斷接收的消息是不是派發到 coA 的,若是則再喚醒 coA 繼續執行。

對於單個服務來說,弄清楚一條消息執行流程是這篇筆記的主要內容。

此外由於每條消息都運行在一個 coroutine 中,雲大根據反饋對 coroutine 進行了回收再利用以此提升效率。

skynet 接口有非阻塞 API (如 skynet.ret)也有阻塞 API (如 skynet.call)。阻塞 API 也僅僅是阻塞調用此 API 的 coroutine ,服務本身並沒有阻塞。這兩個 API 剛好與上面猜測的消息執行流程相呼應,接下來以這兩個 API 為例子來說明。順便提一點,調用阻塞 API 時要防止一些問題

繞來繞去的 coroutine

 上面提到 dispatch_message 會創建 coroutine 把消息派發到我們的自定義 dispatch 函數中。實際上完成任務是在函數 raw_dispatch_message 函數中。下面是簡化版的函數實現:

 1 local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
 2     -- skynet.PTYPE_RESPONSE = 1, read skynet.h
 3     if prototype == 1 then -- “response” 類型消息,skynet 已自動處理
 4         local co = session_id_coroutine[session]
 5         session_id_coroutine[session] = nil
 6         suspend(co, coroutine.resume(co, true, msg, sz))
 7     else -- 其他類型消息派發到相應的 dispatch 函數
 8         local p = assert(proto[prototype], prototype)
 9         local f = p.dispatch -- 我們自定義的 dispatch 函數
10         if f then
11             local co = co_create(f) -- 創建 coroutine
12             session_coroutine_id[co] = session
13             session_coroutine_address[co] = source
14             suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
15         end
16     end
17 end 

下面以 skynet 自帶的例子 agent.lua 和 simpledb.lua 為例來進行說明,以 agent 服務 和 simpledb 服務分別指代這兩個服務。agent 服務通過"client"類型協議處理客戶端發送過來的請求,然后 agent 服務和 simpledb 服務通信獲得結果,最后把結果發送到客戶端。simpledb 服務最簡單,接收消息計算結果並返回結果。

先以 simpledb 服務為例進行說明。

 1 local skynet = require "skynet"
 2 local db = {}
 3 
 4 local command = {}
 5 
 6 function command.GET(key)
 7     return db[key]
 8 end
 9 
10 function command.SET(key, value)
11     local last = db[key]
12     db[key] = value
13     return last
14 end
15 
16 skynet.start(function()
17     skynet.dispatch("lua", function(session, address, cmd, ...)
18         local f = command[string.upper(cmd)]
19         if f then
20             skynet.ret(skynet.pack(f(...)))
21         else
22             error(string.format("Unknown command %s", tostring(cmd)))
23         end
24     end)
25     skynet.register "SIMPLEDB" -- 注冊名稱,其他服務可以直接向此名稱發送協議
26 end)

simpledb(line 17) 調用 skynet.dispatch 注冊"lua"類型消息的 dispatch 函數,假設這個匿名函數叫 db_dispatch 。

假設 simpledb 接收到 agent 發送過來的"SET"消息。框架從 simpledb 消息隊列中取出消息,經過一些調用代碼執行到 raw_dispatch_message 函數。在 raw_dispatch_message(line 3) 進行 if 條件判斷,這條"SET"消息的消息類型是"lua",因此 prototype 是 10 ,代碼這時執行到 else 分支,目的是為了創建 coroutine 調用 db_dispatch 函數。代碼走到 raw_dispatch_message(line 11) 調用 co_create 函數,在能回收 coroutine 的情況下創建一個 coroutine ,讓我們看看 co_create 實現。

 1 local coroutine_pool = {} -- 存放 coroutine 對象的數組
 2 local coroutine_yield = coroutine.yield -- 讓出函數
 3 
 4 local function co_create(f)
 5     local co = table.remove(coroutine_pool) -- 先從數組中取出 coroutine ,從數組中刪除是禁止此 coroutine 被其他消息使用
 6     if co == nil then
 7         co = coroutine.create(function(...)
 8             f(...) -- 執行我們傳入的函數
 9             while true do
10             -- 執行完后回收 coroutine 
11             f = nil
12             coroutine_pool[#coroutine_pool+1] = co
13             -- 讓出執行,通知 main_thread 做些清理工作
14             -- coroutine 被喚醒后,代碼會從下面的調用中返回並賦值 f 為我們需要執行的函數,然后繼續執行
15             f = coroutine_yield "EXIT" 
16             f(coroutine_yield()) -- 這里再次調用讓出函數,是為了接收參數傳遞給 f 
17             end
18         end)
19     else
20         coroutine.resume(co, f) -- 喚醒一個 coroutine ,並傳入參數 f ,f 是我們想要執行的函數
21     end
22     return co
23 end            

從使用的理念上,調用函數創建一個 coroutine 對象后,再調用 resume 函數,coroutine 便會執行,調用者無需關注這個 coroutine 是新創建的還是回收利用之前已經創建的。代碼繼續執行,走到 raw_dispatch_message(line 14) ,正如預想的那樣,代碼先調用 coroutine.resume 啟動 coroutine ,於是  dispatch 函數變得以執行。由於 coroutine 是回收利用的,實際在 raw_dispatch_message(line 14) 調用 coroutine.resume 時,coroutine 是分兩種情況執行的,讓我們回到 co_create 函數實現。

  • 當調用 co_create ,coroutine_pool 沒有 coroutine 時(此時有可能是服務剛啟動數組中還沒有 coroutine ,也有可能創建的 coroutine 已經被用完了)此時會走到 co_create(line 7) ,創建一個新的 coroutine 。然后調用 coroutine.resume 時,co_create(line 8) 的代碼會被執行,函數執行完后就要回收這個新創建的 co ,然后調用 coroutine_yield "EXIT" 讓出執行,此時 raw_dispatch_message(line 14) 調用的 coroutine.resume 函數返回,代碼回到主線程,調用 suspend 函數處理"EXIT"命令,suspend 函數執行完后,raw_dispatch_message 函數也執行完畢,本次消息也就執行完畢。
  • 當調用 co_create ,coroutine_pool 中有剩余的 coroutine 時,此時便會利用這個 coroutine 。代碼執行到 co_create(line 20) ,這里調用 coroutine.resume 喚醒這個之前已經讓出執行的 coroutine ,然后在 co_create(line 15) 對 coroutine_yield 的調用會返回,並賦值 f ,這樣做的目的是為了傳入我們要執行的函數 f 。然后執行到 line 16 再次調用 coroutine_yield ,這次目的是為了接收函數參數。最后在 raw_dispatch_message(line 14) 調用 coroutine.resume 時,coroutine 第二次被喚醒,在 co_create(line 16) coroutine_yield 會返回並返回 resume 傳入的參數,這樣我們想要執行的函數便得到執行。執行后這是一個 while 死循環,代碼走到 co_create(11) 開始回收這個 coroutine ,然后調用 coroutine_yield "EXIT" 讓出執行(接下來的執行同上),消息執行完畢。

分析了 co_create 函數,讓我們回到正題。此時是 simpledb 服務,代碼執行 raw_dispatch_message(line 14) ,coroutine 被執行,db_dispatch 函數被調用,此時代碼走到 simpledb(line 18) 然后 command.SET 函數被調用,緊接着調用 skynet.ret 返回結果。skynet.ret 實現如下:

1 function skynet.ret(msg, sz)
2     msg = msg or ""
3     return coroutine_yield("RETURN", msg, sz)
4 end

 

在 skynet.ret 函數中會調用  coroutine_yield ,此時 coroutine 會讓出執行,執行權回到主線程 main_thread 。不要暈,千萬不要暈:)現在代碼再次回到 raw_dispatch_message(line 14) ,此時 coroutine.resume 函數返回並返回了 4 個參數:true, "RETURN", msg, sz ,其中 msg, sz 是要發送回去的消息。接着便調用 suspend 函數處理"RETURN"命令。下面看一下簡化版的 suspend 代碼。

 1 function suspend(co, result, command, param, size)
 2     if command == "CALL" then
 3         session_id_coroutine[param] = co -- 記錄下此 coroutine ,接收到"response"消息時獲取
 4     elseif command == "RETURN" then
 5         local co_session = session_coroutine_id[co]
 6         local co_address = session_coroutine_address[co]
 7         ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
 8         return suspend(co, coroutine.resume(co, ret))
 9     elseif command == "EXIT" then
10         -- coroutine exit
11         local address = session_coroutine_address[co]
12         release_watching(address)
13         session_coroutine_id[co] = nil
14         session_coroutine_address[co] = nil
15         session_response[co] = nil
16     end
17 end

 

 再次強調一下,此時代碼走到 suspend ,這是在主線程執行的,然后處理"RETURN"命令,發送消息到 agent 服務。這里發現原來調用 skynet.ret 返回消息時實際的消息發送是在主線程執行的。緊接着代碼走到 suspend(line 8) ,再次調用 coroutine.resume ,此時執行權回到 coroutine ,回到 skynet.ret 函數中,在 skynet.ret(line 3) coroutine_yield 返回后,skynet.ret 函數也已經返回,執行權還是在 coroutine ,代碼此時走到 simpledb(line 20) skynet.ret 的返回,db_dispatch 函數也已經執行完並返回,此時 simpledb 已經對"SET"消息處理完畢,這時就相當於 co_create 中的 f 函數執行完畢,下面就是 coroutine 的回收,參考 co_create 說明。OK ,到了這里 simpledb 處理"SET"消息,我們已經分析完畢,看起來很繞,其實也蠻清晰的。我們來總結一下 simpledb 處理"SET"消息在主線程和 coroutine 經歷了哪些切換(忽略 co_create 利用回收的 cocoutine 時做的切換):

(raw_dispatch_message 函數) 主線程 -> (db_dispatch 函數)  coroutine: skynet.ret 調用 coroutine_yield"RETURN" 讓出執行 -> (suspend 函數,在 raw_dispatch_message(line 14) 被調用) 主線程: 處理"RETURN",並再次 resume -> coroutine: skynet.ret 返回,db_dispatch 函數返回,調用coroutine_yield"EXIT" 讓出執行 -> (suspend 函數,在suspend(line 8) 被調用) 主線程: 處理"EXIT",suspend 函數返回,raw_dispatch_message 函數返回 -> 消息執行完畢。

 

以 agent 服務為例說明 skynet.call 調用。

上面解釋了 simpledb 處理"SET"消息的流程,這條消息實際上是 agent 服務發送過去的,agent 也是接收到"client"類型的"set"。agent 簡化版代碼如下:

 1 local skynet = require "skynet"
 2 local netpack = require "netpack"
 3 local socket = require "socket"
 4 local sproto = require "sproto"
 5 local sprotoloader = require "sprotoloader"
 6 
 7 local host
 8 local send_request
 9 
10 local CMD = {}
11 local REQUEST = {}
12 local client_fd
13 
14 function REQUEST:set()
15     print("set", self.what, self.value)
16     local r = skynet.call("SIMPLEDB", "lua", "set", self.what, self.value)
17 end
18 
19 local function request(name, args, response)
20     local f = assert(REQUEST[name])
21     local r = f(args)
22     if response then
23         return response(r)
24     end
25 end
26 
27 local function send_package(pack)
28     local package = string.pack(">s2", pack)
29     socket.write(client_fd, package)
30 end
31 
32 skynet.register_protocol {
33     name = "client",
34     id = skynet.PTYPE_CLIENT,
35     unpack = function (msg, sz)
36         return host:dispatch(msg, sz)
37     end,
38     dispatch = function (_, _, type, ...) -- "client" 類型消息 dispatch 函數
39         if type == "REQUEST" then
40             local ok, result  = pcall(request, ...)
41             if ok then
42                 if result then
43                     send_package(result)
44                 end
45             else
46                 skynet.error(result)
47             end
48         else
49             assert(type == "RESPONSE")
50             error "This example doesn't support request client"
51         end
52     end
53 }
54 
55 skynet.start(function()
56     skynet.dispatch("lua", function(_,_, command, ...)
57         local f = CMD[command]
58         skynet.ret(skynet.pack(f(...)))
59     end)
60 end)

 

 看見代碼發現 agent 服務處理兩種類型的消息:"lua"和"client"。這里我們關注的是"client"消息,"client"消息的 dispatch 函數是調用 skynet.register_protocol 設置的,賦值給 dispatch 一個匿名函數,假設這個匿名函數叫 ag_client_dispatch 。當接收到客戶端發送來的"set"消息(這里先不管那些不懂的函數,我們此時只關注執行流程),便會調用 REQUEST:set 函數,然后調用 skynet.call 向 simpledb 發送"set"消息,閱讀 skynet 文檔說 skynet.call 是阻塞的(阻塞調用 skynet.call 的 coroutine),我們來看一下是如何阻塞的。先看一下簡化的 skynet.call 代碼:

 1 local function yield_call(service, session)
 2     local succ, msg, sz = coroutine_yield("CALL", session)
 3     return msg,sz
 4 end
 5 
 6 function skynet.call(addr, typename, ...)
 7     local p = proto[typename]
 8     local session = c.send(addr, p.id , nil , p.pack(...))
 9     if session == nil then
10         error("call to invalid address " .. skynet.address(addr))
11     end
12     return p.unpack(yield_call(addr, session))
13 end

 

閱讀發現 skynet.call 和 skynet.ret 有一些相似,不同的是 skynet.call 調用 coroutine_yield 傳入的是"CALL",然后執行權回到主線程 suspend 函數,閱讀 suspend 函數(千萬別暈)代碼發現此時僅僅是記錄了 coroutine ,然后就返回了。 神馬?神馬?神馬?suspend 函數沒有做其他的事情就返回了,我們的 agent 服務對"set"消息的處理追蹤定格在了 skynet.call(line 2) 行,當前這個 coroutine 未被回收,而是被標記了,然后本次 agent 對"set"消息的處理也就完畢了。

當 simpledb 接收到"set"消息並處理完,然后調用 skynet.ret 返回結果時,閱讀 suspend(line 7) 此時給 agent 服務發送了一個類型為 1 的"lua"類型的消息。之后 agent 服務接收到此消息時,agent 服務主線程執行到函數 raw_dispatch_message ,由於 prototype 為 1 ,此時走到了 raw_dispatch_message(line 6) ,找到了上次標記的 coroutine ,並調用 resume 喚醒這個 coroutine 並傳入了接收到的 msg 和 sz(這實際是 simpledb 服務發送來的),接着代碼執行權來到 coroutine ,來到 skynet.call(line 2) ,coroutine_yield 函數返回並返回了接收到的消息。然后 skynet.call 函數執行完畢,執行權依舊是在 coroutine 中,然后回到 agent(line 16) ,接着繼續執行,ag_client_dispatch 執行完畢,然后進行 coroutine 的回收,調用 coroutine_yield"EXIT" ,coroutine 也就執行完畢,執行權回到主線程,raw_dispatch_message(line 6) 行繼續調用suspend 並傳入"EXIT"命令,suspend 執行完后,raw_dispatch_message 也就執行完畢了,agent 對"set"消息的處理也終於結束了。總結一下:調用 skynet.call 導致 coroutine 被中間執行中斷,等結果到達時(框架從 agent 服務消息隊列取得相應的消息)才會從中斷處繼續執行。流程是這樣的:

1) 第一次。

(raw_dispatch_message 函數) 主線程 -> (ag_client_dispatch 函數)  coroutine: skynet.call 調用 coroutine_yield"CALL" 讓出執行 -> (suspend 函數,在 raw_dispatch_message(line 14) 被調用) 主線程: 處理"CALL",suspend 函數返回,raw_dispatch_message 函數返回 -> 消息執行完畢。

2)第二次。

(raw_dispatch_message 函數) 主線程 -> coroutine: skynet.call(line 2) ,skynet.call 函數返回,ag_client_dispatch 函數返回,調用coroutine_yield"EXIT" 讓出執行 -> (suspend 函數,在 raw_dispatch_message(line 6) 被調用) 主線程: 處理"EXIT",suspend 函數返回,raw_dispatch_message 函數返回 -> 消息執行完畢。

 

總結:大體上 coroutine 的執行流程就是這樣的。我們始終保持一個理念:skynet 為每個服務創建一個 lua_State ,skynet 為每個消息的執行創建一個 coroutine ,阻塞 API 阻塞的是當前 coroutine ,服務本身不會被阻塞,可以繼續處理其他消息。

代碼取自skynet-v1.0.0-alpha,因為代碼以后有可能變動,這里是以 1.0-alpha 為基准分析的。一次性打字描述好多函數調用有可能會描述錯誤(而且打字並沒有那么直觀:)),有錯誤的話,歡迎評論指出,我來修改。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM