skynet是基于多线程的,每个actor都会被单独的线程调度,且每个actor可以杀死其它actor,给其它actor发送消息,创建actor,也就是一个actor可能被多个线程持有,那么就会面临三个问题:
- 一个actor被同时使用时,如何安全释放。
- actor被释放后,外部使用时如何检测该actor已经无效了,以便流程能继续。
- 若信箱里的消息具有请求回应语义,那么如果通知消息源。
框架使用的是handle映射与引用计数的手法,对外暴露sc(skynet_context)的handle,而不是指针。这个模块实现在/skynet-src/skynet_handle.c中,来分析一下其具体原理,从头文件的接口入手:
skynet_handle.h
1 #ifndef SKYNET_CONTEXT_HANDLE_H 2 #define SKYNET_CONTEXT_HANDLE_H
3
4 #include <stdint.h>
5
6 // reserve high 8 bits for remote id
7 #define HANDLE_MASK 0xffffff
8 #define HANDLE_REMOTE_SHIFT 24
9
10 struct skynet_context; 11
12 uint32_t skynet_handle_register(struct skynet_context *); 13 int skynet_handle_retire(uint32_t handle); 14 struct skynet_context * skynet_handle_grab(uint32_t handle); 15 void skynet_handle_retireall(); 16
17 uint32_t skynet_handle_findname(const char * name); 18 const char * skynet_handle_namehandle(uint32_t handle, const char *name); 19
20 void skynet_handle_init(int harbor); 21
22 #endif
handle是一个uint32_t的整数,高8位表示远程节点(这是框架自带的集群设施,后面的分析都会无视该部分,一来它不是框架核心,二来这个集群设施已经不被推荐使用)。
先来看看它内部的数据结构:
#define DEFAULT_SLOT_SIZE 4
#define MAX_SLOT_SIZE 0x40000000
struct handle_name { char * name; uint32_t handle; }; struct handle_storage { struct rwlock lock; uint32_t harbor; uint32_t handle_index; int slot_size; struct skynet_context ** slot; int name_cap; int name_count; struct handle_name *name; }; static struct handle_storage *H = NULL;
看上去就一个sc的数组,看不出什么,看其它的方法吧,skynet_handle_register:
1 uint32_t 2 skynet_handle_register(struct skynet_context *ctx) { 3 struct handle_storage *s = H; 4
5 rwlock_wlock(&s->lock); 6
7 for (;;) { 8 int i; 9 for (i=0;i<s->slot_size;i++) { 10 uint32_t handle = (i+s->handle_index) & HANDLE_MASK; 11 int hash = handle & (s->slot_size-1); 12 if (s->slot[hash] == NULL) { 13 s->slot[hash] = ctx; 14 s->handle_index = handle + 1; 15
16 rwlock_wunlock(&s->lock); 17
18 handle |= s->harbor; 19 return handle; 20 } 21 } 22 assert((s->slot_size*2 - 1) <= HANDLE_MASK); 23 struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *)); 24 memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *)); 25 for (i=0;i<s->slot_size;i++) { 26 int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1); 27 assert(new_slot[hash] == NULL); 28 new_slot[hash] = s->slot[i]; 29 } 30 skynet_free(s->slot); 31 s->slot = new_slot; 32 s->slot_size *= 2; 33 } 34 }
这个方法是添加一个sc的handle映射。
从代码看,是一种hash映射,用读写锁来保证线程安全。9-21行是哈希值的选取过程,就是一个自增长的整数,每计算一次就加1,用handle_index做计数器,用取模来映射到sc数组上。用二次探测法来解决冲突,第9行循环保证冲突探测会覆盖整个数组。
到22行,就说明数组满了,此时会成倍扩展原数组,将handle在新数组上重新取模映射一遍。这种hash的规则有两点好处:1、hash值不会重复.2、查找过程是真正O(1)的.
从第22行和handle_index的修改处,可以知道这个函数基于两个前提:1、数组大小不会超过0xffffff.2、handle_index没有处理溢出的情况,可能为0,也就是假定不会溢出。个人觉得handle_index还是处理一下溢出的情况较好,如果大于0xffffff,就设为1。
再来看看skynet_handle_grab:
1 struct skynet_context *
2 skynet_handle_grab(uint32_t handle) { 3 struct handle_storage *s = H; 4 struct skynet_context * result = NULL; 5
6 rwlock_rlock(&s->lock); 7
8 uint32_t hash = handle & (s->slot_size-1); 9 struct skynet_context * ctx = s->slot[hash]; 10 if (ctx && skynet_context_handle(ctx) == handle) { 11 result = ctx; 12 skynet_context_grab(result); 13 } 14
15 rwlock_runlock(&s->lock); 16
17 return result; 18 }
这个函数作用是根据handle查找对应的sc,handle无效就返回NULL.上的是读锁,查找过程很简单,将handle取模,然后判断索引处的元素的handle是否一致。引用计数保存在sc里,并没有在本模块中,其实应该放在本模块中更为纯粹,sc里只需要知道如何释放自己就行了。查找成功会增加sc的计数(skynet_context_grab)。
再来看看skynet_handle_retire:
1 int
2 skynet_handle_retire(uint32_t handle) { 3 int ret = 0; 4 struct handle_storage *s = H; 5
6 rwlock_wlock(&s->lock); 7
8 uint32_t hash = handle & (s->slot_size-1); 9 struct skynet_context * ctx = s->slot[hash]; 10
11 if (ctx != NULL && skynet_context_handle(ctx) == handle) { 12 s->slot[hash] = NULL; 13 ret = 1; 14 int i; 15 int j=0, n=s->name_count; 16 for (i=0; i<n; ++i) { 17 if (s->name[i].handle == handle) { 18 skynet_free(s->name[i].name); 19 continue; 20 } else if (i!=j) { 21 s->name[j] = s->name[i]; 22 } 23 ++j; 24 } 25 s->name_count = j; 26 } else { 27 ctx = NULL; 28 } 29
30 rwlock_wunlock(&s->lock); 31
32 if (ctx) { 33 // release ctx may call skynet_handle_* , so wunlock first.
34 skynet_context_release(ctx); 35 } 36
37 return ret; 38 }
这个函数的作用是解除handle映射,而不是递减引用计数。
具体实现有两步:1、清空handle对应的槽,调用skynet_context_release.2、如果有注册命名,删除对应的节点。
其实将释放sc的控制放在本模块会更好。
其余的方法就是handle命名的支持,名称映射保存在数组中,按字典序排序,查找时用二分查找法。
现在可以看sc生命周期具体的场景了,看两个地方就行了:
- 消息调度处,skynet_context_message_dispatch函数里.
- sc的对外接口,主要是skynet_command.
在skynet_context_message_dispatch里可以看到(/skynet-src/skynet_server.c的285行):
struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); }
通过skynet_handle_grab做了sc无效的检测,也就解决了开头提出的问题2。sc其它的对外接口也做了这样的判断。
那么剩下就是问题1,安全释放的问题。来看sc的对外释放接口,cmd_exit,cmd_kill,调的都是handle_exit:
1 static void
2 handle_exit(struct skynet_context * context, uint32_t handle) { 3 if (handle == 0) { 4 handle = context->handle; 5 skynet_error(context, "KILL self"); 6 } else { 7 skynet_error(context, "KILL :%0x", handle); 8 } 9 if (G_NODE.monitor_exit) { 10 skynet_send(context, handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0); 11 } 12 skynet_handle_retire(handle); 13 }
这个函数最终调的skynet_handle_retire,它解除handle映射后调的是skynet_context_release。
来看看skynet_context_release:
1 static void
2 delete_context(struct skynet_context *ctx) { 3 if (ctx->logfile) { 4 fclose(ctx->logfile); 5 } 6 skynet_module_instance_release(ctx->mod, ctx->instance); 7 skynet_mq_mark_release(ctx->queue); 8 CHECKCALLING_DESTROY(ctx) 9 skynet_free(ctx); 10 context_dec(); 11 } 12
13 struct skynet_context *
14 skynet_context_release(struct skynet_context *ctx) { 15 if (ATOM_DEC(&ctx->ref) == 0) { 16 delete_context(ctx); 17 return NULL; 18 } 19 return ctx; 20 }
引用计数为0后就会释放sc,那么问题1是这样来保证的:
调用handle_exit后会有两种情况:
1、其它逻辑流已经获取了sc,那么引用计数一定大于0,此时不会释放sc,当最后一个逻辑流递减引用计数时才会释放,是安全的。
2、sc被释放,其它逻辑流开始skynet_handle_grab,因为handle映射已经解除,所有查找无效,逻辑流可以知晓这一情况作出判断,是安全的。
sc释放时,没有释放信箱(message_queue),仅调用了skynet_mq_mark_release设置了释放标志,那它在哪里释放的呢?先来想想这样一个情况,如果sc释放了,信箱没被释放,那么skynet_handle_grab就会查找失败,而信箱还会在1级队列中,那么释放的地方只可能在skynet_context_message_dispatch里,回过头来看看它,就是在判断sc无效的分支里,调用了skynet_mq_release释放的信箱。
为什么信箱要独立出来分释放,而不和sc一起释放?因为sc是通过引用计数释放的,释放时机不明确,可能在任意一个逻辑流中,那么消息调度中是否应该将它压回1级队列就无法判断了,所以要独立出来。
只剩问题3的解决了,这只需要看信箱释放时是如何处理消息的就行了,在/skynet-src/skynet_server.c的drop_message里:
static void drop_message(struct skynet_message *msg, void *ud) { struct drop_t *d = ud; skynet_free(msg->data); uint32_t source = d->handle; assert(source); // report error to the message source
skynet_send(NULL, source, msg->source, PTYPE_ERROR, 0, NULL, 0); }
通过向消息源发送一条PTYPE_ERROR来解决,这样期望收到回应的sc就有机会结束这条挂起的流程了。不过有个疑问,为什么回应时不带上session,难道要消息源自己查找信箱么?这点再消息分发的时候再看吧。
如果没有gc,那么在多线程编程中,如何安全释放资源是一定会面临的问题。通常将它独立到另外的模块中解决,有两种常用的方法:
- 本文的handle映射和引用计数。c++中通常用智能指针,通过析构、拷贝构造函数自动来加减引用计数做强制保证。个人觉得前者更为灵活。
- 释放时只打上标记,以一定频率定时回收资源。
ps:还是gc好,写代码时没有心里负担。