0.前言
redis提供了多個操作無序集合的命令,本文介紹其中幾個命令的實際執行過程。
1.sadd命令
/* SADD handler: adds the members argv[2..argc-1] to the set stored at
 * argv[1] and replies with the number of members that were actually added.
 *
 * If the key does not exist a new set is created first; if it exists but
 * is not a set, the shared wrong-type error is sent and nothing is changed. */
void saddCommand(redisClient *c) {
    robj *key = c->argv[1];
    robj *target = lookupKeyWrite(c->db, key);
    int idx;
    int inserted = 0;

    /* An existing key of another type is rejected. */
    if (target != NULL && target->type != REDIS_SET) {
        addReply(c, shared.wrongtypeerr);
        return;
    }

    if (target == NULL) {
        /* Create the set from the first member. Per the original note, an
         * intset is used when that member can be converted to a long long,
         * otherwise a hash table is used. */
        target = setTypeCreate(c->argv[2]);
        dbAdd(c->db, key, target);
    }

    /* Try to encode each member compactly, then insert it into the set. */
    for (idx = 2; idx < c->argc; idx++) {
        c->argv[idx] = tryObjectEncoding(c->argv[idx]);
        if (setTypeAdd(target, c->argv[idx])) {
            inserted++;
        }
    }

    /* Signal the change and emit the keyspace event only when something
     * was really inserted. */
    if (inserted) {
        signalModifiedKey(c->db, key);
        notifyKeyspaceEvent(REDIS_NOTIFY_SET, "sadd", key, c->db->id);
    }

    server.dirty += inserted;
    addReplyLongLong(c, inserted);
}
/* Add `value` to the set object `subject`.
 *
 * Returns 1 when the element was inserted and 0 when it was already
 * present. On insertion into a hash table the reference count of `value`
 * is incremented. An intset-encoded set is converted to a hash table when
 * `value` is not representable as a long long, or when the intset grows
 * beyond server.set_max_intset_entries entries. */
int setTypeAdd(robj *subject, robj *value) {
    long long ival;
    uint8_t inserted = 0;

    /* Hash table encoding: a plain dictionary insert. */
    if (subject->encoding == REDIS_ENCODING_HT) {
        if (dictAdd(subject->ptr, value, NULL) != DICT_OK) {
            return 0;
        }
        incrRefCount(value);
        return 1;
    }

    if (subject->encoding != REDIS_ENCODING_INTSET) {
        redisPanic("Unknown set encoding");
        return 0;
    }

    /* From here on the set is intset encoded. */
    if (isObjectRepresentableAsLongLong(value, &ival) != REDIS_OK) {
        /* Not an integer: convert the set to a hash table and insert the
         * element there. The assertion requires that insert to succeed. */
        setTypeConvert(subject, REDIS_ENCODING_HT);
        redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
        incrRefCount(value);
        return 1;
    }

    subject->ptr = intsetAdd(subject->ptr, ival, &inserted);
    if (!inserted) {
        return 0;
    }

    /* Keep intsets from growing too large: past the configured number of
     * entries the set is converted to a hash table. */
    if (intsetLen(subject->ptr) > server.set_max_intset_entries) {
        setTypeConvert(subject, REDIS_ENCODING_HT);
    }
    return 1;
}
2.求差集和並集命令(sdiff,sdiffstore,sunion,sunionstore)
sdiff求差集, sdiffstore求差集並保存結果, sunion求並集, sunionstore求並集並保存結果, 幾種運算過程都是通過sunionDiffGenericCommand函數進行,此處將幾個命令全部列出.
/* Union of sets: every argument after argv[0] is passed as a set key and
 * the result is sent to the client (no destination key). */
void sunionCommand(redisClient *c) {
    robj **keys = c->argv + 1;

    sunionDiffGenericCommand(c, keys, c->argc - 1, NULL, REDIS_OP_UNION);
}
/* Union of sets, stored: argv[1] is the destination key and the arguments
 * from argv[2] onwards are passed as the source set keys. */
void sunionstoreCommand(redisClient *c) {
    robj *destination = c->argv[1];
    robj **sources = c->argv + 2;

    sunionDiffGenericCommand(c, sources, c->argc - 2, destination,
                             REDIS_OP_UNION);
}
/* Difference of sets: every argument after argv[0] is passed as a set key
 * and the result is sent to the client (no destination key). */
void sdiffCommand(redisClient *c) {
    robj **keys = c->argv + 1;

    sunionDiffGenericCommand(c, keys, c->argc - 1, NULL, REDIS_OP_DIFF);
}
/* Difference of sets, stored: argv[1] is the destination key and the
 * arguments from argv[2] onwards are passed as the source set keys. */
void sdiffstoreCommand(redisClient *c) {
    robj *destination = c->argv[1];
    robj **sources = c->argv + 2;

    sunionDiffGenericCommand(c, sources, c->argc - 2, destination,
                             REDIS_OP_DIFF);
}
/* Generic implementation of set union and set difference, shared by the
 * sunion, sunionstore, sdiff and sdiffstore command handlers.
 *
 * c       - client issuing the command; replies are written to it.
 * setkeys - array of the keys of the sets to operate on.
 * setnum  - number of entries in setkeys.
 * dstkey  - key under which the result is stored, or NULL to send the
 *           resulting elements to the client instead.
 * op      - REDIS_OP_UNION or REDIS_OP_DIFF.
 */
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *ele, *dstset = NULL;
int j, cardinality = 0;
int diff_algo = 1;
/* Fetch the sets to operate on. Keys are looked up for writing when a
 * destination key is given and for reading otherwise. A missing key is
 * recorded as NULL. If checkType() returns non-zero for a key, the array
 * is freed and the command stops here. */
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
if (!setobj) {
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,REDIS_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Choose the difference algorithm from the sizes of the input sets.
 *
 * Algorithm 1 is O(N*M), where N is the number of elements in the first
 * set and M is the number of sets taking part in the operation.
 *
 * Algorithm 2 is O(N), where N is the total number of elements in all
 * the sets. */
if (op == REDIS_OP_DIFF && sets[0]) {
long long algo_one_work = 0, algo_two_work = 0;
for (j = 0; j < setnum; j++) {
if (sets[j] == NULL) continue;
algo_one_work += setTypeSize(sets[0]);
algo_two_work += setTypeSize(sets[j]);
}
/* algo_one_work is the N*M estimate of algorithm 1 and algo_two_work
 * is the N estimate of algorithm 2. The two are not compared directly:
 * the estimate of algorithm 1 is halved first, which gives it an
 * advantage, so algorithm 2 is chosen only when half of algorithm 1's
 * estimate is still larger than algorithm 2's. */
algo_one_work /= 2;
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
if (diff_algo == 1 && setnum > 1) {
/* To speed up algorithm 1 and find duplicated elements as early as
 * possible, all sets except the first are sorted by decreasing number
 * of elements.
 * NOTE(review): sets[1..] may contain NULL entries for missing keys, so
 * the comparator must tolerate NULL - confirm. */
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}
/* Create a temporary set that holds the result of the operation. */
dstset = createIntsetObject();
if (op == REDIS_OP_UNION) {
/* Union: visit every element of every set and add it to dstset.
 * cardinality counts the additions that setTypeAdd() reports. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (setTypeAdd(dstset,ele)) cardinality++;
decrRefCount(ele);
}
setTypeReleaseIterator(si);
}
} else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 1) {
/* Algorithm 1: iterate the first set and check whether each of its
 * elements appears in any other set. Elements found in none of them
 * are added to dstset as members of the difference. */
si = setTypeInitIterator(sets[0]);
/* The outer loop walks the first set, the inner loop walks the other
 * sets taking part in the operation. */
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; /* no key is an empty set. */
if (sets[j] == sets[0]) break; /* same set! */
if (setTypeIsMember(sets[j],ele)) break;
}
if (j == setnum) {
/* The element was not found in any other set: add it to the
 * difference. */
setTypeAdd(dstset,ele);
cardinality++;
}
decrRefCount(ele);
}
setTypeReleaseIterator(si);
} else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 2) {
/* Algorithm 2: copy the elements of the first set into dstset, then
 * iterate all the other sets and remove from dstset every element
 * they contain. What is left is the difference. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
/* Elements of the first set are added to dstset. */
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
/* Elements of the other sets are removed from dstset when present. */
if (setTypeRemove(dstset,ele)) cardinality--;
}
decrRefCount(ele);
}
setTypeReleaseIterator(si);
/* Stop as soon as the result is empty: there is nothing left to remove. */
if (cardinality == 0) break;
}
}
if (!dstkey) {
/* The result is not stored: send its elements to the client and
 * release the temporary set. */
addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulk(c,ele);
decrRefCount(ele);
}
setTypeReleaseIterator(si);
decrRefCount(dstset);
} else {
/* The result is stored: first delete whatever may already exist at
 * dstkey. A non-empty result is then added under dstkey and its size is
 * sent to the client; an empty result is released, zero is sent, and a
 * "del" event is emitted if a key was actually deleted. */
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(REDIS_NOTIFY_SET,
op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
/* In the store case the key is always signalled as modified and the
 * dirty counter is always incremented. */
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);
}
3.求交集命令(sinter,sinterstore)
sinter求交集, sinterstore求交集並保存結果, 都是通過sinterGenericCommand函數進行相應的操作
/* Intersection of sets: every argument after argv[0] is passed as a set
 * key and the result is sent to the client (no destination key). */
void sinterCommand(redisClient *c) {
    robj **keys = c->argv + 1;

    sinterGenericCommand(c, keys, c->argc - 1, NULL);
}
/* Intersection of sets, stored: argv[1] is the destination key and the
 * arguments from argv[2] onwards are passed as the source set keys. */
void sinterstoreCommand(redisClient *c) {
    robj *destination = c->argv[1];
    robj **sources = c->argv + 2;

    sinterGenericCommand(c, sources, c->argc - 2, destination);
}
/* Generic implementation of set intersection, shared by the sinter and
 * sinterstore command handlers.
 *
 * c       - client issuing the command; replies are written to it.
 * setkeys - array of the keys of the sets to intersect.
 * setnum  - number of entries in setkeys.
 * dstkey  - key under which the result is stored, or NULL to send the
 *           resulting elements to the client instead.
 */
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *eleobj, *dstset = NULL;
int64_t intobj;
void *replylen = NULL;
unsigned long j, cardinality = 0;
int encoding;
/* Walk all the keys and fetch every set that was passed in. Keys are
 * looked up for writing when a destination key is given and for reading
 * otherwise. */
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
if (!setobj) {
/* A missing key ends the command immediately. When storing, dstkey is
 * deleted (and signalled/counted as dirty only if it existed) and
 * shared.czero is sent; otherwise shared.emptymultibulk is sent. */
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
addReply(c,shared.czero);
} else {
addReply(c,shared.emptymultibulk);
}
return;
}
/* If checkType() returns non-zero the array is freed and the command
 * stops here. */
if (checkType(c,setobj,REDIS_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Sort the sets by increasing number of elements. This helps the loop
 * below decide as early as possible whether an element belongs to the
 * intersection. */
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
/* The first thing we should output is the total number of elements...
 * since this is a multi-bulk write, but at this stage we don't know
 * the intersection set size, so we use a trick, append an empty object
 * to the output list and save the pointer to later modify it with the
 * right length */
if (!dstkey) {
replylen = addDeferredMultiBulkLength(c);
} else {
/* If we have a target key where to store the resulting set
 * create this key with an empty set inside */
dstset = createIntsetObject();
}
/* Iterate all the elements of the first (smallest) set, and test
 * the element against all the other sets, if at least one set does
 * not include the element it is discarded */
si = setTypeInitIterator(sets[0]);
while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
for (j = 1; j < setnum; j++) {
/* The very same set object as the first one needs no membership test. */
if (sets[j] == sets[0]) continue;
/* The membership test depends on the encoding of the element being
 * iterated and on the encoding of the set being probed. */
if (encoding == REDIS_ENCODING_INTSET) {
/* Both sides are intsets: look the integer up directly. */
if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* The probed set is a hash table: build a temporary object from the
 * integer, test membership with it, and release it afterwards. */
} else if (sets[j]->encoding == REDIS_ENCODING_HT) {
eleobj = createStringObjectFromLongLong(intobj);
if (!setTypeIsMember(sets[j],eleobj)) {
decrRefCount(eleobj);
break;
}
decrRefCount(eleobj);
}
} else if (encoding == REDIS_ENCODING_HT) {
/* If the element is integer encoded and the probed set is an intset,
 * the value can be looked up directly as a long; otherwise the object
 * is tested through the generic membership function. */
if (eleobj->encoding == REDIS_ENCODING_INT &&
sets[j]->encoding == REDIS_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
{
break;
} else if (!setTypeIsMember(sets[j],eleobj)) {
break;
}
}
}
/* Reaching the end of the list means the element appears in all the
 * sets, so it is part of the intersection. Without a destination key it
 * is sent to the client and counted; with one it is added to dstset
 * (an integer element is wrapped in a temporary object first). */
if (j == setnum) {
if (!dstkey) {
if (encoding == REDIS_ENCODING_HT)
addReplyBulk(c,eleobj);
else
addReplyBulkLongLong(c,intobj);
cardinality++;
} else {
if (encoding == REDIS_ENCODING_INTSET) {
eleobj = createStringObjectFromLongLong(intobj);
setTypeAdd(dstset,eleobj);
decrRefCount(eleobj);
} else {
setTypeAdd(dstset,eleobj);
}
}
}
}
setTypeReleaseIterator(si);
/* Store the intersection or finish the reply. When storing, whatever
 * exists at dstkey is deleted first; a non-empty result is added under
 * dstkey and its size is sent, while an empty result is released, zero
 * is sent, and a "del" event is emitted if a key was actually deleted.
 * When not storing, the deferred multi-bulk length is set to the number
 * of elements already sent. */
if (dstkey) {
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
setDeferredMultiBulkLength(c,replylen,cardinality);
}
zfree(sets);
}
總結
集合的這幾種運算都比較耗時, 對特別龐大的集合進行運算時需要謹慎, 否則可能影響整體性能.
