為了了解 skynet.call 的調用過程,需要先看看 skynet的隊列是如何把包分到不同工作線程的。看下圖
查看 global_queue 的skynet_globalmq_push和skynet_globamq_pop,很容易可以找到兩個關鍵的函數:
skyent_context_push
和
skynet_context_message_dispatch
先來看出口,skynet_context_message_dispatch。在skynet的啟動函數中,我們已經知道skynet_start里面的start(config->thread)啟動了 worker等線程:
thread_worker(void *p) { //初始化 ... struct message_queue * q = NULL; while (!m->quit) { //循環調用 skynet_context_message_dispatch q = skynet_context_message_dispatch(sm, q, weight); if (q == NULL) { //沒包了就掛其線程 ... } } return NULL; }
很清晰的代碼,worker線程不斷調用 skynet_context_message_dispatch 來讀取q里面的skynet_message 隊列,並進行分發。我們來看它是怎么分發的。
struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) { //q為空時,重新從global_queue中取下一個 message_queue if (q == NULL) { q = skynet_globalmq_pop(); if (q==NULL) return NULL; } //當前 message_queue 所屬服務的 context 的handle id uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); } int i,n=1; struct skynet_message msg; for (i=0;i<n;i++) { //從message_queue 中 pop一個msg出來 if (skynet_mq_pop(q,&msg)) { //若message_queue為空,返回1表示失敗,釋放ctx的引用次數 skynet_context_release(ctx); //把返回global_queue里面的下一個message_queue,以供skynet_context_message_dispatch調用 return skynet_globalmq_pop(); } else if (i==0 && weight >= 0) { n = skynet_mq_length(q); n >>= weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, "May overload, message queue length = %d", overload); } skynet_monitor_trigger(sm, msg.source , handle); //若 ctx->cb不為空,使用dispatch_message調用 ctx->cb if (ctx->cb == NULL) { skynet_free(msg.data); } else { dispatch_message(ctx, &msg); } skynet_monitor_trigger(sm, 0,0); } assert(q == ctx->queue); //若global_queue中還有下一個message_queue,返回下一個message_queue供分發,若為空則繼續執行當前message_queue的請求 struct message_queue *nq = skynet_globalmq_pop(); if (nq) { // If global mq is not empty , push q back, and return next queue (nq) // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch) skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; }
那么,從全局隊列最終拿到的 skynet_message包,最后交由了 dispatch_message和ctx-cb來處理了。dispatch_message把msg里面的東西取出來后,調用ctx->cb來進行處理
static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { ... if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) { skynet_free(msg->data); } CHECKCALLING_END(ctx) }
在skynet的啟動筆記中,已經知道了,首先是:
snlua_init 用 skynet_callback(ctx,l,_launch) 把 ctx->cb注冊為 _launch
然后立馬投遞第一個消息
消息重新進到 dispatch_message,調用 _launch ,把 ctx->cb注冊為 skynet.dispatch_message
啟動完成后,以后的所有消息其實都進到了 skynet.dispatch_message,然后調用了 raw_dispatch_message
function skynet.dispatch_message(...) local succ, err = pcall(raw_dispatch_message,...) ... end
那接着來看 raw_dispatch_message
local function raw_dispatch_message(prototype, msg, sz, session, source, ...) if prototype == 1 then ... else local p = proto[prototype] ... local f = p.dispatch if f then ... local co = co_create(f) ... suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...))) end end end
接下來要找的是 proto[prototype].disproto所有位置,找出那里定義 dispatch
的。看到 這個函數:
function skynet.register_protocol(class) local name = class.name local id = class.id ... proto[name] = class proto[id] = class end
以及
function skynet.dispatch(typename, func) local p = proto[typename] if func then local ret = p.dispatch p.dispatch = func return ret else return p and p.dispatch end end
沒錯了,就是class.dispatch 。所有的消息,最終進到的就是 class.dispatch。
///////////////////////////////////////////////////////////////////////
skynet.regiser_protocol 和 skynet.dispatch 你會在lua服務中經常看見。以launcher.lua為例子
launcher.lua在啟動時,注冊一個 name為"text"的table,它的dispatch也定義在下面
所以你應該能看到 skynet.call(".launcher","text",...)這種調用
skynet.register_protocol { name = "text", id = skynet.PTYPE_TEXT, unpack = skynet.tostring, dispatch = function(session, address , cmd) if cmd == "" then command.LAUNCHOK(address) elseif cmd == "ERROR" then command.ERROR(address) else error ("Invalid text command " .. cmd) end end, } //定義 launcher服務的 proto["lua"] 的dispatch skynet.dispatch("lua", function(session, address, cmd , ...) cmd = string.upper(cmd) local f = command[cmd] if f then local ret = f(address, ...) if ret ~= NORET then skynet.ret(skynet.pack(ret)) end else skynet.ret(skynet.pack {"Unknown command"} ) end end)
但這里還有一個問題,上面的proto["lua"] 是誰注冊的呢? 查找skynet.register_protocol,我們能找到這個位置:
--skynet.lua ----- register protocol do local REG = skynet.register_protocol REG { name = "lua", id = skynet.PTYPE_LUA, pack = skynet.pack, unpack = skynet.unpack, } REG { name = "response", id = skynet.PTYPE_RESPONSE, } REG { name = "error", id = skynet.PTYPE_ERROR, unpack = function(...) return ... end, dispatch = _error_dispatch, } end
在你第一次require "skynet"
的時候,它已經默認幫你注冊了"lua","response","error"3種消息,然后你創建新的lua服務時,調用skynet.dispatch 為 proto["lua"] 指定dispatch,之后通過 skynet.call("服務名","lua",...) 調用的消息就能最終投遞到你定義的處理函數里面了。
到了這里,從隊列取出數據,並分發到指定處理函數dispath的完整流程我們以及看到了。接下來,我們來看 消息是如果放入global_queue的。
來看 skynet.call 函數(skynet.send其實也一樣的,只是它不管返回)
function skynet.call(addr, typename, ...) //如proto["lua"] ,消息類型id放入msg中 local p = proto[typename] local session = c.send(addr, p.id , nil , p.pack(...)) ... //等待返回 return p.unpack(yield_call(addr, session)) end
這里的 c.send 的調用,我們看一下 c 的定義:
local c = require "skynet.core"
這里的 skynet.core ,實際上調用的是 skynet.so ,而從 skynet 的make log我們可以看到這樣一行:
cc -g -O2 -Wall -I3rd/lua -fPIC --shared lualib-src/lua-skynet.c lualib-src/lua-seri.c -o luaclib/skynet.so -Iskynet-src -Iservice-src -Ilualib-src
在 lualib-src/lua-skynet.c 中,我們看到這段代碼:
luaL_Reg l[] = { { "send" , _send }, { "genid", _genid }, { "redirect", _redirect }, { "command" , _command }, { "intcommand", _intcommand }, { "error", _error }, { "tostring", _tostring }, { "harbor", _harbor }, { "pack", _luaseri_pack }, { "unpack", _luaseri_unpack }, { "packstring", lpackstring }, { "trash" , ltrash }, { "callback", _callback }, { NULL, NULL }, };
這里的 luaL_Reg 把c函數注冊到lua中,從而讓lua調用這些函數。
所以 c.send 調用的,就是這里的 _send
_send 調用了 skynet_send ,如果目標在當前進程,將調用 skynet_context_push
然后 skyent_context_push 調用
skynet_mq_push(ctx->queue, message);
把消息放如了全局隊列,最后來看看 skynet_mq_push :
void skynet_mq_push(struct message_queue *q, struct skynet_message *message) { assert(message); SPIN_LOCK(q) //把msg放到隊列尾,然后 ++ q->taiskynet_globalmq_pushl q->queue[q->tail] = *message; if (++ q->tail >= q->cap) { q->tail = 0; } if (q->head == q->tail) { expand_queue(q); } //若ctx->queue未放入global_queue,放進去 if (q->in_global == 0) { q->in_global = MQ_IN_GLOBAL; skynet_globalmq_push(q); } SPIN_UNLOCK(q) }
值得一提的是,取消息從 ctx->queue 的head開始取,push消息則是從 tail push。
所以先投遞的消息會先執行,但由於協程的原因,還是不能保證先投遞的消息先執行完。