Redis setNX 實現分布式鎖(重復數據插入可用其來實現排他鎖)


使用Redis的 SETNX 命令可以實現分布式鎖,下文介紹其實現方法。

SETNX命令簡介

命令格式

SETNX key value

將 key 的值設為 value,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是SET if Not eXists的簡寫。

返回值

返回整數,具體為
- 1,當 key 的值被設置
- 0,當 key 的值沒被設置

例子

redis> SETNX mykey “hello”
(integer) 1
redis> SETNX mykey “hello”
(integer) 0
redis> GET mykey
“hello”
redis>

使用SETNX實現分布式鎖

多個進程執行以下Redis命令:

SETNX lock.foo <current Unix time + lock timeout + 1>

如果 SETNX 返回1,說明該進程獲得鎖,SETNX將鍵 lock.foo 的值設置為鎖的超時時間(當前時間 + 鎖的有效時間)。
如果 SETNX 返回0,說明其他進程已經獲得了鎖,進程不能進入臨界區。進程可以在一個循環中不斷地嘗試 SETNX 操作,以獲得鎖。

解決死鎖

考慮一種情況,如果進程獲得鎖后,斷開了與 Redis 的連接(可能是進程掛掉,或者網絡中斷),如果沒有有效的釋放鎖的機制,那么其他進程都會處於一直等待的狀態,即出現“死鎖”。

上面在使用 SETNX 獲得鎖時,我們將鍵 lock.foo 的值設置為鎖的有效時間,進程獲得鎖后,其他進程還會不斷的檢測鎖是否已超時,如果超時,那么等待的進程也將有機會獲得鎖。

然而,鎖超時時,我們不能簡單地使用 DEL 命令刪除鍵 lock.foo 以釋放鎖。考慮以下情況,進程P1已經首先獲得了鎖 lock.foo,然后進程P1掛掉了。進程P2,P3正在不斷地檢測鎖是否已釋放或者已超時,執行流程如下:

  • P2和P3進程讀取鍵 lock.foo 的值,檢測鎖是否已超時(通過比較當前時間和鍵 lock.foo 的值來判斷是否超時)
  • P2和P3進程發現鎖 lock.foo 已超時
  • P2執行 DEL lock.foo命令
  • P2執行 SETNX lock.foo命令,並返回1,即P2獲得鎖
  • P3執行 DEL lock.foo命令將P2剛剛設置的鍵 lock.foo 刪除(這步是由於P3剛才已檢測到鎖已超時)
  • P3執行 SETNX lock.foo命令,並返回1,即P3獲得鎖
  • P2和P3同時獲得了鎖

從上面的情況可以得知,在檢測到鎖超時后,進程不能直接簡單地執行 DEL 刪除鍵的操作以獲得鎖。

為了解決上述算法可能出現的多個進程同時獲得鎖的問題,我們再來看以下的算法。
我們同樣假設進程P1已經首先獲得了鎖 lock.foo,然后進程P1掛掉了。接下來的情況:

  • 進程P4執行 SETNX lock.foo 以嘗試獲取鎖
  • 由於進程P1已獲得了鎖,所以P4執行 SETNX lock.foo 返回0,即獲取鎖失敗
  • P4執行 GET lock.foo 來檢測鎖是否已超時,如果沒超時,則等待一段時間,再次檢測
  • 如果P4檢測到鎖已超時,即當前的時間大於鍵 lock.foo 的值,P4會執行以下操作
    GETSET lock.foo <current Unix timestamp + lock timeout + 1>
  • 由於 GETSET 操作在設置鍵的值的同時,還會返回鍵的舊值,通過比較鍵 lock.foo 的舊值是否小於當前時間,可以判斷進程是否已獲得鎖
  • 假如另一個進程P5也檢測到鎖已超時,並在P4之前執行了 GETSET 操作,那么P4的 GETSET 操作返回的是一個大於當前時間的時間戳,這樣P4就不會獲得鎖而繼續等待。注意到,即使P4接下來將鍵 lock.foo 的值設置了比P5設置的更大的值也沒影響。

另外,值得注意的是,在進程釋放鎖,即執行 DEL lock.foo 操作前,需要先判斷鎖是否已超時。如果鎖已超時,那么鎖可能已由其他進程獲得,這時直接執行 DEL lock.foo 操作會導致把其他進程已獲得的鎖釋放掉。

程序代碼

用以下python代碼來實現上述的使用 SETNX 命令作分布式鎖的算法。

LOCK_TIMEOUT = 3 lock = 0 lock_timeout = 0 lock_key = 'lock.foo' # 獲取鎖 while lock != 1: now = int(time.time()) lock_timeout = now + LOCK_TIMEOUT + 1 lock = redis_client.setnx(lock_key, lock_timeout) if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)): break else: time.sleep(0.001) # 已獲得鎖 do_job() # 釋放鎖 now = int(time.time()) if now < lock_timeout: redis_client.delete(lock_key)

java代碼:
public static boolean acquireLock(String lock) {
    // 1. 通過SETNX試圖獲取一個lock
    boolean success = false;
    Jedis jedis = pool.getResource();      
    long value = System.currentTimeMillis() + expired + 1;    
    System.out.println(value);    
    long acquired = jedis.setnx(lock, String.valueOf(value));
    //SETNX成功,則成功獲取一個鎖
    if (acquired == 1)      
        success = true;
    //SETNX失敗,說明鎖仍然被其他對象保持,檢查其是否已經超時
    else {
        long oldValue = Long.valueOf(jedis.get(lock));
 
        //超時
        if (oldValue < System.currentTimeMillis()) {
            String getValue = jedis.getSet(lock, String.valueOf(value));              
            // 獲取鎖成功
            if (Long.valueOf(getValue) == oldValue)
                success = true;
            // 已被其他進程捷足先登了
            else
                success = false;
        }
        //未超時,則直接返回失敗
        else            
            success = false;
    }        
    pool.returnResource(jedis);
    return success;      
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM