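This article walks through how a Lua service ends up calling skynet's low-level network interfaces. The Lua-level API lives mainly in lualib/skynet/socket.lua; see also the official wiki at https://github.com/cloudwu/skynet/wiki/Socket.
A simple example shows the path from a Lua service down to the low-level network layer: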
local socket = require "skynet.socket"
local skynet = require "skynet"

local ip, port = "0.0.0.0", 8888  -- example address

local function loop(fd)
    socket.start(fd)
    while true do
        local data = socket.readline(fd, "\n")
        if not data then  -- readline returns false once the connection is closed
            socket.close(fd)
            return
        end
        print(data, #data)
    end
end

skynet.start(function()
    local listen_fd = socket.listen(ip, port)
    socket.start(listen_fd, function(fd, addr)
        print(string.format("connect fd[%d], addr[%s]", fd, addr))
        skynet.fork(loop, fd)
    end)
end)
1. Overview of the API call chain
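When the service starts, it calls socket.listen to listen on an address. The call chain is driver.listen (line 7) -> skynet_socket_listen (line 17) -> socket_server_listen (line 29) -> send_request (line 47). send_request finally writes the request into the send pipe. Across source files, the call passes through socket.lua -> lua-socket.c -> skynet_socket.c -> socket_server.c.
Note: on line 34, do_listen calls the Unix system calls socket, bind and listen in turn.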
1 -- lualib/skynet/socket.lua
2 function socket.listen(host, port, backlog)
3     if port == nil then
4         host, port = string.match(host, "([^:]+):(.+)$")
5         port = tonumber(port)
6     end
7     return driver.listen(host, port, backlog)
8 end
9
10 // lualib-src/lua-socket.c
11 static int
12 llisten(lua_State *L) {
13     const char * host = luaL_checkstring(L,1);
14     int port = luaL_checkinteger(L,2);
15     int backlog = luaL_optinteger(L,3,BACKLOG);
16     struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
17     int id = skynet_socket_listen(ctx, host,port,backlog);
18     if (id < 0) {
19         return luaL_error(L, "Listen error");
20     }
21
22     lua_pushinteger(L,id);
23     return 1;
24 }
25
26 // skynet-src/skynet_socket.c
27 int skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
28     uint32_t source = skynet_context_handle(ctx);
29     return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
30 }
31
32 // skynet-src/socket_server.c
33 int socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
34     int fd = do_listen(addr, port, backlog);
35     if (fd < 0) {
36         return -1;
37     }
38     struct request_package request;
39     int id = reserve_id(ss);
40     if (id < 0) {
41         close(fd);
42         return id;
43     }
44     request.u.listen.opaque = opaque;
45     request.u.listen.id = id;
46     request.u.listen.fd = fd;
47     send_request(ss, &request, 'L', sizeof(request.u.listen));
48     return id;
49 }
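The listing above does not include do_listen. To show roughly what it does, here is a minimal IPv4-only sketch built on the POSIX calls named in the note on line 34. The function name do_listen_sketch and the SO_REUSEADDR choice are assumptions for illustration. The real skynet version also resolves host names and supports IPv6.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical, simplified do_listen: socket() -> bind() -> listen().
 * Accepts numeric IPv4 hosts only. Returns the listening fd, or -1 on failure. */
int do_listen_sketch(const char *host, int port, int backlog) {
    assert(host != NULL);
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((uint16_t)port);
    if (inet_pton(AF_INET, host, &addr.sin_addr) != 1) {
        return -1; /* not a numeric IPv4 address */
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }
    int reuse = 1;
    /* Assumption: allow quick restarts while old connections sit in TIME_WAIT. */
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0 ||
        listen(fd, backlog) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Passing port 0 asks the kernel for an ephemeral port, which getsockname can then report back.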
2. The socket connection process
A socket structure in skynet is always in one of the following states:
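#define SOCKET_TYPE_INVALID 0     // free slot, available for use
#define SOCKET_TYPE_RESERVE 1     // slot reserved (id allocated)
#define SOCKET_TYPE_PLISTEN 2     // listen requested, not yet started (listening sockets only)
#define SOCKET_TYPE_LISTEN 3      // listening, can accept client connections (listening sockets only)
#define SOCKET_TYPE_CONNECTING 4  // non-blocking connect in progress, completion detected when the fd becomes writable
#define SOCKET_TYPE_CONNECTED 5   // connected, data can be sent and received
#define SOCKET_TYPE_HALFCLOSE 6   // close requested while pending output is still being flushed
#define SOCKET_TYPE_PACCEPT 7     // accepted, waiting for socket.start (connection sockets only)
#define SOCKET_TYPE_BIND 8        // an existing OS fd bound via socket.bind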
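After a worker thread executes socket.listen, the socket thread reads the request from the receive pipe and runs ctrl_cmd. ctrl_cmd calls listen_socket (line 6), which puts the socket in state SOCKET_TYPE_PLISTEN (line 18).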
1 // skynet-src/socket_server.c
2 static int
3 ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
4     ...
5     case 'L':
6         return listen_socket(ss,(struct request_listen *)buffer, result);
7     ...
8 }
9
10 static int
11 listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
12     int id = request->id;
13     int listen_fd = request->fd;
14     struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false);
15     if (s == NULL) {
16         goto _failed;
17     }
18     s->type = SOCKET_TYPE_PLISTEN;
19     return -1;
20     ...
21 }
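The worker thread and the socket thread talk through a pipe. send_request writes a request into it, and ctrl_cmd reads the request back and switches on its type byte. The sketch below assumes each frame is one type byte, then one length byte, then the payload. This matches my reading of skynet's request header, but verify it against your version. The helper names send_cmd, recv_cmd and listen_req are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical payload for an 'L' request, modelled on request_listen. */
struct listen_req {
    int id;
    int fd;
    uintptr_t opaque;
};

/* Worker-thread side: write [type][len][payload] with a single write() call. */
int send_cmd(int wfd, char type, const void *payload, uint8_t len) {
    assert(payload != NULL || len == 0);
    uint8_t frame[2 + 255];
    frame[0] = (uint8_t)type;
    frame[1] = len;
    if (len > 0) {
        memcpy(frame + 2, payload, len);
    }
    ssize_t total = (ssize_t)len + 2;
    return write(wfd, frame, (size_t)total) == total ? 0 : -1;
}

/* Read exactly n bytes, looping over short reads; -1 on EOF or error. */
static int read_full(int rfd, void *buf, size_t n) {
    uint8_t *p = buf;
    while (n > 0) {
        ssize_t r = read(rfd, p, n);
        if (r <= 0) {
            return -1;
        }
        p += r;
        n -= (size_t)r;
    }
    return 0;
}

/* Socket-thread side: read the 2-byte header, then the payload.
 * payload must have room for up to 255 bytes. Returns the type byte or -1. */
int recv_cmd(int rfd, void *payload, uint8_t *len) {
    uint8_t header[2];
    if (read_full(rfd, header, sizeof(header)) != 0) {
        return -1;
    }
    *len = header[1];
    if (read_full(rfd, payload, header[1]) != 0) {
        return -1;
    }
    return header[0];
}
```

Each frame stays well under PIPE_BUF, so the single write() is atomic. That lets several worker threads send requests concurrently without their bytes interleaving.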
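Next, the Lua service calls socket.start(listen_fd), and the socket thread eventually runs start_socket. start_socket registers the fd with epoll (sp_add) and moves the socket to SOCKET_TYPE_LISTEN, where it waits for client connection requests.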
// skynet-src/socket_server.c
static int
start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
    ...
    if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
        if (sp_add(ss->event_fd, s->fd, s)) {
            force_close(ss, s, &l, result);
            result->data = strerror(errno);
            return SOCKET_ERR;
        }
        s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
        s->opaque = request->opaque;
        result->data = "start";
        return SOCKET_OPEN;
    }
    ...
}
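When a client initiates a connection, epoll reports an event on the listening socket and socket_server_poll calls report_accept (line 5).
Line 14 calls the Unix system call accept to accept the client's request. The epoll event means a connection is already pending, so accept does not block.
Line 16 obtains a free socket id from the socket pool.
Lines 17-22 initialize the new socket, whose state is now SOCKET_TYPE_PACCEPT (line 18). The result carries the listening socket's id in id and the new socket's id in ud.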
1 int
2 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
3     ...
4     case SOCKET_TYPE_LISTEN: {
5         int ok = report_accept(ss, s, result);
6         ...
7     }
8
9 // return 0 when failed, or -1 when file limit
10 static int
11 report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
12     union sockaddr_all u;
13     socklen_t len = sizeof(u);
14     int client_fd = accept(s->fd, &u.s, &len);
15     ...
16     int id = reserve_id(ss);
17     struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
18     ns->type = SOCKET_TYPE_PACCEPT;
19     result->opaque = s->opaque;
20     result->id = s->id;
21     result->ud = id;
22     result->data = NULL;
23
24     ...
25     return 1;
26 }
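Next, the Lua service calls socket.start(id) again. This time id is the connected socket, not the listening one. The socket moves to SOCKET_TYPE_CONNECTED: the connection is established and data can be sent and received. That completes the socket connection process.
How the Lua service is notified of these events is analyzed later.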
// skynet-src/socket_server.c
static int
start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
    ...
    s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
    ...
}
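The two start_socket transitions, together with the states set by listen_socket and report_accept, can be summarized as a small state function. This is a hypothetical model for reading purposes only. The event names are invented, the state values are copied from the SOCKET_TYPE_* list in section 2, and -1 marks combinations the sketch does not model.

```c
#include <assert.h>

/* State values copied from the SOCKET_TYPE_* list. */
enum socket_type {
    SOCKET_TYPE_INVALID = 0, SOCKET_TYPE_RESERVE = 1, SOCKET_TYPE_PLISTEN = 2,
    SOCKET_TYPE_LISTEN = 3, SOCKET_TYPE_CONNECTING = 4, SOCKET_TYPE_CONNECTED = 5,
    SOCKET_TYPE_HALFCLOSE = 6, SOCKET_TYPE_PACCEPT = 7, SOCKET_TYPE_BIND = 8,
};

/* Hypothetical events, invented for this sketch. */
enum socket_event {
    EV_LISTEN_CMD,   /* socket thread handles 'L' (listen_socket) */
    EV_ACCEPTED,     /* report_accept fills a freshly reserved slot */
    EV_START,        /* socket thread handles socket.start (start_socket) */
};

/* Returns the next state, or -1 for combinations not modelled here. */
int next_state(enum socket_type from, enum socket_event ev) {
    switch (ev) {
    case EV_LISTEN_CMD:
        return from == SOCKET_TYPE_RESERVE ? SOCKET_TYPE_PLISTEN : -1;
    case EV_ACCEPTED:
        return from == SOCKET_TYPE_RESERVE ? SOCKET_TYPE_PACCEPT : -1;
    case EV_START:
        if (from == SOCKET_TYPE_PLISTEN) return SOCKET_TYPE_LISTEN;    /* listening fd */
        if (from == SOCKET_TYPE_PACCEPT) return SOCKET_TYPE_CONNECTED; /* accepted fd */
        return -1;
    }
    return -1;
}
```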
To close a socket, call socket.close(id).
There are two APIs for sending data: socket.write(id, str) for normal sends and socket.lwrite(id, str) for low-priority sends.
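One rough mental model of the two write paths: socket.write appends to a high-priority list, socket.lwrite appends to a low-priority list, and the low list is drained only when the high list is empty. The sketch below assumes that rule. The struct and function names are hypothetical, and partial writes and epoll write events are omitted.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical model: one FIFO list per priority. */
struct wbuf {
    struct wbuf *next;
    const char *data;
};

struct wlist {
    struct wbuf *head;
    struct wbuf *tail;
};

struct send_queues {
    struct wlist high;   /* filled by socket.write */
    struct wlist low;    /* filled by socket.lwrite */
};

static void wlist_push(struct wlist *l, const char *data) {
    struct wbuf *b = malloc(sizeof(*b));
    assert(b != NULL);
    b->next = NULL;
    b->data = data;
    if (l->tail != NULL) {
        l->tail->next = b;
    } else {
        l->head = b;
    }
    l->tail = b;
}

static const char *wlist_pop(struct wlist *l) {
    struct wbuf *b = l->head;
    if (b == NULL) {
        return NULL;
    }
    l->head = b->next;
    if (l->head == NULL) {
        l->tail = NULL;
    }
    const char *data = b->data;
    free(b);
    return data;
}

void queue_write(struct send_queues *q, const char *data)  { wlist_push(&q->high, data); }
void queue_lwrite(struct send_queues *q, const char *data) { wlist_push(&q->low, data); }

/* Next buffer to send when the socket is writable: high first, low only
 * when no high-priority data is pending. */
const char *next_to_send(struct send_queues *q) {
    const char *d = wlist_pop(&q->high);
    return d != NULL ? d : wlist_pop(&q->low);
}
```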
3. How the network layer notifies the Lua service
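While the socket thread runs (socket_server_poll), it calls forward_message_tcp whenever network data arrives on a TCP socket.
Line 19 calls the Unix system call read to read the data waiting on the socket.
Lines 21-24 fill in the fields of result: opaque is the address of the owning Lua service, id identifies the socket in the pool, ud is the number of bytes actually read, and data points to the data.
Line 25 returns SOCKET_DATA, meaning data has been received.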
1 // skynet-src/socket_server.c
2 int
3 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
4     ...
5     default:
6         if (e->read) {
7             int type;
8             if (s->protocol == PROTOCOL_TCP) {
9                 type = forward_message_tcp(ss, s, &l, result);
10             ...
11             return type;
12 }
13
14 static int
15 forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l,
16         struct socket_message * result) {
17     int sz = s->p.size;
18     char * buffer = MALLOC(sz);
19     int n = (int)read(s->fd, buffer, sz);
20     ...
21     result->opaque = s->opaque;
22     result->id = s->id;
23     result->ud = n;
24     result->data = buffer;
25     return SOCKET_DATA;
26 }
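The read-and-report step of forward_message_tcp can be tried in isolation. This sketch reads from any readable fd in place of a TCP socket. struct socket_message_sketch, SKETCH_SOCKET_DATA and read_into_result are hypothetical names. The real function's handling of EAGAIN, EINTR and peer close, and its buffer resizing, are left out.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

#define SKETCH_SOCKET_DATA 0   /* stand-in for SOCKET_DATA; value arbitrary here */

/* Mirrors the four fields filled on lines 21-24. */
struct socket_message_sketch {
    int id;            /* socket id */
    uintptr_t opaque;  /* handle of the owning service */
    int ud;            /* bytes actually read */
    char *data;        /* heap buffer; the receiver frees it */
};

/* Read up to `size` bytes from fd into a fresh buffer and describe it in
 * *result. Returns SKETCH_SOCKET_DATA, or -1 on EOF/error. */
int read_into_result(int fd, int id, uintptr_t opaque, int size,
                     struct socket_message_sketch *result) {
    assert(size > 0);
    char *buffer = malloc((size_t)size);
    if (buffer == NULL) {
        return -1;
    }
    int n = (int)read(fd, buffer, (size_t)size);
    if (n <= 0) {          /* 0 = peer closed, <0 = error; not distinguished here */
        free(buffer);
        return -1;
    }
    result->opaque = opaque;
    result->id = id;
    result->ud = n;
    result->data = buffer;
    return SKETCH_SOCKET_DATA;
}
```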
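Because socket_server_poll returned SOCKET_DATA, skynet_socket_poll calls forward_message (line 11).
Lines 23-26 build the socket message payload from the result returned above.
Lines 28-32 build a skynet message. It is sent by the network layer rather than by any particular service, so the source and session fields are simply set to 0.
Line 34 pushes the message to the service address associated with the socket (result->opaque).
At this point, the network message has been delivered to the specific Lua service.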
1 // skynet-src/skynet_socket.c
2 int
3 skynet_socket_poll() {
4     struct socket_server *ss = SOCKET_SERVER;
5     assert(ss);
6     struct socket_message result;
7     int more = 1;
8     int type = socket_server_poll(ss, &result, &more);
9     switch (type) {
10     case SOCKET_DATA:
11         forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
12         break;
13     ...
14     return 1;
15 }
16
17 // mainloop thread
18 static void
19 forward_message(int type, bool padding, struct socket_message * result) {
20     struct skynet_socket_message *sm;
21     size_t sz = sizeof(*sm);
22     ...
23     sm = (struct skynet_socket_message *)skynet_malloc(sz);
24     sm->type = type;
25     sm->id = result->id;
26     sm->ud = result->ud;
27     ...
28     struct skynet_message message;
29     message.source = 0;
30     message.session = 0;
31     message.data = sm;
32     message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
33
34     if (skynet_context_push((uint32_t)result->opaque, &message)) {
35         // todo: report somewhere to close socket
36         // don't call skynet_socket_close here (It will block mainloop)
37         skynet_free(sm->buffer);
38         skynet_free(sm);
39     }
40 }
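Line 32 packs the message type into the size field. The sketch below shows that encoding and how a receiver splits it back apart. It assumes the MESSAGE_TYPE_SHIFT and MESSAGE_TYPE_MASK definitions as I recall them from skynet: the top byte of a size_t holds the type and the remaining bits hold the payload size. It also assumes PTYPE_SOCKET = 6, as noted in section 4. The helper names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed to match skynet's definitions: the top byte of sz is the type. */
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t) - 1) * 8)
#define PTYPE_SOCKET 6

/* Combine payload size and message type the way line 32 does. */
size_t pack_sz(size_t payload_size, int type) {
    assert((payload_size & ~MESSAGE_TYPE_MASK) == 0); /* size must not reach the type byte */
    return payload_size | ((size_t)type << MESSAGE_TYPE_SHIFT);
}

/* Receiver side: split sz back into type and payload size. */
int unpack_type(size_t sz) { return (int)(sz >> MESSAGE_TYPE_SHIFT); }
size_t unpack_size(size_t sz) { return sz & MESSAGE_TYPE_MASK; }
```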
4. How the Lua service processes messages
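When network data reaches the Lua service, lualib/skynet/socket.lua handles it. The registered "socket" protocol dispatches each message through the socket_message table, which is indexed by message type. Message types include normal data (DATA), connection (CONNECT), close (CLOSE), error (ERROR) and others.
Line 15 pushes the data sent by the client into that socket's buffer.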
1 -- lualib/skynet/socket.lua
2 skynet.register_protocol {
3     name = "socket",
4     id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
5     unpack = driver.unpack,
6     dispatch = function (_, _, t, ...)
7         socket_message[t](...)
8     end
9 }
10
11 -- SKYNET_SOCKET_TYPE_DATA = 1
12 socket_message[1] = function(id, size, data)
13     local s = socket_pool[id]
14     ...
15     local sz = driver.push(s.buffer, buffer_pool, data, size)
16     ...
17 end
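socket.read(id, sz) reads sz bytes from a socket. If the buffer already holds enough data, the bytes are popped and returned directly (lines 6-8). If the connection has already closed, it returns false together with whatever data remains (lines 10-11). Otherwise it records the requested size and suspends the current coroutine (lines 15-16). The coroutine is resumed once enough data has arrived or the connection has closed.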
1 -- lualib/skynet/socket.lua
2 function socket.read(id, sz)
3     local s = socket_pool[id]
4     assert(s)
5     ...
6     local ret = driver.pop(s.buffer, buffer_pool, sz)
7     if ret then
8         return ret
9     end
10     if not s.connected then
11         return false, driver.readall(s.buffer, buffer_pool)
12     end
13
14     assert(not s.read_required)
15     s.read_required = sz
16     suspend(s)
17     ret = driver.pop(s.buffer, buffer_pool, sz)
18     if ret then
19         return ret
20     else
21         return false, driver.readall(s.buffer, buffer_pool)
22     end
23 end
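socket.readline(id, sep) reads data from a socket up to the separator sep. sep defaults to "\n", so by default it reads one line. Note: because the separator can be specified, this API is not limited to reading lines.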
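The buffer logic behind socket.read and socket.readline can be modelled with a plain byte buffer. This is a hypothetical contiguous stand-in for the per-socket buffer managed by driver.push, driver.pop and the related driver calls. The real buffer in lua-socket.c is a list of nodes. A return value of 0 corresponds to the case where the Lua side suspends its coroutine and waits for more data. To my understanding, skynet's readline likewise consumes the separator without returning it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical contiguous stand-in for a socket's receive buffer. */
struct sock_buffer {
    char *data;
    size_t size;   /* bytes currently buffered */
    size_t cap;    /* allocated bytes */
};

/* Append received bytes (the role driver.push plays in socket_message[1]). */
void buf_push(struct sock_buffer *b, const char *src, size_t n) {
    if (n == 0) {
        return;
    }
    if (b->size + n > b->cap) {
        size_t cap = b->cap ? b->cap : 64;
        while (cap < b->size + n) {
            cap *= 2;
        }
        char *p = realloc(b->data, cap);
        assert(p != NULL);
        b->data = p;
        b->cap = cap;
    }
    memcpy(b->data + b->size, src, n);
    b->size += n;
}

/* Copy the first len bytes to out as a C string, then drop len + skip bytes. */
static void take(struct sock_buffer *b, size_t len, size_t skip, char *out) {
    if (len > 0) {
        memcpy(out, b->data, len);
    }
    out[len] = '\0';
    size_t rest = b->size - len - skip;
    if (rest > 0) {
        memmove(b->data, b->data + len + skip, rest);
    }
    b->size = rest;
}

/* Pop exactly sz bytes (out needs sz + 1 bytes). Returns 0, leaving the
 * buffer untouched, when fewer than sz bytes are buffered. */
int buf_pop(struct sock_buffer *b, size_t sz, char *out) {
    if (b->size < sz) {
        return 0;
    }
    take(b, sz, 0, out);
    return 1;
}

/* Pop everything before the first sep; the separator is consumed but not
 * returned. Returns 0 when sep has not arrived yet. */
int buf_readline(struct sock_buffer *b, const char *sep, char *out) {
    size_t n = strlen(sep);
    assert(n > 0);
    for (size_t i = 0; i + n <= b->size; i++) {
        if (memcmp(b->data + i, sep, n) == 0) {
            take(b, i, n, out);
            return 1;
        }
    }
    return 0;
}
```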
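socket.abandon(id) clears the data structures for socket id inside the current service without closing the socket, so that the id can be handed over to another service. A common design uses a master service to accept external connections. Once a client has connected, the master assigns the socket to a slave service, which reduces the load on the master.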
5. Summary
The socket library is typically used as follows:
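-- master service
-- slave: a snax service handle, e.g. from snax.newservice
local listen_fd = socket.listen(ip, port)  -- listen on an address
socket.start(listen_fd, function(fd, addr)
    slave.post.start(fd)  -- a client connected: hand it over to the slave
    socket.abandon(fd)    -- forget fd in the master without closing it
end)

-- slave service
function accept.start(fd)
    socket.start(fd)  -- take over the socket
    -- ... read from / write to fd here
end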