比起slave/harbor集群模式,skynet提供了用的更為廣泛的cluster集群模式,參考官方wiki https://github.com/cloudwu/skynet/wiki/Cluster。cluster模式利用socketchannel庫(http://www.cnblogs.com/RainRill/p/8892648.html) 與其他skynet進程進行交互,每個請求包帶一個唯一的session值,對端回應包附帶session值。cluster集群模式tcp通道是單向的,即skynet進程1(集群中的節點)通過tcp通道向進程2發送請求包,進程2回應包也走這一通道。但是,進程2向進程1發送請求包及進程1的回應包則是另一條tcp通道。
每個集群節點都有一份完整的cluster配置,會啟動一個clusterd的服務,調用loadconfig加載配置。
第11-19行,加載配置文件(也可以手動傳入配置table tmp)
第20-24行,保存節點名-地址映射關系
1 -- service/clusterd.lua 2 skynet.start(function() 3 loadconfig() 4 skynet.dispatch("lua", function(session , source, cmd, ...) 5 local f = assert(command[cmd]) 6 f(source, ...) 7 end) 8 end) 9 10 local function loadconfig(tmp) 11 if tmp == nil then 12 tmp = {} 13 if config_name then 14 local f = assert(io.open(config_name)) 15 local source = f:read "*a" 16 f:close() 17 assert(load(source, "@"..config_name, "t", tmp))() 18 end 19 end 20 for name,address in pairs(tmp) do 21 ... 22 node_address[name] = address 23 ... 24 end 25 end
以skynet進程1的A服務向skynet進程2的B服務發送請求包及回應為例,說明cluster的工作流程:
對於進程2,配置了 db = "127.0.0.1:2528",啟動后調用cluster.open "db"。
第4行,給clusterd服務發送消息。
第12-15行,啟動一個gate服務,然后通知gate服務監聽配置的地址。gate調用socket.listen監聽外部socket連接。
第20行,watchdog就是clusterd服務的地址。
1 -- lualib/skynet/cluster.lua 2 function cluster.open(port) 3 if type(port) == "string" then 4 skynet.call(clusterd, "lua", "listen", port) 5 else 6 skynet.call(clusterd, "lua", "listen", "0.0.0.0", port) 7 end 8 end 9 10 -- service/clusterd.lua 11 function command.listen(source, addr, port) 12 local gate = skynet.newservice("gate") 13 ... 14 skynet.call(gate, "lua", "open", { address = addr, port = port }) 15 skynet.ret(skynet.pack(nil)) 16 end 17 18 -- servcice/gate.lua 19 function handler.open(source, conf) 20 watchdog = conf.watchdog or source 21 end
對於進程1,調用cluster.call(db, "A", ...),給節點名為db(進程2)的A服務發送請求,最終調用到send_request
第9行,請求包帶上唯一的sesssion值
第11行,按cluster定義的模式打包數據
第15行,獲取socketchannel對象,如果第一次請求,會先創建socketchannel對象,並建立tcp連接
第16行,調用socketchannel的request接口發送請求包
1 -- lualib/skynet/cluster.lua 2 function cluster.call(node, address, ...) 3 -- skynet.pack(...) will free by cluster.core.packrequest 4 return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...)) 5 end 6 7 -- service/clusterd.lua 8 local function send_request(source, node, addr, msg, sz) 9 local session = node_session[node] or 1 10 -- msg is a local pointer, cluster.packrequest will free it 11 local request, new_session, padding = cluster.packrequest(addr, session, msg, sz) 12 node_session[node] = new_session 13 14 -- node_channel[node] may yield or throw error 15 local c = node_channel[node] 16 17 return c:request(request, session, padding) 18 end 19 20 function command.req(...) 21 local ok, msg = pcall(send_request, ...) 22 if ok then 23 ... 24 skynet.ret(msg) 25 end 26 end
創建socket對象時提供了response參數(第6行),所以是采用帶session值的請求-回應模式。
第11行,協程阻塞在socket.read上,此時暫停co,等待回應包
1 -- service/clusterd 2 local host, port = string.match(address, "([^:]+):(.*)$") 3 c = sc.channel { 4 host = host, 5 port = tonumber(port), 6 response = read_response, 7 nodelay = true, 8 } 9 10 local function read_response(sock) 11 local sz = socket.header(sock:read(2)) 12 local msg = sock:read(sz) 13 return cluster.unpackresponse(msg) -- session, ok, data, padding 14 end
對於進程2,gate服務收到進程1的tcp連接請求后,
第8行,給clusterd服務發送消息
第17-18行,clusterd收到后,新建一個clusteragent服務。注:clusteragent是skynet最近新加的。參考https://blog.codingnow.com/2018/04/skynet_cluster.html#more
第24-28行,clusteragent服務專門處理進程1的cluster模式的請求。每個cluster節點連接都新建一個cluseteragent服務去處理請求包。
1 -- service/gate.lua 2 function handler.connect(fd, addr) 3 local c = { 4 fd = fd, 5 ip = addr, 6 } 7 connection[fd] = c 8 skynet.send(watchdog, "lua", "socket", "open", fd, addr) 9 end 10 11 -- service/clusterd.lua 12 function command.socket(source, subcmd, fd, msg) 13 if subcmd == "open" then 14 skynet.error(string.format("socket accept from %s", msg)) 15 -- new cluster agent 16 cluster_agent[fd] = false 17 local agent = skynet.newservice("clusteragent", skynet.self(), source, fd) 18 cluster_agent[fd] = agent 19 ... 20 end 21 22 -- service/clusterdagent.lua 23 skynet.start(function() 24 skynet.register_protocol { 25 name = "client", 26 id = skynet.PTYPE_CLIENT, 27 unpack = cluster.unpackrequest, 28 dispatch = dispatch_request, 29 } 30 ... 31 end
當gate服務收到請求包后,轉發給對應的clusteragent服務(第7行),
1 -- service/gate.lua 2 function handler.message(fd, msg, sz) 3 -- recv a package, forward it 4 local c = connection[fd] 5 local agent = c.agent 6 if agent then 7 skynet.redirect(agent, c.client, "client", fd, msg, sz) 8 else 9 skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz)) 10 end 11 end
clusteragent服務消息分發函數dispatch_request,
第7-9行,如果是push請求,不需要回應,send給目的服務(B服務)后直接返回即可
第11行,如果是call請求,需要回應,給目的服務(B服務)發送消息,然后等待B服務處理完返回。
第14-21行,將消息打包成回應包,通過tcp返回給請求端(skynet進程1)。
進程1收到回應后,重啟協程,返回結果給請求服務(A服務)。這就是cluster模式的調用流程。
1 -- service/clusteragent.lua 2 local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push) 3 if cluster.isname(addr) then 4 addr = register_name[addr] 5 end 6 if addr then 7 if is_push then 8 skynet.rawsend(addr, "lua", msg, sz) 9 return -- no response 10 else 11 ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 12 end 13 if ok then 14 response = cluster.packresponse(session, true, msg, sz) 15 if type(response) == "table" then 16 for _, v in ipairs(response) do 17 socket.lwrite(fd, v) 18 end 19 else 20 socket.write(fd, response) 21 end 22 ... 23 end