skynet源碼分析4:actor生命周期管理


skynet是基於多線程的,每個actor都會被單獨的線程調度,且每個actor可以殺死其它actor,給其它actor發送消息,創建actor,也就是一個actor可能被多個線程持有,那么就會面臨三個問題:

  1. 一個actor被同時使用時,如何安全釋放。
  2. actor被釋放后,外部使用時如何檢測該actor已經無效了,以便流程能繼續。
  3. 若信箱里的消息具有請求回應語義,那么如果通知消息源。

框架使用的是handle映射與引用計數的手法,對外暴露sc(skynet_context)的handle,而不是指針。這個模塊實現在/skynet-src/skynet_handle.c中,來分析一下其具體原理,從頭文件的接口入手:

skynet_handle.h

 1 #ifndef SKYNET_CONTEXT_HANDLE_H  2 #define SKYNET_CONTEXT_HANDLE_H
 3 
 4 #include <stdint.h>
 5 
 6 // reserve high 8 bits for remote id
 7 #define HANDLE_MASK 0xffffff
 8 #define HANDLE_REMOTE_SHIFT 24
 9 
10 struct skynet_context; 11 
12 uint32_t skynet_handle_register(struct skynet_context *); 13 int skynet_handle_retire(uint32_t handle); 14 struct skynet_context * skynet_handle_grab(uint32_t handle); 15 void skynet_handle_retireall(); 16 
17 uint32_t skynet_handle_findname(const char * name); 18 const char * skynet_handle_namehandle(uint32_t handle, const char *name); 19 
20 void skynet_handle_init(int harbor); 21 
22 #endif

handle是一個uint32_t的整數,高8位表示遠程節點(這是框架自帶的集群設施,后面的分析都會無視該部分,一來它不是框架核心,二來這個集群設施已經不被推薦使用)。

先來看看它內部的數據結構:

#define DEFAULT_SLOT_SIZE 4
#define MAX_SLOT_SIZE 0x40000000

struct handle_name { char * name; uint32_t handle; }; struct handle_storage { struct rwlock lock; uint32_t harbor; uint32_t handle_index; int slot_size; struct skynet_context ** slot; int name_cap; int name_count; struct handle_name *name; }; static struct handle_storage *H = NULL;

看上去就一個sc的數組,看不出什么,看其它的方法吧,skynet_handle_register:

 1 uint32_t  2 skynet_handle_register(struct skynet_context *ctx) {  3     struct handle_storage *s = H;  4 
 5     rwlock_wlock(&s->lock);  6     
 7     for (;;) {  8         int i;  9         for (i=0;i<s->slot_size;i++) { 10             uint32_t handle = (i+s->handle_index) & HANDLE_MASK; 11             int hash = handle & (s->slot_size-1); 12             if (s->slot[hash] == NULL) { 13                 s->slot[hash] = ctx; 14                 s->handle_index = handle + 1; 15 
16                 rwlock_wunlock(&s->lock); 17 
18                 handle |= s->harbor; 19                 return handle; 20  } 21  } 22         assert((s->slot_size*2 - 1) <= HANDLE_MASK); 23         struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *)); 24         memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *)); 25         for (i=0;i<s->slot_size;i++) { 26             int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1); 27             assert(new_slot[hash] == NULL); 28             new_slot[hash] = s->slot[i]; 29  } 30         skynet_free(s->slot); 31         s->slot = new_slot; 32         s->slot_size *= 2; 33  } 34 }

這個方法是添加一個sc的handle映射。

從代碼看,是一種hash映射,用讀寫鎖來保證線程安全。9-21行是哈希值的選取過程,就是一個自增長的整數,每計算一次就加1,用handle_index做計數器,用取模來映射到sc數組上。用二次探測法來解決沖突,第9行循環保證沖突探測會覆蓋整個數組。

到22行,就說明數組滿了,此時會成倍擴展原數組,將handle在新數組上重新取模映射一遍。這種hash的規則有兩點好處:1、hash值不會重復.2、查找過程是真正O(1)的.

從第22行和handle_index的修改處,可以知道這個函數基於兩個前提:1、數組大小不會超過0xffffff.2、handle_index沒有處理溢出的情況,可能為0,也就是假定不會溢出。個人覺得handle_index還是處理一下溢出的情況較好,如果大於0xffffff,就設為1。

再來看看skynet_handle_grab:

 1 struct skynet_context * 
 2 skynet_handle_grab(uint32_t handle) {  3     struct handle_storage *s = H;  4     struct skynet_context * result = NULL;  5 
 6     rwlock_rlock(&s->lock);  7 
 8     uint32_t hash = handle & (s->slot_size-1);  9     struct skynet_context * ctx = s->slot[hash]; 10     if (ctx && skynet_context_handle(ctx) == handle) { 11         result = ctx; 12  skynet_context_grab(result); 13  } 14 
15     rwlock_runlock(&s->lock); 16 
17     return result; 18 }

這個函數作用是根據handle查找對應的sc,handle無效就返回NULL.上的是讀鎖,查找過程很簡單,將handle取模,然后判斷索引處的元素的handle是否一致。引用計數保存在sc里,並沒有在本模塊中,其實應該放在本模塊中更為純粹,sc里只需要知道如何釋放自己就行了。查找成功會增加sc的計數(skynet_context_grab)。

再來看看skynet_handle_retire:

 1 int
 2 skynet_handle_retire(uint32_t handle) {  3     int ret = 0;  4     struct handle_storage *s = H;  5 
 6     rwlock_wlock(&s->lock);  7 
 8     uint32_t hash = handle & (s->slot_size-1);  9     struct skynet_context * ctx = s->slot[hash]; 10 
11     if (ctx != NULL && skynet_context_handle(ctx) == handle) { 12         s->slot[hash] = NULL; 13         ret = 1; 14         int i; 15         int j=0, n=s->name_count; 16         for (i=0; i<n; ++i) { 17             if (s->name[i].handle == handle) { 18                 skynet_free(s->name[i].name); 19                 continue; 20             } else if (i!=j) { 21                 s->name[j] = s->name[i]; 22  } 23             ++j; 24  } 25         s->name_count = j; 26     } else { 27         ctx = NULL; 28  } 29 
30     rwlock_wunlock(&s->lock); 31 
32     if (ctx) { 33         // release ctx may call skynet_handle_* , so wunlock first.
34  skynet_context_release(ctx); 35  } 36 
37     return ret; 38 }

這個函數的作用是解除handle映射,而不是遞減引用計數。

具體實現有兩步:1、清空handle對應的槽,調用skynet_context_release.2、如果有注冊命名,刪除對應的節點。

其實將釋放sc的控制放在本模塊會更好。

其余的方法就是handle命名的支持,名稱映射保存在數組中,按字典序排序,查找時用二分查找法。

現在可以看sc生命周期具體的場景了,看兩個地方就行了:

  1. 消息調度處,skynet_context_message_dispatch函數里.
  2. sc的對外接口,主要是skynet_command.

在skynet_context_message_dispatch里可以看到(/skynet-src/skynet_server.c的285行):

struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); }

通過skynet_handle_grab做了sc無效的檢測,也就解決了開頭提出的問題2。sc其它的對外接口也做了這樣的判斷。

那么剩下就是問題1,安全釋放的問題。來看sc的對外釋放接口,cmd_exit,cmd_kill,調的都是handle_exit:

 1 static void
 2 handle_exit(struct skynet_context * context, uint32_t handle) {  3     if (handle == 0) {  4         handle = context->handle;  5         skynet_error(context, "KILL self");  6     } else {  7         skynet_error(context, "KILL :%0x", handle);  8  }  9     if (G_NODE.monitor_exit) { 10         skynet_send(context,  handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0); 11  } 12  skynet_handle_retire(handle); 13 }

這個函數最終調的skynet_handle_retire,它解除handle映射后調的是skynet_context_release。

來看看skynet_context_release:

 1 static void 
 2 delete_context(struct skynet_context *ctx) {  3     if (ctx->logfile) {  4         fclose(ctx->logfile);  5  }  6     skynet_module_instance_release(ctx->mod, ctx->instance);  7     skynet_mq_mark_release(ctx->queue);  8  CHECKCALLING_DESTROY(ctx)  9  skynet_free(ctx); 10  context_dec(); 11 } 12 
13 struct skynet_context * 
14 skynet_context_release(struct skynet_context *ctx) { 15     if (ATOM_DEC(&ctx->ref) == 0) { 16  delete_context(ctx); 17         return NULL; 18  } 19     return ctx; 20 }

引用計數為0后就會釋放sc,那么問題1是這樣來保證的:

調用handle_exit后會有兩種情況:

1、其它邏輯流已經獲取了sc,那么引用計數一定大於0,此時不會釋放sc,當最后一個邏輯流遞減引用計數時才會釋放,是安全的。

2、sc被釋放,其它邏輯流開始skynet_handle_grab,因為handle映射已經解除,所有查找無效,邏輯流可以知曉這一情況作出判斷,是安全的。

sc釋放時,沒有釋放信箱(message_queue),僅調用了skynet_mq_mark_release設置了釋放標志,那它在哪里釋放的呢?先來想想這樣一個情況,如果sc釋放了,信箱沒被釋放,那么skynet_handle_grab就會查找失敗,而信箱還會在1級隊列中,那么釋放的地方只可能在skynet_context_message_dispatch里,回過頭來看看它,就是在判斷sc無效的分支里,調用了skynet_mq_release釋放的信箱。

為什么信箱要獨立出來分釋放,而不和sc一起釋放?因為sc是通過引用計數釋放的,釋放時機不明確,可能在任意一個邏輯流中,那么消息調度中是否應該將它壓回1級隊列就無法判斷了,所以要獨立出來。

只剩問題3的解決了,這只需要看信箱釋放時是如何處理消息的就行了,在/skynet-src/skynet_server.c的drop_message里:

static void drop_message(struct skynet_message *msg, void *ud) { struct drop_t *d = ud; skynet_free(msg->data); uint32_t source = d->handle; assert(source); // report error to the message source
    skynet_send(NULL, source, msg->source, PTYPE_ERROR, 0, NULL, 0); }

通過向消息源發送一條PTYPE_ERROR來解決,這樣期望收到回應的sc就有機會結束這條掛起的流程了。不過有個疑問,為什么回應時不帶上session,難道要消息源自己查找信箱么?這點再消息分發的時候再看吧。

 


 

如果沒有gc,那么在多線程編程中,如何安全釋放資源是一定會面臨的問題。通常將它獨立到另外的模塊中解決,有兩種常用的方法:

  1.  本文的handle映射和引用計數。c++中通常用智能指針,通過析構、拷貝構造函數自動來加減引用計數做強制保證。個人覺得前者更為靈活。
  2. 釋放時只打上標記,以一定頻率定時回收資源。

ps:還是gc好,寫代碼時沒有心里負擔。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM