繼上一篇介紹了skynet的網絡部分之后,這一篇以網關gate.lua為例,簡單分析下其串接和處理流程。
在官方給出的范例中,是以examples/main.lua作為啟動腳本的,在此過程中會創建watchdog服務:
1 local watchdog = skynet.newservice("watchdog") 2 skynet.call(watchdog, "lua", "start", { 3 port = 8888, 4 maxclient = max_client, 5 nodelay = true, 6 })
首先加載watchdog.lua腳本。而在watchdog.lua的加載過程中,創建了gate服務。加載gate.lua過程中,調用gateserver.start(gate),gateserver會向skynet注冊socket協議的處理:
1 skynet.register_protocol { 2 name = "socket", 3 id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6 4 unpack = function ( msg, sz ) 5 return netpack.filter( queue, msg, sz) 6 end, 7 dispatch = function (_, _, q, type, ...) 8 queue = q 9 if type then 10 MSG[type](...) 11 end 12 end 13 }
另外gateserver也會注冊lua協議的處理,這里就不展開了。gateserver中會攔截skynet_socket_message;也會攔截部分lua消息(一般是由watchdog轉發而來),並調用gate注冊進來的回調。注意gateserver才是skynet消息的入口,gate只不過是個回調而已。至此,gate服務加載完畢。
watchdog服務加載完畢后,main.lua中接着調用watchdog的start方法,其參數分別指定了偵聽的端口、最大客戶端連接數、是否延遲等。看下watchdog的start方法:
1 function CMD.start(conf) 2 skynet.call(gate, "lua", "open" , conf) 3 end
其緊接着調用gate的open方法,而這個方法在gateserver中被攔截了:
1 function CMD.open( source, conf ) 2 assert(not socket) 3 local address = conf.address or "0.0.0.0" 4 local port = assert(conf.port) 5 maxclient = conf.maxclient or 1024 6 nodelay = conf.nodelay 7 skynet.error(string.format("Listen on %s:%d", address, port)) 8 socket = socketdriver.listen(address, port) 9 socketdriver.start(socket) 10 if handler.open then 11 return handler.open(source, conf) 12 end 13 end
可以看到,在open方法中創建了socket並開始了偵聽過程。回憶上篇,socket操作的LuaAPI作為socketdriver被實現在lua-socket.c文件中,看一眼這里的listen是如何交互的:
1 static int 2 llisten(lua_State *L) { 3 const char * host = luaL_checkstring(L,1); 4 int port = luaL_checkinteger(L,2); 5 int backlog = luaL_optinteger(L,3,BACKLOG); 6 struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1)); 7 int id = skynet_socket_listen(ctx, host,port,backlog); 8 if (id < 0) { 9 return luaL_error(L, "Listen error"); 10 } 11 12 lua_pushinteger(L,id); 13 return 1; 14 }
析取參數,獲取關聯的skynet-context之后,調用skynet_socket.c的skynet_socket_listen:
1 int 2 skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) { 3 uint32_t source = skynet_context_handle(ctx); 4 return socket_server_listen(SOCKET_SERVER, source, host, port, backlog); 5 }
拿到context-handle,這個handle在后續創建socket時會被關聯起來。handle作為參數opaque傳遞入socket_server.c中的socket_server_listen方法中:
1 int 2 socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) { 3 int fd = do_listen(addr, port, backlog); 4 if (fd < 0) { 5 return -1; 6 } 7 struct request_package request; 8 int id = reserve_id(ss); 9 if (id < 0) { 10 close(fd); 11 return id; 12 } 13 request.u.listen.opaque = opaque; 14 request.u.listen.id = id; 15 request.u.listen.fd = fd; 16 send_request(ss, &request, 'L', sizeof(request.u.listen)); 17 return id; 18 }
在作了bind和listen之后,將此socket描述符打包為request寫入socket_server的讀管道,由socket_server_poll輪循處理。至此,gate服務中listen的流程已經非常清晰了。
再回到gateserver.CMD.open方法中,在socketdriver.listen之后,緊接着調用socketdriver.start開始網絡事件的處理(具體細節請按上述流程參照源碼),最后調用gate.lua的回調handler.open。在這個過程中,gate服務的skynet-context-handle已經與對應的socket綁定了,后續關於此socket的SOCKET消息都會轉發到gate服務中來,具體到代碼則是由gateserver接收過濾后,再做進一步的分發處理。
那么一個新的客戶端連接是如何被接收創建的呢?
在socket_server_poll過程中,會檢查是否有網絡事件產生(以下是簡化的代碼):
1 int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { 2 for (;;) { 3 // 管道select 4 ...... 5 6 // socket event check 7 ...... 8 9 // s: socket 10 switch (s->type) { 11 case SOCKET_TYPE_CONNECTING: 12 return report_connect(ss, s, result); 13 case SOCKET_TYPE_LISTEN: { 14 int ok = report_accept(ss, s, result); 15 if (ok > 0) { return SOCKET_ACCEPT; }
17 if (ok < 0) { return SOCKET_ERROR; } 20 // when ok == 0, retry 21 break; 22 }
23 // other
24 ......25 } 26 }
對於正在listen的socket(SOCKET_TYPE_LISTEN),當發生事件(即偵測到有新的連接)時,會在report_accept中accept得連接描述符fd並創建socket結構,然后返回SOCKET_ACCEPT交由上層的skynet_socket.c:skynet_socket_poll處理,后者會封裝類型為SKYNET_SOCKET_TYPE_ACCEPT的skynet-message並推入到gate服務的隊列中去,最終轉發到gateserver中所注冊的SOCKET協議入口。
回到gateserver中來(見上述gateserver摘錄的代碼),在接收到網絡消息時,先是在unpack中通過netpack(見lua-netpack.c)合並過濾消息,比如TCP消息粘包等(注意skynet_socket_message如果其padding為true,則表示非數據的命令,比如SKYNET_SOCKET_TYPE_ACCEPT)。對於SKYNET_SOCKET_TYPE_ACCEPT命令,netpack會解析並轉換為open命令,最后gateserver會調用到MSG.open:
1 function MSG.open(fd, msg) 2 if client_number >= maxclient then 3 socketdriver.close(fd) 4 return 5 end 6 if nodelay then 7 socketdriver.nodelay(fd) 8 end 9 connection[fd] = true 10 client_number = client_number + 1 11 handler.connect(fd, msg) 12 end
回調到gate.lua中的connect:
1 function handler.connect(fd, addr) 2 local c = { 3 fd = fd, 4 ip = addr, 5 } 6 connection[fd] = c 7 skynet.send(watchdog, "lua", "socket", "open", fd, addr) 8 end
watchdog.lua中的SOCKET.open:
1 function SOCKET.open(fd, addr) 2 skynet.error("New client from : " .. addr) 3 agent[fd] = skynet.newservice("agent") 4 skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() }) 5 end
此時,會創建玩家agent服務,並調用start方法:
1 function CMD.start(conf) 2 local fd = conf.client 3 local gate = conf.gate 4 WATCHDOG = conf.watchdog 5 -- slot 1,2 set at main.lua 6 host = sprotoloader.load(1):host "package" 7 send_request = host:attach(sprotoloader.load(2)) 8 skynet.fork(function() 9 while true do 10 send_package(send_request "heartbeat") 11 skynet.sleep(500) 12 end 13 end) 14 15 client_fd = fd 16 skynet.call(gate, "lua", "forward", fd) 17 end
agent拿到gate服務的標識后,調用forward將自己的標識注冊到gate服務中來:
1 function CMD.forward(source, fd, client, address) 2 local c = assert(connection[fd]) 3 unforward(c) 4 c.client = client or 0 5 c.agent = address or source 6 forwarding[c.agent] = c 7 gateserver.openclient(fd) 8 end
gateserver.openclient開始偵聽此socket的事件:
function gateserver.openclient(fd) if connection[fd] then socketdriver.start(fd) end end
至此,一個新連接的建立流程就結束了。那么連接建立后網絡數據又是如何轉發進來的呢?流程依然是一致的,socket_server_poll偵測到READ后讀取數據並轉發到gateserver中來,后者調用netpack對數據粘包,此時會調用MSG.more或MSG.data將數據轉交給gate.message:
1 function handler.message(fd, msg, sz) 2 -- recv a package, forward it 3 local c = connection[fd] 4 local agent = c.agent 5 if agent then 6 skynet.redirect(agent, c.client, "client", 0, msg, sz) 7 else 8 skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz)) 9 end 10 end
直接將數據轉發給之前通過forward注冊進來的agent,采用的協議是"client"。agent中需注冊此協議的處理,拿到字節流並根據上層的業務協議對數據轉碼,做進一步的處理。這樣,數據接收和轉發的流程就結束了。其它方面,比如數據發送,關閉socket等等,流程上都是一致的,具體細節不再詳述。
