skynet源碼分析之snax


snax是一個方便實現skynet服務的簡單框架,對服務的接口(比如skynet.call, skynet.send等)做了進一步的封裝,編寫snax服務比較容易,詳情參考官方wiki https://github.com/cloudwu/skynet/wiki/Snax

下面是一個簡單的snax服務代碼,並不是獨立的Lua程序。包含了一組Lua函數,會被snax框架加載分析。一般至少包含4組函數:init,服務的初始接口;exit,服務的退出接口;response前綴,響應其他服務請求並給出回應的方法;accept前綴,響應其他服務請求不需要回應的方法。

    -- print("xxx")
1
function response.f1(msg) 2 return a .. msg 3 end 4 5 function response.f2(msg) 6 return a .. msg 7 end 8 9 function accept.g1(...) 10 print(...) 11 end 12 13 function accept.g2(...) 14 print(...) 15 end 16 17 function init() 18 print("test start!!!") 19 end 20 21 function exit() 22 print("test exit!!!") 23 end

1. snax框架如何分析Lua代碼

接下來重點分析snax框架如何加載分析Lua代碼:

第10行,返回一個閉包,需三個參數:name,snax服務的名字;G,全局環境,默認是空表{};loader,加載Lua文件的接口,不提供則用默認的。

第4-18行,默認的加載器,通過Lua文件名匹配到路徑,然后調用loadfile加載Lua文件。注:這時不能用print,math系統api,因為還未設置package.path路徑。

第24-40行,注冊response組和accept組的接口信息,對參數做相關檢查,比如不能出現相同的接口名字

第55-62行,注冊system組的接口信息

第64-65行,注冊response組合accept組接口信息

第78-81行,用指定loader加載器加載Lua代碼

第83-85行,分析(運行)Lua代碼,設置了全局環境G的__index=env,__newindex=init_system。以上面代碼為例,分析流程:

            response.f1 -> env.reponse -> func_id -> count,把接口信息保存在func里(第37行)

            response.f2, accept.g1, accept.g2流程同上

            init -> init_system,把函數原型保存在接口信息第4個索引位置(第72行),前3個位置是{1, "system", "init"}

            exit -> init_system,把函數原型保存在接口信息第4個索引位置(第72行),前3個位置是{1, "system", "exit"}

 1 -- lualib/snax/interface.lua
 2 local skynet = require "skynet"
 3 
 4 local function dft_loader(path, name, G)
 5     local errlist = {}
 6 
 7     for pat in string.gmatch(path,"[^;]+") do
 8         local filename = string.gsub(pat, "?", name)
 9         local f , err = loadfile(filename, "bt", G)
10         if f then
11             return f, pat
12         else
13             table.insert(errlist, err)
14         end
15     end
16 
17     error(table.concat(errlist, "\n"))
18 end
19 
20 return function (name , G, loader)
21     loader = loader or dft_loader
22     local mainfunc
23 
24     local function func_id(id, group)
25         local tmp = {}
26         local function count( _, name, func)
27             if type(name) ~= "string" then
28                 error (string.format("%s method only support string", group))
29             end
30             if type(func) ~= "function" then
31                 error (string.format("%s.%s must be function"), group, name)
32             end
33             if tmp[name] then
34                 error (string.format("%s.%s duplicate definition", group, name))
35             end
36             tmp[name] = true
37             table.insert(id, { #id + 1, group, name, func} )
38         end
39         return setmetatable({}, { __newindex = count })
40     end
41 
42     do
43         assert(getmetatable(G) == nil)
44         assert(G.init == nil)
45         assert(G.exit == nil)
46 
47         assert(G.accept == nil)
48         assert(G.response == nil)
49     end
50 
51     local temp_global = {}
52     local env = setmetatable({} , { __index = temp_global })
53     local func = {}
54 
55     local system = { "init", "exit", "hotfix", "profile"}
56 
57     do
58         for k, v in ipairs(system) do
59             system[v] = k
60             func[k] = { k , "system", v }
61         end
62     end
63 
64     env.accept = func_id(func, "accept")
65     env.response = func_id(func, "response")
66     local function init_system(t, name, f)
67         local index = system[name]
68         if index then
69             if type(f) ~= "function" then
70                 error (string.format("%s must be a function", name))
71             end
72             func[index][4] = f
73        else
74             temp_global[name] = f
75        end
76    end
77 
78    local pattern
79 
80    local path = assert(skynet.getenv "snax" , "please set snax in config file")
81    mainfunc, pattern = loader(path, name, G)
82 
83    setmetatable(G, { __index = env , __newindex = init_system })
84    local ok, err = xpcall(mainfunc, debug.traceback)
85    setmetatable(G, nil)
86    assert(ok,err)
87 
88    for k,v in pairs(temp_global) do
89        G[k] = v
90    end
91 
92    return func, pattern
93 end

第92行,返回分析后的接口信息func。即:

{{1, "system", "init", f}, {2, "system", "exit", f}, {3, "system", "hotfix", f}, {4, "system", "profile", f}, {5, "response", "f1", f}, {6, "response", "f2", f}, {7, "accept", "g1", f}, {8, "accept", "g2", f},}

snax.interface api是將snax_interface返回的一維表轉化成key-value形式。即:

{
name = "test",
system = {init=1, exit=2, hotfix=3, profile=4},
reponse = {f1=5, f2=6},
accept = {g1=7, g2=8},
}

 1 -- lualib/skynet/snax.lua
 2  function snax.interface(name)
 3      if typeclass[name] then
 4          return typeclass[name]
 5      end
 6  
 7      local si = snax_interface(name, G)
 8  
 9      local ret = {
10          name = name,
11          accept = {},
12          response = {},
13          system = {},
14      }
15  
16      for _,v in ipairs(si) do
17          local id, group, name, f = table.unpack(v)
18          ret[group][name] = id
19      end
20  
21      typeclass[name] = ret
22      return ret
23  end

 2. snaxd服務

介紹完snax框架如何分析Lua代碼,接下來介紹snax服務的工作流程:

第2行,通過snax.newservice啟動一個snaxd服務,服務名為name

第8行,分析加載服務名對應的Lua文件

第9行,啟動名稱為"snaxd"的snlua服務,參數是name

第12行,給該服務發送"snax"類型消息,第一個參數是t.system.init(即id是1)

 1 -- lualib/skynet/snax.lua
 2 function snax.newservice(name, ...)
 3     local handle = snax.rawnewservice(name, ...)
 4     return snax.bind(handle, name)
 5 end
 6 
 7 function snax.rawnewservice(name, ...)
 8     local t = snax.interface(name)
 9     local handle = skynet.newservice("snaxd", name)
10     assert(handle_cache[handle] == nil)
11     if t.system.init then
12         skynet.call(handle, "snax", t.system.init, ...)
13     end
14     return handle
15 end

啟動snaxd服務過程,

第2-10行,分析snax服務的Lua代碼,設置package.path路徑以及SERVICE_NAME,SERVICE_PATH等

第47行,設置"snax"類型的消息分發函數dispatcher。調用snax.newservice會收到"snax"消息,第一個參數id是1(t.system.init),在system組里:

         第15行,通過id獲取接口信息,此時method={1, "system", "init", f},然后執行24行分支

         第24-32行,執行snax服務的Lua文件的init函數,即snax服務的初始化邏輯放到init函數里

         第34-40行,同理,snax服務退出時,通常把退出邏輯放到exit函數里。除了init、exit,system組還包含hotfix,profile,都有相應的處理。

 1 -- service/snaxd.lua
 2 local snax_name = tostring(...)
 3 local loaderpath = skynet.getenv"snax_loader"
 4 local loader = loaderpath and assert(dofile(loaderpath))
 5 local func, pattern = snax_interface(snax_name, _ENV, loader)
 6 local snax_path = pattern:sub(1,pattern:find("?", 1, true)-1) .. snax_name ..  "/"
 7 package.path = snax_path .. "?.lua;" .. package.path
 8 
 9 SERVICE_NAME = snax_name
10 SERVICE_PATH = snax_path
11 
12 skynet.start(function()
13     local init = false
14     local function dispatcher( session , source , id, ...)
15         local method = func[id]
16 
17         if method[2] == "system" then
18             local command = method[3]
19             if command == "hotfix" then
20                 local hotfix = require "snax.hotfix"
21                skynet.ret(skynet.pack(hotfix(func, ...)))
22            elseif command == "profile" then
23                 skynet.ret(skynet.pack(profile_table))
24             elseif command == "init" then
25                 assert(not init, "Already init")
26                 local initfunc = method[4] or function() end
27                 initfunc(...)
28                 skynet.ret()
29                 skynet.info_func(function()
30                     return profile_table
31                 end)
32                 init = true
33             else
34                 assert(init, "Never init")
35                 assert(command == "exit")
36                 local exitfunc = method[4] or function() end
37                 exitfunc(...)
38                 skynet.ret()
39                 init = false
40                 skynet.exit()
41             end
42         else
43             assert(init, "Init first")
44             timing(method, ...)
45         end
46     end
47     skynet.dispatch("snax", dispatcher)
48 
49     -- set lua dispatcher
50     function snax.enablecluster()
51         skynet.dispatch("lua", dispatcher)
52     end
53 end)

3. 給snaxd服務發送消息

snax.newservice啟動服務后,通過snax.bind綁定后返回一個對象,對象里包含接口信息,供使用者調用:

第8-12行,如果已經綁定過,從緩存里獲取即可

第13-14行,獲取Lua代碼接口信息,然后通過wrapper封裝

第19-26行,返回一個table,包含4個域:handle,服務地址;type,服務名稱;post,不需要返回的請求;req,需要返回的請求。

當我們用經典的test.req.f1()給test服務發送消息,流程是:調用req域 -> gen_req接口(第28行) -> 獲取接口編號id(第31行) -> 給handle服務(snaxd)發送消息(第36行)

 1 -- lualib/skynet/snax.lua
 2 function snax.newservice(name, ...)
 3     local handle = snax.rawnewservice(name, ...)
 4     return snax.bind(handle, name)
 5 end
 6 
 7 function snax.bind(handle, type)
 8     local ret = handle_cache[handle]
 9     if ret then
10         assert(ret.type == type)
11         return ret
12     end
13     local t = snax.interface(type)
14     ret = wrapper(handle, type, t)
15     handle_cache[handle] = ret
16     return ret
17 end
18 
19 local function wrapper(handle, name, type)
20     return setmetatable ({
21         post = gen_post(type, handle),
22         req = gen_req(type, handle),
23         type = name,
24         handle = handle,
25     }, meta)
26 end
27 
28 local function gen_req(type, handle)
29     return setmetatable({} , {
30         __index = function( t, k )
31             local id = type.response[k]
32             if not id then
33                 error(string.format("request %s:%s no exist", type.name, k))
34             end
35             return function(...)
36                 return skynet_call(handle, "snax", id, ...)
37             end
38     end })
39 end

snaxd服務收到消息后,因為是response/accept組的信息,最終調用timing(method, ...)。如果是accept類型,直接執行對應的函數即可(第4行);如果是response類型,執行對應的函數並返回(第17行)。至此,完成跟snax服務一次交互。

 1  -- service/snaxd.lua
 2  local function timing( method, ... )
 3      local err, msg
 4      profile.start()
 5      if method[2] == "accept" then
 6          -- no return
 7          err,msg = xpcall(method[4], traceback, ...)
 8      else
 9          err,msg = xpcall(return_f, traceback, method[4], ...)
10      end
11      local ti = profile.stop()
12      update_stat(method[3], ti)
13      assert(err,msg)
14  end
15 
16  local function return_f(f, ...)
17      return skynet.ret(skynet.pack(f(...)))
18  end

 4. snax服務的熱更新

snax服務支持熱更新(只能熱更snax類型的服務)。snax框架約定了Lua文件的格式,所以可獲取Lua代碼里所有閉包信息,通過修改這些閉包狀態達到熱更的目的。調用snax.hotfix進行熱更新(可在debug_console加一個hotfix命令調用snax.hotfix接口),最終調用到inject接口(第9行)。

 1 -- lualib/skynet/snax.lua
 2 function snax.hotfix(obj, source, ...)
 3     local t = snax.interface(obj.type)
 4     return test_result(skynet_call(obj.handle, "snax", t.system.hotfix, source, ...))
 5 end
 6 
 7 -- lualib/snax/hotfix.lua
 8 return function (funcs, source, ...)
 9     return pcall(inject, funcs, source, ...)
10 end

inject接口的參數:funcs原Lua文件的接口信息,source要熱更的Lua代碼塊,以及可變參數。工作流程是:

第2行,通過snax_interface接口分析加載要熱更的Lua代碼,保存在patch變量中。

第3行,通過funcs獲取原有Lua代碼所有閉包。

第5-10行,依次處理patch里的每個閉包,調用_patch:

           第25-41行,獲取閉包每個上值的名字和值,如果熱更的Lua代碼里沒提供值,則表示這個上值不需要熱更,復用原來的值即可,即調用debug.upvaluejoin將現有閉包引用原有的上值

           第21行,更新完一個閉包的所有上值后,覆蓋原來的接口信息。

第12-15行,運行熱更的Lua代碼里的hotfix接口,在這個接口里可以查看和修改服務的狀態(內部local變量的值)。示例參看官方wiki。

 1 local function inject(funcs, source, ...)
 2     local patch = si("patch", dummy_env, loader(source))
 3     local global = collect_all_uv(funcs)
 4 
 5     for _, v in pairs(patch) do
 6         local _, group, name, f = table.unpack(v)
 7         if f then
 8             patch_func(funcs, global, group, name, f)
 9         end
10     end
11 
12     local hf = find_func(patch, "system", "hotfix")
13     if hf and hf[4] then
14         return hf[4](...)
15     end
16 end
17 
18 local function patch_func(funcs, global, group, name, f)
19     local desc = assert(find_func(funcs, group, name) , string.format("Patch mismatch %s.%s", group, name))
20     _patch(global, f)
21     desc[4] = f
22 end
23 
24 local function _patch(global, f)
25     local i = 1
26     while true do
27         local name, value = debug.getupvalue(f, i)
28         if name == nil then
29             break
30         elseif value == nil or value == dummy_env then
31             local old_uv = global[name]
32             if old_uv then
33                 debug.upvaluejoin(f, i, old_uv.func, old_uv.index)
34             end
35         else
36            if type(value) == "function" then
37                _patch(global, value)
38            end
39         end
40         i = i + 1
41     end
42 end

 總結:snax是一個簡單實用的框架,對比用原生的skynet接口,snax編寫出的Lua代碼更加直觀易懂,且熱更新也比較方便。但有幾個易錯點:

  1. snax.queryservice會發送rpc請求,是個阻塞調用。

  2. 編寫的Lua服務在loadfile過程中不能調用print,math等系統接口,因為還未設置package.path路徑。當然,也可以自己設置加載器支持指定的系統接口。

  3.  snax熱更不能增加新的遠程response/accept方法。已經新增加的方法不能被snaxd服務識別。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM