skynet源碼分析之網絡層——Lua層介紹


本篇主要介紹在Lua服務里調用skynet網絡層底層接口的流程,Lua層的api主要在lualib/skynet/socket.lua,可參考官方wiki https://github.com/cloudwu/skynet/wiki/Socket

通過一個簡單的例子說明Lua服務是如何最終調用到網絡層底層接口的:

 1    local socket = require “socket”
 2    local skynet = require "skynet"
 3     
 4    local function loop(fd)
 5        socket.start(fd)
 6        while true do
 7            local data = socket.readline('\n')
 8            print(data, #data)
 9        end
10    end
11   
12   skynet.start(function()
13       local listen_fd = socket.listen(ip, hort)
14       socket.start(listen_fd, function(fd, addr)
15           print("connect fd[%d], addr[%s]", fd, addr)
16           skynet.fork(loop, fd)
17       end)
18   end)
1. api調用流程概述

在服務啟動時,調用socket.listen監聽。調用流程是:driver.listen(第7行)——>skynet_socket_listen(第17行)——>socket_server_listen(第29行)——>send_request(第47行),最后向發送管道寫數據。Lua接口執行流程是:socket.lua -> lua-socket.c ->skynet_socket.c -> socket_server.c

注:第34行,do_listen依次調用了unix網絡系統接口socket,bind,listen。

 1 // lualib/skynet/socket.lua
 2 function socket.listen(host, port, backlog)
 3     if port == nil then
 4         host, port = string.match(host, "([^:]+):(.+)$")
 5         port = tonumber(port)
 6     end
 7     return driver.listen(host, port, backlog)
 8 end
 9 
10 // lualib-src/lua-socket.c
11 static int
12 llisten(lua_State *L) {
13     const char * host = luaL_checkstring(L,1);
14     int port = luaL_checkinteger(L,2);
15     int backlog = luaL_optinteger(L,3,BACKLOG);
16     struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
17     int id = skynet_socket_listen(ctx, host,port,backlog);
18     if (id < 0) {
19         return luaL_error(L, "Listen error");
20     }
21 
22     lua_pushinteger(L,id);
23     return 1;
24 }
25 
26 // skynet-src/skynet_socket.c
27 skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
28     uint32_t source = skynet_context_handle(ctx);
29     return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
30 }
31 
32 // skynet-src/socket_server.c
33 socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
34     int fd = do_listen(addr, port, backlog);
35     if (fd < 0) {
36         return -1;
37     }
38     struct request_package request;
39     int id = reserve_id(ss);
40     if (id < 0) {
41         close(fd);
42         return id;
43     }
44     request.u.listen.opaque = opaque;
45     request.u.listen.id = id;
46     request.u.listen.fd = fd;
47     send_request(ss, &request, 'L', sizeof(request.u.listen));
48     return id;
49 }
2. socket連接過程

skynet里的socket結構有幾種狀態:

1 #define SOCKET_TYPE_INVALID 0 //可使用
2 #define SOCKET_TYPE_RESERVE 1 //已占用
3 #define SOCKET_TYPE_PLISTEN 2 //等待監聽(監聽套接字擁有)
4 #define SOCKET_TYPE_LISTEN 3 //監聽,可接受客戶端的連接(監聽套接字才擁有)
5 #define SOCKET_TYPE_CONNECTING 4 //正在連接(connect失敗時狀態,tcp會嘗試重新connect)
6 #define SOCKET_TYPE_CONNECTED 5 //已連接,可以收發數據
7 #define SOCKET_TYPE_HALFCLOSE 6
8 #define SOCKET_TYPE_PACCEPT 7 //等待連接(連接套接字才擁有)
9 #define SOCKET_TYPE_BIND 8

當工作線程執行socket.listen后,socket線程從接收管道讀取數據,執行ctrl_cmd,調用listen_socket(第6行),此時該socket狀態是SOCKET_TYPE_PLISTEN(第18行)

 1 // skynet-src/socket_server.c
 2 static int
 3 ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
 4     ...  
 5     case 'L':
 6         return listen_socket(ss,(struct request_listen *)buffer, result);    
 7     ...
 8 }
 9 
10 static int
11 listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
12     int id = request->id;
13     int listen_fd = request->fd;
14     struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false);
15     if (s == NULL) {
16         goto _failed;
17     }
18     s->type = SOCKET_TYPE_PLISTEN;
19     return -1;
20     ...
21 }

接着,Lua服務調用socket.start,最終socket線程執行start_socket,此時socket狀態是SOCKET_TYPE_LISTEN,等待客戶端的連接請求。

 1 // skynet-src/socket_server.c
 2   static int
 3   start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
 4       ...
 5       if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
 6           if (sp_add(ss->event_fd, s->fd, s)) {
 7               force_close(ss, s, &l, result);
 8               result->data = strerror(errno);
 9               return SOCKET_ERR;
10           }
11          s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
12          s->opaque = request->opaque;
13          result->data = "start";
14          return SOCKET_OPEN;
15      }
16      ...
17  }

 當客戶端發起連接請求后,epoll事件返回,調用report_accept(第5行)

第14行,調用unix網絡系統接口accept,接受客戶端的請求。由於客戶端已發起連接,所以不會阻塞。

第16行,從socket池中獲取可用的socket id

17-22行,初始化該socket,此時socket狀態是SOCKET_TYPE_PACCEPT

 1 int 
 2 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
 3     ...
 4     case SOCKET_TYPE_LISTEN: {
 5         int ok = report_accept(ss, s, result);
 6     ...
 7 }
 8 
 9 // return 0 when failed, or -1 when file limit
10 static int
11 report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
12     union sockaddr_all u;
13     socklen_t len = sizeof(u);
14     int client_fd = accept(s->fd, &u.s, &len);
15     ...
16     int id = reserve_id(ss);
17     struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
18     ns->type = SOCKET_TYPE_PACCEPT;
19     result->opaque = s->opaque;
20     result->id = s->id;
21     result->ud = id;
22     result->data = NULL;
23 
24     ...
25     return 1;
26 }

接着,Lua服務再次調用socket.start(id),此時id是連接的socket,而不是監聽的socket。此時,socket狀態是SOCKET_TYPE_CONNECTED,連接已經建立,可以收發數據。這就是整個socket連接過程。

至於怎么通知到 Lua服務稍后分析。

1  // skynet-src/socket_server.c
2  static int
3  start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
4      ...
5      s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
6      ...
7  }

關閉socket,socket.close

發送數據有兩個api,正常發送socket.write, 低優先級發送socket.lwrite。

3. 網絡層如何通知給Lua服務

socket線程在運行過程(socket_server_poll)中,當收到網絡數據會調用forward_message_tcp

第19行,調用unix系統接口讀取socket上的數據

21-24行,采用args-value形式構造result,opaque是Lua服務的地址,id是該socket在池中的索引,ud是實際讀取到的字節數,data是數據

第25行,返回SOCKET_DATA,表示接收到數據。

 1 // skynet-src/socket_server.c
 2 int 
 3 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
 4     ...
 5     default:
 6         if (e->read) {
 7             int type;
 8             if (s->protocol == PROTOCOL_TCP) {
 9                 type = forward_message_tcp(ss, s, &l, result);
10         ...
11     return type
12 }
13 
14 static int
15 forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * resu
16 lt) {
17     int sz = s->p.size;
18     char * buffer = MALLOC(sz);
19     int n = (int)read(s->fd, buffer, sz);
20     ...  
21     result->opaque = s->opaque;
22     result->id = s->id;
23     result->ud = n;
24     result->data = buffer;
25     return SOCKET_DATA;
26 }

 由於socket_server_poll返回的是SOCKET_DATA,調用forward_message(第11行),

23-26行,構造即將要發送的消息數據,用到了上面返回的result

28-32行,構造skynet消息結構,因為是在網絡層發送的,不是具體的某個服務,所以source,session字段都設置成0即可

第34行,把消息發送給與socket對應的服務地址。

至此,網絡消息通知給具體的Lua服務。

 1 // skynet-src/skynet_socket.c
 2 int 
 3 skynet_socket_poll() {
 4     struct socket_server *ss = SOCKET_SERVER;
 5     assert(ss);
 6     struct socket_message result;
 7     int more = 1;
 8     int type = socket_server_poll(ss, &result, &more);
 9     switch (type) {
10     case SOCKET_DATA:
11         forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
12         break;
13         ...
14     return 115 }
16 
17 // mainloop thread
18 static void
19 forward_message(int type, bool padding, struct socket_message * result) {
20     struct skynet_socket_message *sm;
21     size_t sz = sizeof(*sm);
22     ...
23     sm = (struct skynet_socket_message *)skynet_malloc(sz);
24     sm->type = type;
25     sm->id = result->id;
26     sm->ud = result->ud;
27     ...
28     struct skynet_message message;
29     message.source = 0;
30     message.session = 0;
31     message.data = sm;
32     message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
33         
34     if (skynet_context_push((uint32_t)result->opaque, &message)) {
35         // todo: report somewhere to close socket
36         // don't call skynet_socket_close here (It will block mainloop)
37         skynet_free(sm->buffer);
38         skynet_free(sm);
39     }
40 }

 4. Lua服務處理流程

當網絡數據到達Lua服務時,lualib/skynet/socket.lua中提供了相應的處理方案。調用消息分發函數socket_message,網絡數據類型包含正常數據傳輸(DATA),連接(CONNECT),關閉(CLOSE),錯誤(ERROR)等。

第15行,把客戶端發過來的數據push到該socket的緩沖池中。

 1  -- lualib/skynet/socket.lua
 2  skynet.register_protocol {
 3      name = "socket",
 4      id = skynet.PTYPE_SOCKET,       -- PTYPE_SOCKET = 6
 5      unpack = driver.unpack,
 6      dispatch = function (_, _, t, ...)
 7          socket_message[t](...)
 8      end
 9  }
10  
11  -- SKYNET_SOCKET_TYPE_DATA = 1
12  socket_message[1] = function(id, size, data)
13      local s = socket_pool[id]
14      ...
15      local sz = driver.push(s.buffer, buffer_pool, data, size)
16      ...
17  }

socket.read(id, sz),從一個socket上讀sz指定的字節數,如果緩沖池里有足夠多的數據,從緩沖池里pop出直接返回(第5行),否則,暫停當前協程(第15行),當數據夠或者連接斷開時重啟協程。

 1 -- lualib/skynet/socket.lua
 2  function socket.read(id, sz)
 3      local s = socket_pool[id]
 4      assert(s)
 5      ...
 6      local ret = driver.pop(s.buffer, buffer_pool, sz)
 7      if ret then
 8          return ret
 9      end
10      if not s.connected then
11          return false, driver.readall(s.buffer, buffer_pool)
12      end
13  
14      assert(not s.read_required)
15      s.read_required = sz
16      suspend(s)
17      ret = driver.pop(s.buffer, buffer_pool, sz)
18      if ret then
19          return ret
20      else
21          return false, driver.readall(s.buffer, buffer_pool)
22      end
23  end

socket.readline(id, sep),從一個socket上讀以sep分割的數據,默認是"\n",即讀一行數據。注:該api可以指定分隔符,不單單是一行數據。

socket.abandon(id),清除socket id在本服務內的數據結構,但不並關閉這個socket,用於把id轉給其他服務控制。通常,會設計一個master服務接收外部連接,等連接上后再將socket分配給一個slave服務控制,減少master服務的壓力。

5. 總結

socket庫的使用流程一般是:

-- master服務
local listen_fd = socket.listen(ip, port)  //監聽一個地址
socket.start(listen_fd, function(fd, addr)
     slave.post.start(fd)  //客戶端連接上,轉交給slave
     socket.abandon(fd)
end)

-- slave服務
function accept.start(fd)
      socket.start(fd) //接管socket
       ...
end 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM