從四個方面來說:
1、消息發送
2、工作線程控制
3、信箱調度
4、消息分發
與調度相關的代碼實現在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三個文件中,整體上是一個m:n的調度器。
消息發送
skynet的消息定義在/skynet-src/skynet_mq.h中:
struct skynet_message { uint32_t source; int session; void * data; size_t sz; }; // type is encoding in skynet_message.sz high 8bit
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)
source:消息源(sc)的句柄。
session:用來做上下文的標識
data:消息指針
sz:消息長度,消息的請求類型定義在高8位
消息發出后,會被保存在接收者sc的信箱(message_queue字段)中,發送消息也就是向信箱壓入一條消息。來看看發送函數吧,在/skynet-src/skynet_server.c中:
1 int
2 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) { 3 if ((sz & MESSAGE_TYPE_MASK) != sz) { 4 skynet_error(context, "The message to %x is too large", destination); 5 if (type & PTYPE_TAG_DONTCOPY) { 6 skynet_free(data); 7 } 8 return -1; 9 } 10 _filter_args(context, type, &session, (void **)&data, &sz); 11
12 if (source == 0) { 13 source = context->handle; 14 } 15
16 if (destination == 0) { 17 return session; 18 } 19 if (skynet_harbor_message_isremote(destination)) { 20 struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); 21 rmsg->destination.handle = destination; 22 rmsg->message = data; 23 rmsg->sz = sz; 24 skynet_harbor_send(rmsg, source, session); 25 } else { 26 struct skynet_message smsg; 27 smsg.source = source; 28 smsg.session = session; 29 smsg.data = data; 30 smsg.sz = sz; 31
32 if (skynet_context_push(destination, &smsg)) { 33 skynet_free(data); 34 return -1; 35 } 36 } 37 return session; 38 }
3-9行對消息長度做了限制,MESSAGE_TYPE_MASK等於(SIZE_MAX >> 8),也就是最大只能為224,16MB。
_filter_args根據type做了兩個處理:
1、(type & PTYPE_TAG_DONTCOPY) == 0
會將data復制一份用作實際發送,這種情況下原來的data就要由調用者負責釋放。
2、(type & PTYPE_TAG_ALLOCSESSION) > 0
會從sc的session計數器分配一個session.
處理完后,type會合並到sz的高8位。
最后一步就是投遞到接收者的信箱了,根據接收者句柄判斷是否為遠程節點,如果是就用harbo發送。(內置的集群方案,現在已經不推薦使用)。成功返回session,失敗返回-1,並且釋放data.
工作線程的控制
skynet運行后,會啟動固定的線程來輪流調度sc(skynet_context),線程數由配置文件中的thread字段定義,默認是4個。那它是如何控制這些線程的呢?具體實現在/skynet-src/skynet_start.c中。
在208行,啟動了工作線程:
static int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, }; struct worker_parm wp[thread]; for (i=0;i<thread;i++) { wp[i].m = m; wp[i].id = i; if (i < sizeof(weight)/sizeof(weight[0])) { wp[i].weight= weight[i]; } else { wp[i].weight = 0; } create_thread(&pid[i+3], thread_worker, &wp[i]); }
直接來看線程函數thread_worker把,在152行:
1 static void *
2 thread_worker(void *p) { 3 struct worker_parm *wp = p; 4 int id = wp->id; 5 int weight = wp->weight; 6 struct monitor *m = wp->m; 7 struct skynet_monitor *sm = m->m[id]; 8 skynet_initthread(THREAD_WORKER); 9 struct message_queue * q = NULL; 10 while (!m->quit) { 11 q = skynet_context_message_dispatch(sm, q, weight); 12 if (q == NULL) { 13 if (pthread_mutex_lock(&m->mutex) == 0) { 14 ++ m->sleep; 15 // "spurious wakeup" is harmless, 16 // because skynet_context_message_dispatch() can be call at any time.
17 if (!m->quit) 18 pthread_cond_wait(&m->cond, &m->mutex); 19 -- m->sleep; 20 if (pthread_mutex_unlock(&m->mutex)) { 21 fprintf(stderr, "unlock mutex error"); 22 exit(1); 23 } 24 } 25 } 26 } 27 return NULL; 28 }
控制這種生命周期與進程一致的工作線程,主要有兩個細節:1、均勻不重復的分配任務。2、不空轉、最小時延。前者處理線程同步就好。來看看skynet是如何處理后者的吧:
它用得是條件變量來處理空轉的,用條件變量有兩點好處:1、讓出cpu時間片.2、由外部決定何時喚醒,這樣可以在有任務時再喚醒,既能最大化的不空轉,又能減小處理任務的時延。
具體實現是條件變量的標准應用了,和《unix高級編程》條件變量的例子幾乎一樣。這里還有一個sleep的計數,有什么用呢?用來判斷要不要調用pthread_cond_signal的。
最后還有一個問題,等待的線程是在哪里被喚醒的呢?在socket線程和timer線程里喚醒的,前者有socket消息時會調用一次,后者每個刷新時間會喚醒一次。
信箱的調度
上一篇時,在sc里我們看到過一個message_queue類型的字段,這就是信箱。skynet中用了兩種隊列來存儲消息並完成調度,下面稱為12級隊列,1級隊列是一個單鏈表,每個節點是2級隊列,2級隊列(message_queue)是一個自動擴展的循環隊列,用來存儲消息。這兩個隊列實現在/skynet-src/skynet_mq.c中,實現的很簡單,並沒有用復雜的無鎖結構,而是自旋鎖保證線程安全的鏈表,循環隊列。
信箱的調度就是12級隊列的調度,整體結構描述如下:
while(1){
1級隊列出隊;
調度2級隊列;
1級隊列入隊;
}
這部分實現在/skynet-src/skynet_server的275行skynet_context_message_dispatch()中:
1 struct message_queue *
2 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) { 3 if (q == NULL) { 4 q = skynet_globalmq_pop(); 5 if (q==NULL) 6 return NULL; 7 } 8
9 uint32_t handle = skynet_mq_handle(q); 10
11 struct skynet_context * ctx = skynet_handle_grab(handle); 12 if (ctx == NULL) { 13 struct drop_t d = { handle }; 14 skynet_mq_release(q, drop_message, &d); 15 return skynet_globalmq_pop(); 16 } 17
18 int i,n=1; 19 struct skynet_message msg; 20
21 for (i=0;i<n;i++) { 22 if (skynet_mq_pop(q,&msg)) { 23 skynet_context_release(ctx); 24 return skynet_globalmq_pop(); 25 } else if (i==0 && weight >= 0) { 26 n = skynet_mq_length(q); 27 n >>= weight; 28 } 29 int overload = skynet_mq_overload(q); 30 if (overload) { 31 skynet_error(ctx, "May overload, message queue length = %d", overload); 32 } 33
34 skynet_monitor_trigger(sm, msg.source , handle); 35
36 if (ctx->cb == NULL) { 37 skynet_free(msg.data); 38 } else { 39 dispatch_message(ctx, &msg); 40 } 41
42 skynet_monitor_trigger(sm, 0,0); 43 } 44
45 assert(q == ctx->queue); 46 struct message_queue *nq = skynet_globalmq_pop(); 47 if (nq) { 48 // If global mq is not empty , push q back, and return next queue (nq) 49 // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
50 skynet_globalmq_push(q); 51 q = nq; 52 } 53 skynet_context_release(ctx); 54
55 return q; 56 }
這個函數的作用是,調度傳入的2級隊列,並返回下一個可調度的2級隊列。在上面的實現中,有四個細節之處:
1、22-24行,當2級隊列為空時並沒有將其壓入1級隊列,那它從此就消失了嗎?不,這樣做是為了減少空轉1級隊列,那這個2級隊列是什么時候壓回的呢?在message_queue中,有一個
in_global標記是否在1級隊列中,當2級隊列的出隊(skynet_mq_pop)失敗時,這個標記就會被置0,在2級隊列入隊時(skynet_mq_push)會判斷這個標記,如果為0,那么就會將自己壓入1級隊列。(skynet_mq_mark_release也會判斷)所以這個2級隊列在下次入隊時會壓回。
2、25-27,修改了for循環的次數,也就是每次調度處理多少條消息。這個次數與傳入的weight有關,我們回過頭來看這個weight是從哪里來的,源頭在工作線程創建時:
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
再來看看 n >>= weight,嗯,大致就是:把工作線程分為組,前四組每組8個,超過的歸入第5組,AE組每次調度處理一條消息,B組每次處理(n/2)條,C組每次處理(n/4)條,D組每次處理(n/8)條。是為了均勻的使用多核。
3、29-32做了一個負載判斷,負載的閥值是1024。不過也僅僅是輸出一條log提醒一下而以.
4、34、42觸發了一下monitor,這個監控是用來檢測消息處理是否發生了死循環,不過也僅僅只是輸出一條log提醒一下。這個檢測是放在一個專門的監控線程里做的,判斷死循環的時間是5秒。具體機制這里就不說了,其實現在/skynet-src/skynet_monitor.c中
消息分發
信箱調度時,從2級隊列取出消息后就會調用dispatch_message函數做分發,在/skynet-src/skynet_server.c中:
1 static void
2 dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { 3 assert(ctx->init); 4 CHECKCALLING_BEGIN(ctx) 5 pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle)); 6 int type = msg->sz >> MESSAGE_TYPE_SHIFT; 7 size_t sz = msg->sz & MESSAGE_TYPE_MASK; 8 if (ctx->logfile) { 9 skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz); 10 } 11 if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) { 12 skynet_free(msg->data); 13 } 14 CHECKCALLING_END(ctx) 15 }
step1:將sc句柄保存在線程本地變量中.
step2:如果開啟了錄像功能,就將data的數據dump到日志文件
step3:調用sc的回調函數,根據返回值覺得是否釋放data,0釋放,1不釋放.