redis學習筆記(八): multi


redis實現了對"事務"的支持,核心函數都在這里
摘抄對於事務的定義:是指作為單個邏輯工作單元執行的一系列操作,要么完全地執行,要么完全地不執行
它的4個特性:原子性、一致性、隔離性、持久性
redis在事務的執行中並沒有提供回滾操作,它會按順序執行完隊列中的所有命令而不管中間是否有命令出錯(當然,執行出錯的命令會打印出錯信息),所以嚴格意義上的原子性(要么全部執行、要么全部不執行)沒辦法保障,一致性也就同樣沒辦法保障。

相關的command:

/* Excerpt of the command table: only the entries related to transactions
 * are shown; "..." marks entries omitted from this excerpt. */
struct redisCommand redisCommandTable[] = {
    ...
    {"multi",    multiCommand,   1,"rsF",0,NULL,0,0,0,0,0},    // marks the start of a transaction
    {"exec",     execCommand,    1,"sM",    0,NULL,0,0,0,0,0},    // commits the transaction
    {"discard",  discardCommand, 1,"rsF",0,NULL,0,0,0,0,0},    // cancels the transaction (this is not a rollback)
    ...
    {"watch",    watchCommand,  -2,"rsF",0,NULL,1,-1,1,0,0},
    {"unwatch",  unwatchCommand, 1,"rsF",0,NULL,0,0,0,0,0},
    ...
}

初始化一個multi state,就是簡單地將commands指針設置為空,count設置為0。代碼如下:

/* Put the client's MULTI/EXEC state into its empty form: zero queued
 * commands and no commands array. */
void initClientMultiState(redisClient *c) {
    c->mstate.count = 0;
    c->mstate.commands = NULL;
}

queueMultiCommand用來將命令加入待執行隊列,MULTI到EXEC之間的命令都是通過它來入隊的,核心代碼如下:

/* Append the client's current command (c->cmd, c->argc, c->argv) to its
 * MULTI queue.
 *
 * The queue array is reallocated on every call and grows by exactly one
 * multiCmd slot each time; there is no preallocation with geometric growth.
 * The queued entry receives its own copy of the argv pointer array, and the
 * reference count of every argument object is incremented for that copy. */
void queueMultiCommand(redisClient *c) {
    int slot = c->mstate.count;
    size_t argv_bytes = sizeof(robj*)*c->argc;
    multiCmd *entry;
    int i;

    /* Make room for one more entry and point at the new last slot. */
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(slot+1));
    entry = &c->mstate.commands[slot];

    /* Snapshot the command and its arguments into the new entry. */
    entry->argc = c->argc;
    entry->cmd = c->cmd;
    entry->argv = zmalloc(argv_bytes);
    memcpy(entry->argv,c->argv,argv_bytes);
    for (i = 0; i < c->argc; i++) {
        incrRefCount(entry->argv[i]);
    }

    c->mstate.count = slot+1;
}

discardTransaction用來取消某一個事務,discard命令、exec返回之前都會調用它。它除了釋放mstate.commands數組之外,最后還會unwatch all keys

/* Tear down the client's transaction: release the queued commands, reset
 * the MULTI state to empty, clear the REDIS_MULTI, REDIS_DIRTY_CAS and
 * REDIS_DIRTY_EXEC flags, and unwatch every key the client was watching. */
void discardTransaction(redisClient *c) {
    /* Drop whatever was queued and start again from an empty state. */
    freeClientMultiState(c);
    initClientMultiState(c);

    /* Leave transaction mode and forget any pending abort condition. */
    c->flags &= ~REDIS_MULTI;
    c->flags &= ~REDIS_DIRTY_CAS;
    c->flags &= ~REDIS_DIRTY_EXEC;

    unwatchAllKeys(c);
}

flagTransaction用來把一個進入multi狀態的client打上標記:REDIS_DIRTY_EXEC

/* Flag the client's open transaction as REDIS_DIRTY_EXEC so that the next
 * EXEC fails. Intended to be called whenever queueing a command hits an
 * error. Does nothing if the client is not in MULTI state.
 *
 * The caller does not queue or run the offending command; the flag is what
 * carries the failure forward. execCommand tests REDIS_DIRTY_EXEC before
 * running anything and, if it is set, replies with the EXECABORT error and
 * discards the whole transaction. The net effect is that a single malformed
 * command inside MULTI prevents every queued command from executing. */
void flagTransaction(redisClient *c) {
    if (!(c->flags & REDIS_MULTI)) return;
    c->flags |= REDIS_DIRTY_EXEC;
}

multiCommand是multi命令的處理入口,它只是檢查狀態之后,簡單地將client標記為REDIS_MULTI狀態

/* MULTI command handler: puts the client into transaction state by setting
 * REDIS_MULTI and replies OK. A MULTI issued while the client is already in
 * that state is rejected with an error and changes nothing. */
void multiCommand(redisClient *c) {
    if (!(c->flags & REDIS_MULTI)) {
        c->flags |= REDIS_MULTI;
        addReply(c,shared.ok);
    } else {
        addReplyError(c,"MULTI calls can not be nested");
    }
}

discard命令的處理函數

/* DISCARD command handler: if the client is in MULTI state, throws the
 * transaction away via discardTransaction() and replies OK; otherwise
 * replies with an error. */
void discardCommand(redisClient *c) {
    if (c->flags & REDIS_MULTI) {
        discardTransaction(c);
        addReply(c,shared.ok);
    } else {
        addReplyError(c,"DISCARD without MULTI");
    }
}

exec命令的處理函數

/* EXEC command handler.
 *
 * Rejects the call if the client is not in MULTI state. If the transaction
 * was flagged REDIS_DIRTY_CAS or REDIS_DIRTY_EXEC it is discarded without
 * running anything. Otherwise every queued command is executed in order via
 * call(), the transaction is discarded, and finally the EXEC itself is fed
 * to MONITOR clients. There is no rollback: the loop below does not inspect
 * the outcome of any individual command. */
void execCommand(redisClient *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */

    /* EXEC is only valid while the client is in MULTI state. */
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    /* Check the abort flags before executing anything. REDIS_DIRTY_CAS is
     * the flag set by touchWatchedKey() and touchWatchedKeysOnFlush(); this
     * check is what turns WATCH into a check-and-set guard for EXEC. */
    if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    /* NOTE(review): presumably unwatching first saves CPU because the queued
     * commands may modify keys, and every such modification would otherwise
     * still go through the watched-key bookkeeping for this client - confirm. */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    /* Remember the client's current (EXEC) argv/argc/cmd so they can be
     * restored after the loop. */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    /* First tell the client how many command replies will follow. */
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first write op.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        /* must_propagate ensures MULTI is propagated at most once, on the
         * first command that is not flagged REDIS_CMD_READONLY. */
        if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        /* Run the command. Its outcome is not checked here: the loop always
         * moves on to the next queued command and nothing is rolled back. */
        call(c,REDIS_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        /* Store the possibly changed argc/argv/cmd back into the queue entry. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    /* Restore the client's original (EXEC) argv/argc/cmd. */
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    /* All queued commands have run: end the transaction. */
    discardTransaction(c);
    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    /* This is done by incrementing the server.dirty counter. */
    if (must_propagate) server.dirty++;

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MULTI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    /* If any MONITOR clients are attached and the server is not loading,
     * feed them this EXEC invocation: c->argv/c->argc here are the EXEC
     * command's own arguments (restored above, or never changed on the abort
     * path), not the queued commands. NOTE(review): the queued commands are
     * presumably reported to monitors from within call() - confirm. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

上面的部分是multi命令執行需要的所有相關函數。但是僅僅只有上面的部分的話,也只是實現了一種"批量處理"的方式,還不能算是事務。下面提到的watch就是用來保證原子性。

代碼中,對WATCH的注釋是: CAS alike for MULTI/EXEC。

CAS應該是Compare and Swap,是一種實現樂觀鎖的機制,它的原理:認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可

具體到用watch機制來保證操作的原子性(下面這個加1的操作可以用incr一條命令實現,這里只是為了演示):

1. watch key
2. val = get key
3. val = val + 1
4. MULTI
5. set key val
6. EXEC
對於上面的這一系列操作,如果在EXEC命令之前,有其它client修改了key對應的value,那么這一次的EXEC是不會執行的,需要重新執行上面的所有步驟(EXEC結束時會unwatch all keys)。
所以redis里事務的原子性必須要依靠watch來保證。

watch的實現中使用了下面這個結構體,用來將key和db進行關聯

/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
 *
 * The implementation uses a per-DB hash table mapping keys to list of clients
 * WATCHing those keys, so that given a key that is going to be modified
 * we can mark all the associated clients as dirty.
 *
 * Also every client contains a list of WATCHed keys so that's possible to
 * un-watch such keys when the client is freed or when UNWATCH is called. */

/* In the client->watched_keys list we need to use watchedKey structures
 * as in order to identify a key in Redis we need both the key name and the
 * DB */
/* In short, the bookkeeping is kept in two directions:
 *  - each client holds a list of watchedKey entries, one per key it watches;
 *  - each DB holds a dict mapping a watched key to the list of clients
 *    watching that key.
 */
typedef struct watchedKey {
    robj *key;      /* name of the watched key */
    redisDb *db;    /* DB the key is watched in */
} watchedKey;

watch key的核心操作

/* Make client `c` watch `key` in the client's currently selected DB.
 *
 * 1. If the client's watched_keys list already has this key for this DB,
 *    return without doing anything.
 * 2. Otherwise append the client to the DB's list of watchers for the key,
 *    creating the list (and the dict entry that owns it) when the key had
 *    no watchers yet.
 * 3. Append a new watchedKey entry to the client's own list.
 *
 * The key's reference count is incremented once for the new dict entry (only
 * when one is created) and once for the client's watchedKey entry. */
void watchForKey(redisClient *c, robj *key) {
    listIter it;
    listNode *node;
    watchedKey *entry;
    list *watchers;

    /* Already watching this key in this DB? Then there is nothing to do. */
    listRewind(c->watched_keys,&it);
    for (node = listNext(&it); node != NULL; node = listNext(&it)) {
        entry = listNodeValue(node);
        if (entry->db != c->db) continue;
        if (equalStringObjects(key,entry->key)) return;
    }

    /* DB side: find the watchers list for the key, or create it. */
    watchers = dictFetchValue(c->db->watched_keys,key);
    if (watchers == NULL) {
        watchers = listCreate();
        dictAdd(c->db->watched_keys,key,watchers);
        incrRefCount(key);
    }
    listAddNodeTail(watchers,c);

    /* Client side: remember the (key, db) pair. */
    entry = zmalloc(sizeof(*entry));
    entry->db = c->db;
    entry->key = key;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,entry);
}

unwatchAllKeys用來unwatch某個client所有watched keys

/* Unwatch all the keys watched by this client. To clean the EXEC dirty
 * flag is up to the caller. */
/* For every entry in the client's watched_keys list:
 * 1. remove the client from the list of watchers stored under that key in
 *    the entry's DB (dropping the dict entry when the list becomes empty);
 * 2. remove the entry from the client's watched_keys list and free it.
 * Client flags are not modified here.
 */
void unwatchAllKeys(redisClient *c) {
    listIter li;
    listNode *ln;

    /* Nothing watched: nothing to do. */
    if (listLength(c->watched_keys) == 0) return;
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        /* A watched key is expected to always have a watchers list in its DB. */
        redisAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        /* NOTE(review): this deletes the node currently being visited;
         * presumably safe because the iterator has already advanced past it
         * - confirm against the list implementation. */
        listDelNode(c->watched_keys,ln);
        /* Release the reference held by this watchedKey entry, then the
         * entry itself. */
        decrRefCount(wk->key);
        zfree(wk);
    }
}

touchWatchedKey函數是保證原子性的一部分操作:

/* "Touch" a key: flag every client watching `key` in `db` as
 * REDIS_DIRTY_CAS.
 *
 * This function only sets the flag. The effect shows up later, in
 * execCommand, which refuses to run the queued commands of a client that
 * carries REDIS_DIRTY_CAS. NOTE(review): presumably invoked whenever the
 * value stored at a key is modified - confirm against the callers. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *watchers;
    listIter it;
    listNode *node;

    /* Fast path: nothing at all is watched in this DB. */
    if (dictSize(db->watched_keys) == 0) return;

    /* Nobody watches this particular key. */
    watchers = dictFetchValue(db->watched_keys, key);
    if (watchers == NULL) return;

    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    listRewind(watchers,&it);
    for (node = listNext(&it); node != NULL; node = listNext(&it)) {
        redisClient *watcher = listNodeValue(node);

        watcher->flags |= REDIS_DIRTY_CAS;
    }
}

在執行FLUSHDB或FLUSHALL清空數據庫時(這里的flush指的是清空db,而不是把db的內容寫到磁盤上),會調用touchWatchedKeysOnFlush

/* On FLUSHDB or FLUSHALL, every watched key that exists before the flush is
 * about to be deleted by it, so the clients watching such keys must be
 * flagged REDIS_DIRTY_CAS.
 *
 * `dbid` is the id of the DB being flushed, or -1 for FLUSHALL (all DBs).
 * Watched keys that do not currently exist in their DB do not flag the
 * client. */
void touchWatchedKeysOnFlush(int dbid) {
    listIter client_it, key_it;
    listNode *client_node, *key_node;

    /* For every client, check all of its watched keys. */
    listRewind(server.clients,&client_it);
    while ((client_node = listNext(&client_it)) != NULL) {
        redisClient *c = listNodeValue(client_node);

        listRewind(c->watched_keys,&key_it);
        while ((key_node = listNext(&key_it)) != NULL) {
            watchedKey *wk = listNodeValue(key_node);

            /* Skip keys watched in a DB that is not being flushed. */
            if (dbid != -1 && wk->db->id != dbid) continue;
            /* Skip keys that do not exist: the flush removes nothing. */
            if (dictFind(wk->db->dict, wk->key->ptr) == NULL) continue;

            c->flags |= REDIS_DIRTY_CAS;
        }
    }
}

最后,watch/unwatch命令的入口

/* WATCH command handler: registers every key argument (argv[1] onwards) as
 * watched by this client and replies OK. Rejected with an error, without
 * watching anything, when the client is already inside MULTI. */
void watchCommand(redisClient *c) {
    int i = 1;

    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    while (i < c->argc) {
        watchForKey(c,c->argv[i]);
        i++;
    }
    addReply(c,shared.ok);
}

/* UNWATCH command handler: stops watching every key watched by this client,
 * clears the client's REDIS_DIRTY_CAS flag and replies OK. */
void unwatchCommand(redisClient *c) {
    unwatchAllKeys(c);
    c->flags = c->flags & ~REDIS_DIRTY_CAS;
    addReply(c,shared.ok);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM