redis簡單地實現了訂閱發布功能。
pubsub涉及到的結構主要是下面兩個:
typedef struct redisClient { ... dict *pubsub_channels; //該client訂閱的channels,以channel為key用dict的方式組織 list *pubsub_patterns; //該client訂閱的pattern,以list的方式組織 ... } redisClient; struct redisServer { ... dict *pubsub_channels; //redis server進程中維護的channel dict,它以channel為key,訂閱channel的client list為value list *pubsub_patterns; //redis server進程中維護的pattern list int notify_keyspace_events; ... };
沒搞懂的是在redisClient中,為什么channel和pattern一個用dict一個用list?
對應的command:
struct redisCommand redisCommandTable[] = { ... {"subscribe", subscribeCommand, -2,"rpslt",0,NULL,0,0,0,0,0}, //channel訂閱命令 {"unsubscribe", unsubscribeCommand, -1,"rpslt",0,NULL,0,0,0,0,0}, //channel退訂命令 {"psubscribe", psubscribeCommand, -2,"rpslt",0,NULL,0,0,0,0,0}, //pattern訂閱命令 {"punsubscribe", punsubscribeCommand, -1,"rpslt",0,NULL,0,0,0,0,0}, //pattern退訂命令 {"publish", publishCommand, 3,"pltrF",0,NULL,0,0,0,0,0}, //消息發布命令 {"pubsub", pubsubCommand, -2,"pltrR",0,NULL,0,0,0,0,0}, //pubsub命令,用於輸出channel相關的統計信息 ... }
pattern的匹配,里面調用的equalStringObjects就是redis實現的正則匹配:
int listMatchPubsubPattern(void *a, void *b) { pubsubPattern *pa = a, *pb = b; return (pa->client == pb->client) && (equalStringObjects(pa->pattern,pb->pattern)); }
訂閱某個channel的核心操作
int pubsubSubscribeChannel(redisClient *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ /* 以channel為key加到c->pubsub_channels當中 */ if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ /* server.pubsub_channels記錄了所有被訂閱的channel以及訂閱特定channel的clients, 這里把c加到該channel對應的列表當中 */ de = dictFind(server.pubsub_channels,channel); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ /* 給客戶端生成答復 */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; }
退訂某個channel的核心操作
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0; /* Remove the channel from the client -> channels hash table */ /* 為什么需要保護起來? * 考慮到redis本身是單進程單線程的,所以這里不是為了防止在別的地方被free掉 */ incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ /* 從該client的訂閱dict中移除該channel */ if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* Remove the client from the channel -> clients list hash table */ /* 從server.pubsub_channels中該channel維護的client列表上刪除該client */ de = dictFind(server.pubsub_channels,channel); redisAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); redisAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); /* 如果該channel上沒有其它訂閱者,則從server.pubsub_channels上刪除該channel */ if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client, so that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ dictDelete(server.pubsub_channels,channel); } } /* Notify the client */ /* 如果需要的話,生成回復 */ if (notify) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } /* 減少引用計數 */ decrRefCount(channel); /* it is finally safe to release it */ return retval; }
訂閱/退訂pattern的操作也很類似,就不貼代碼了。
它還提供了pubsubUnsubscribeAllChannels和pubsubUnsubscribeAllPatterns,用於一次性退訂所有的channels/patters。實現上就是循環調用相應的退訂函數。
發布消息的核心操作
int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ /* 先找到該channel */ de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); /* 對所有訂閱該頻道的client發送消息 */ while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; } } /* Send to clients listening to matching channels */ /* 如果pattern也有client在訂閱,那么還要進行模式的匹配並發送消息給相應的client */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } /* 釋放getDecodedObject返回的對象 */ decrRefCount(channel); } return receivers; }
有了上面的幾個核心操作,subscribe/unsubscribe, psubscribe/punsubscribe, publish幾個操作基本上就是直接調用上面的函數,這里就不貼代碼了。
有一點提一下:發布/訂閱跟具體的db無關,只跟client和具體的channel/pattern有關。
redis還提供一個pubsub命令,用於輸出當前訂閱的總體情況:
void pubsubCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"channels") && (c->argc == 2 || c->argc ==3)) { /* PUBSUB CHANNELS [<pattern>] */ /* 如果沒有指定pattern,則輸出所有channel的信息 * 如果指定了pattern,則輸出跟pattern匹配的所有channel信息 */ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; dictIterator *di = dictGetIterator(server.pubsub_channels); dictEntry *de; long mblen = 0; void *replylen; replylen = addDeferredMultiBulkLength(c); while((de = dictNext(di)) != NULL) { robj *cobj = dictGetKey(de); sds channel = cobj->ptr; if (!pat || stringmatchlen(pat, sdslen(pat), channel, sdslen(channel),0)) { addReplyBulk(c,cobj); mblen++; } } dictReleaseIterator(di); setDeferredMultiBulkLength(c,replylen,mblen); } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) { /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */ int j; /* 輸出指定channel上client的數量 */ addReplyMultiBulkLen(c,(c->argc-2)*2); for (j = 2; j < c->argc; j++) { list *l = dictFetchValue(server.pubsub_channels,c->argv[j]); addReplyBulk(c,c->argv[j]); addReplyLongLong(c,l ? listLength(l) : 0); } } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) { /* PUBSUB NUMPAT */ /* 輸出pattern的數量 */ addReplyLongLong(c,listLength(server.pubsub_patterns)); } else { addReplyErrorFormat(c, "Unknown PUBSUB subcommand or wrong number of arguments for '%s'", (char*)c->argv[1]->ptr); } }
