skynet源碼分析3:消息調度


從四個方面來說:

  1、消息發送

  2、工作線程控制

  3、信箱調度

  4、消息分發

與調度相關的代碼實現在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三個文件中,整體上是一個m:n的調度器。


消息發送


skynet的消息定義在/skynet-src/skynet_mq.h中:

 

struct skynet_message { uint32_t source; int session; void * data; size_t sz; }; // type is encoding in skynet_message.sz high 8bit
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)

 

source:消息源(sc)的句柄。

session:用來做上下文的標識

data:消息指針

sz:消息長度,消息的請求類型定義在高8位

消息發出后,會被保存在接收者sc的信箱(message_queue字段)中,發送消息也就是向信箱壓入一條消息。來看看發送函數吧,在/skynet-src/skynet_server.c中:

 1 int
 2 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {  3     if ((sz & MESSAGE_TYPE_MASK) != sz) {  4         skynet_error(context, "The message to %x is too large", destination);  5         if (type & PTYPE_TAG_DONTCOPY) {  6  skynet_free(data);  7  }  8         return -1;  9  } 10     _filter_args(context, type, &session, (void **)&data, &sz); 11 
12     if (source == 0) { 13         source = context->handle; 14  } 15 
16     if (destination == 0) { 17         return session; 18  } 19     if (skynet_harbor_message_isremote(destination)) { 20         struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); 21         rmsg->destination.handle = destination; 22         rmsg->message = data; 23         rmsg->sz = sz; 24  skynet_harbor_send(rmsg, source, session); 25     } else { 26         struct skynet_message smsg; 27         smsg.source = source; 28         smsg.session = session; 29         smsg.data = data; 30         smsg.sz = sz; 31 
32         if (skynet_context_push(destination, &smsg)) { 33  skynet_free(data); 34             return -1; 35  } 36  } 37     return session; 38 }

3-9行對消息長度做了限制,MESSAGE_TYPE_MASK等於(SIZE_MAX >> 8),也就是最大只能為224,16MB。

_filter_args根據type做了兩個處理:

1、(type & PTYPE_TAG_DONTCOPY) == 0

  會將data復制一份用作實際發送,這種情況下原來的data就要由調用者負責釋放。

2、(type & PTYPE_TAG_ALLOCSESSION) > 0

  會從sc的session計數器分配一個session.

處理完后,type會合並到sz的高8位。

最后一步就是投遞到接收者的信箱了,根據接收者句柄判斷是否為遠程節點,如果是就用harbo發送。(內置的集群方案,現在已經不推薦使用)。成功返回session,失敗返回-1,並且釋放data.

 


 

工作線程的控制

skynet運行后,會啟動固定的線程來輪流調度sc(skynet_context),線程數由配置文件中的thread字段定義,默認是4個。那它是如何控制這些線程的呢?具體實現在/skynet-src/skynet_start.c中。

在208行,啟動了工作線程:

static int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, }; struct worker_parm wp[thread]; for (i=0;i<thread;i++) { wp[i].m = m; wp[i].id = i; if (i < sizeof(weight)/sizeof(weight[0])) { wp[i].weight= weight[i]; } else { wp[i].weight = 0; } create_thread(&pid[i+3], thread_worker, &wp[i]); }

直接來看線程函數thread_worker把,在152行:

 1 static void *
 2 thread_worker(void *p) {  3     struct worker_parm *wp = p;  4     int id = wp->id;  5     int weight = wp->weight;  6     struct monitor *m = wp->m;  7     struct skynet_monitor *sm = m->m[id];  8  skynet_initthread(THREAD_WORKER);  9     struct message_queue * q = NULL; 10     while (!m->quit) { 11         q = skynet_context_message_dispatch(sm, q, weight); 12         if (q == NULL) { 13             if (pthread_mutex_lock(&m->mutex) == 0) { 14                 ++ m->sleep; 15                 // "spurious wakeup" is harmless, 16                 // because skynet_context_message_dispatch() can be call at any time.
17                 if (!m->quit) 18                     pthread_cond_wait(&m->cond, &m->mutex); 19                 -- m->sleep; 20                 if (pthread_mutex_unlock(&m->mutex)) { 21                     fprintf(stderr, "unlock mutex error"); 22                     exit(1); 23  } 24  } 25  } 26  } 27     return NULL; 28 }

控制這種生命周期與進程一致的工作線程,主要有兩個細節:1、均勻不重復的分配任務。2、不空轉、最小時延。前者處理線程同步就好。來看看skynet是如何處理后者的吧:

它用得是條件變量來處理空轉的,用條件變量有兩點好處:1、讓出cpu時間片.2、由外部決定何時喚醒,這樣可以在有任務時再喚醒,既能最大化的不空轉,又能減小處理任務的時延。

具體實現是條件變量的標准應用了,和《unix高級編程》條件變量的例子幾乎一樣。這里還有一個sleep的計數,有什么用呢?用來判斷要不要調用pthread_cond_signal的。

最后還有一個問題,等待的線程是在哪里被喚醒的呢?在socket線程和timer線程里喚醒的,前者有socket消息時會調用一次,后者每個刷新時間會喚醒一次。


信箱的調度

上一篇時,在sc里我們看到過一個message_queue類型的字段,這就是信箱。skynet中用了兩種隊列來存儲消息並完成調度,下面稱為12級隊列,1級隊列是一個單鏈表,每個節點是2級隊列,2級隊列(message_queue)是一個自動擴展的循環隊列,用來存儲消息。這兩個隊列實現在/skynet-src/skynet_mq.c中,實現的很簡單,並沒有用復雜的無鎖結構,而是自旋鎖保證線程安全的鏈表,循環隊列。

信箱的調度就是12級隊列的調度,整體結構描述如下:

while(1){

  1級隊列出隊;

  調度2級隊列;

  1級隊列入隊;

}

這部分實現在/skynet-src/skynet_server的275行skynet_context_message_dispatch()中:

 1 struct message_queue * 
 2 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {  3     if (q == NULL) {  4         q = skynet_globalmq_pop();  5         if (q==NULL)  6             return NULL;  7  }  8 
 9     uint32_t handle = skynet_mq_handle(q); 10 
11     struct skynet_context * ctx = skynet_handle_grab(handle); 12     if (ctx == NULL) { 13         struct drop_t d = { handle }; 14         skynet_mq_release(q, drop_message, &d); 15         return skynet_globalmq_pop(); 16  } 17 
18     int i,n=1; 19     struct skynet_message msg; 20 
21     for (i=0;i<n;i++) { 22         if (skynet_mq_pop(q,&msg)) { 23  skynet_context_release(ctx); 24             return skynet_globalmq_pop(); 25         } else if (i==0 && weight >= 0) { 26             n = skynet_mq_length(q); 27             n >>= weight; 28  } 29         int overload = skynet_mq_overload(q); 30         if (overload) { 31             skynet_error(ctx, "May overload, message queue length = %d", overload); 32  } 33 
34  skynet_monitor_trigger(sm, msg.source , handle); 35 
36         if (ctx->cb == NULL) { 37  skynet_free(msg.data); 38         } else { 39             dispatch_message(ctx, &msg); 40  } 41 
42         skynet_monitor_trigger(sm, 0,0); 43  } 44 
45     assert(q == ctx->queue); 46     struct message_queue *nq = skynet_globalmq_pop(); 47     if (nq) { 48         // If global mq is not empty , push q back, and return next queue (nq) 49         // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
50  skynet_globalmq_push(q); 51         q = nq; 52  } 53  skynet_context_release(ctx); 54 
55     return q; 56 }

這個函數的作用是,調度傳入的2級隊列,並返回下一個可調度的2級隊列。在上面的實現中,有四個細節之處:

1、22-24行,當2級隊列為空時並沒有將其壓入1級隊列,那它從此就消失了嗎?不,這樣做是為了減少空轉1級隊列,那這個2級隊列是什么時候壓回的呢?在message_queue中,有一個

in_global標記是否在1級隊列中,當2級隊列的出隊(skynet_mq_pop)失敗時,這個標記就會被置0,在2級隊列入隊時(skynet_mq_push)會判斷這個標記,如果為0,那么就會將自己壓入1級隊列。(skynet_mq_mark_release也會判斷)所以這個2級隊列在下次入隊時會壓回。

2、25-27,修改了for循環的次數,也就是每次調度處理多少條消息。這個次數與傳入的weight有關,我們回過頭來看這個weight是從哪里來的,源頭在工作線程創建時:

static int weight[] = { 
        -1, -1, -1, -1, 0, 0, 0, 0,
        1, 1, 1, 1, 1, 1, 1, 1, 
        2, 2, 2, 2, 2, 2, 2, 2, 
        3, 3, 3, 3, 3, 3, 3, 3, };
    struct worker_parm wp[thread];
    for (i=0;i<thread;i++) {
        wp[i].m = m;
        wp[i].id = i;
        if (i < sizeof(weight)/sizeof(weight[0])) {
            wp[i].weight= weight[i];
        } else {
            wp[i].weight = 0;
        }
        create_thread(&pid[i+3], thread_worker, &wp[i]);
    }

再來看看 n >>= weight,嗯,大致就是:把工作線程分為組,前四組每組8個,超過的歸入第5組,AE組每次調度處理一條消息,B組每次處理(n/2)條,C組每次處理(n/4)條,D組每次處理(n/8)條。是為了均勻的使用多核。

3、29-32做了一個負載判斷,負載的閥值是1024。不過也僅僅是輸出一條log提醒一下而以.

4、34、42觸發了一下monitor,這個監控是用來檢測消息處理是否發生了死循環,不過也僅僅只是輸出一條log提醒一下。這個檢測是放在一個專門的監控線程里做的,判斷死循環的時間是5秒。具體機制這里就不說了,其實現在/skynet-src/skynet_monitor.c中


消息分發

信箱調度時,從2級隊列取出消息后就會調用dispatch_message函數做分發,在/skynet-src/skynet_server.c中:

 1 static void
 2 dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {  3     assert(ctx->init);  4  CHECKCALLING_BEGIN(ctx)  5     pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));  6     int type = msg->sz >> MESSAGE_TYPE_SHIFT;  7     size_t sz = msg->sz & MESSAGE_TYPE_MASK;  8     if (ctx->logfile) {  9         skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz); 10  } 11     if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) { 12         skynet_free(msg->data); 13  } 14  CHECKCALLING_END(ctx) 15 }

step1:將sc句柄保存在線程本地變量中.

step2:如果開啟了錄像功能,就將data的數據dump到日志文件

step3:調用sc的回調函數,根據返回值覺得是否釋放data,0釋放,1不釋放.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM