java實現分布式鎖


1.前言

大多數互聯網系統是分布式部署的,分布式部署解決了高並發高可用的問題,但是由此帶來了數據一致性問題。

當某個資源在多系統之間,被共享操作的時候,為了保證這個資源數據是一致的,那么就必須要求在同一時刻只能被一個客戶端操作,不能並發的執行,否者就會出現同一時刻有客戶端寫,別的客戶端在讀,兩者訪問到的數據就不一致了。

 

2.我們為什么需要分布式鎖

在單機時代,雖然不需要分布式鎖,但也面臨過類似的問題,只不過在單機的情況下,如果有多個線程要同時訪問某個共享資源的時候,我們可以采用線程間加鎖的機制,即當某個線程獲取到這個資源后,就立即對這個資源進行加鎖,當使用完資源之后,再解鎖,其它線程就可以接着使用了。例如,在JAVA中,甚至專門提供了一些處理鎖機制的一些API(synchronize/Lock等)。

但是到了分布式系統的時代,這種線程之間的鎖機制,就沒作用了,應用程序會有多份,並且部署在不同的機器上,這些資源已經不是在同一進程的不同線程間共享,而是屬於多進程之間共享的資源。

因此,為了解決這個問題,我們就必須引入「分布式鎖」。

分布式鎖,是指在分布式的部署環境下,通過鎖機制來讓多客戶端互斥的對共享資源進行訪問。

分布式鎖要滿足哪些要求呢?

排他性:在同一時間只會有一個客戶端能獲取到鎖,其它客戶端無法獲取

避免死鎖:這把鎖在一段有限的時間之后,一定會被釋放(正常釋放或異常釋放)

高可用:獲取或釋放鎖的機制必須高可用且性能佳

而且最好是可重入鎖。

 

3.分布式鎖的實現方式有哪些

目前主流的有三種,從實現的復雜度上來看,從上往下難度依次增加:

基於數據庫實現

基於Redis實現

基於ZooKeeper實現

無論哪種方式,其實都不完美,依舊要根據咱們業務的實際場景來選擇。

 

方案1 基於數據庫實現

基於數據庫來做分布式鎖的話,通常有兩種做法:

基於數據庫的樂觀鎖

基於數據庫的悲觀鎖

我們先來看一下如何基於「樂觀鎖」來實現:

樂觀鎖機制其實就是在數據庫表中引入一個版本號(version)字段來實現的。

當我們要從數據庫中讀取數據的時候,同時把這個version字段也讀出來,如果要對讀出來的數據進行更新后寫回數據庫,則需要將version加1,同時將新的數據與新的version更新到數據表中,且必須在更新的時候同時檢查目前數據庫里version值是不是之前的那個version,如果是,則正常更新。如果不是,則更新失敗,說明在這個過程中有其它的進程去更新過數據了。

   樂觀鎖通常實現基於數據版本(version)的記錄機制實現的,比如有一張紅包表(t_bonus),有一個字段(left_count)記錄禮物的剩余個數,用戶每領取一個獎品,對應的left_count減1,在並發的情況下如何要保證left_count不為負數,樂觀鎖的實現方式為在紅包表上添加一個版本號字段(version),默認為0。

異常實現流程

-- 可能會發生的異常情況
-- 線程1查詢,當前left_count為1,則有記錄
select * from t_bonus where id = 10001 and left_count > 0

-- 線程2查詢,當前left_count為1,也有記錄
select * from t_bonus where id = 10001 and left_count > 0

-- 線程1完成領取記錄,修改left_count為0,
update t_bonus set left_count = left_count - 1 where id = 10001

-- 線程2完成領取記錄,修改left_count為-1,產生臟數據
update t_bonus set left_count = left_count - 1 where id = 10001

 

通過樂觀鎖實現

-- 添加版本號控制字段
ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;

-- 線程1查詢,當前left_count為1,則有記錄,當前版本號為1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 線程2查詢,當前left_count為1,有記錄,當前版本號為1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 線程1,更新完成后當前的version為1235,update狀態為1,更新成功
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

-- 線程2,更新由於當前的version為1235,udpate狀態為0,更新失敗,再針對相關業務做異常處理
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

 

悲觀鎖」的實現:

悲觀鎖利用數據庫的行鎖來進行鎖定指定行,

通常用"SELECT * FROM TABLE_NAME WHERE id=id_value FOR UPDATE" 來獲取數據。

如果能獲取到數據,則加鎖成功, 如果獲取失敗,說明鎖已經被別的程序占用了,自己則獲取鎖失敗。

/**
     * 消費以后更新銀行余額
     * @param bankId 銀行卡號
     * @param cost 消費金額
     * @return
     */
    public boolean consume(Long bankId, Integer cost){
        //先鎖定銀行賬戶
        BankAccount product = query("SELECT * FROM bank_account WHERE bank_id=#{bankId} FOR UPDATE", bankId);
        if (product.getNumber() > 0) {
            int updateCnt = update("UPDATE tb_product_stock SET number=#{cost} WHERE product_id=#{productId}", cost, bankId);
            if(updateCnt > 0){    //更新庫存成功
                return true;
            }
        }
        return false;
    }

 

方案二:基於Redis的分布式鎖

用到的部分redis指令

SETNX命令(SET if Not eXists)
語法:SETNX key value
功能:原子性操作,當且僅當 key 不存在,將 key 的值設為 value ,並返回1;若給定的 key 已經存在,則 SETNX 不做任何動作,並返回0。
Expire命令
語法:expire(key, expireTime)
功能:key設置過期時間
GETSET命令
語法:GETSET key value
功能:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。
GET命令
語法:GET key
功能:返回 key 所關聯的字符串值,如果 key 不存在那么返回特殊值 nil 。
DEL命令
語法:DEL key [KEY …]
功能:刪除給定的一個或多個 key ,不存在的 key 會被忽略。
第一種:使用redis的setnx()、expire()方法,用於分布式鎖
  1. setnx(lockkey, 1) 如果返回0,則說明占位失敗;如果返回1,則說明占位成功
  2. expire()命令對lockkey設置超時時間,為的是避免死鎖問題。
  3. 執行完業務代碼后,可以通過delete命令刪除key。
這個方案其實是可以解決日常工作中的需求的,但從技術方案的探討上來說,可能還有一些可以完善的地方。比如,如果在第一步setnx執行成功后,
在expire()命令執行成功前,發生了宕機的現象,那么就依然會出現死鎖的問題

第二種:使用redis的setnx()、get()、getset()方法,用於分布式鎖,解決死鎖問題   設置了key的過期時間

  1. setnx(lockkey, 當前時間+過期超時時間) ,如果返回1,則獲取鎖成功;如果返回0則沒有獲取到鎖,轉向2。
  2. get(lockkey)獲取值oldExpireTime ,並將這個value值與當前的系統時間進行比較,如果小於當前系統時間,則認為這個鎖已經超時,可以允許別的請求重新獲取,轉向3。
  3. 計算newExpireTime=當前時間+過期超時時間,然后getset(lockkey, newExpireTime) 會返回當前lockkey的值currentExpireTime。
  4. 判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明當前getset設置成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那么當前請求可以直接返回失敗,或者繼續重試。
  5. 在獲取到鎖之后,當前線程可以開始自己的業務處理,當處理完畢后,比較自己的處理時間和對於鎖設置的超時時間,如果小於鎖設置的超時時間,則直接執行delete釋放鎖;如果大於鎖設置的超時時間,則不需要再鎖進行處理。

代碼示例

復制代碼
import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;

/**
 * Created by IDEA
 * User: shma1664
 * Date: 2016-08-16 14:01
 * Desc: redis分布式鎖
 */
public final class RedisLockUtil {

    private static final int defaultExpire = 60;

    private RedisLockUtil() {
        //
    }

    /**
     * 加鎖
     * @param key redis key
     * @param expire 過期時間,單位秒
     * @return true:加鎖成功,false,加鎖失敗
     */
    public static boolean lock(String key, int expire) {

        RedisService redisService = SpringUtils.getBean(RedisService.class);
        long status = redisService.setnx(key, "1");

        if(status == 1) {
            redisService.expire(key, expire);
            return true;
        }

        return false;
    }

    public static boolean lock(String key) {
        return lock2(key, defaultExpire);
    }

    /**
     * 加鎖
     * @param key redis key
     * @param expire 過期時間,單位秒
     * @return true:加鎖成功,false,加鎖失敗
     */
    public static boolean lock2(String key, int expire) {

        RedisService redisService = SpringUtils.getBean(RedisService.class);

        long value = System.currentTimeMillis() + expire;
        long status = redisService.setnx(key, String.valueOf(value));

        if(status == 1) {
            return true;
        }
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if(oldExpireTime < System.currentTimeMillis()) {
            //超時
            long newExpireTime = System.currentTimeMillis() + expire;
            long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
            if(currentExpireTime == oldExpireTime) {
                return true;
            }
        }
        return false;
    }

    public static void unLock1(String key) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        redisService.del(key);
    }

    public static void unLock2(String key) {    
        RedisService redisService = SpringUtils.getBean(RedisService.class);    
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));   
        if(oldExpireTime > System.currentTimeMillis()) {        
            redisService.del(key);    
        }
   }

}

public void drawRedPacket(long userId) {
    String key = "draw.redpacket.userid:" + userId;

    boolean lock = RedisLockUtil.lock2(key, 60);
    if(lock) {
        try {
            //領取操作
        } finally {
            //釋放鎖
            RedisLockUtil.unLock(key);
        }
    } else {
        new RuntimeException("重復領取獎勵");
    }
}
View Code

 

 第三種  使用redis的setnx()、get()、getset()方法,用於分布式鎖,解決死鎖問題   設置了key的值為過期時間   生產環境驗證過

package com.differ.edibase.plugins.lock.redis;

import com.differ.edibase.infrastructure.component.cache.Cacher;
import com.differ.edibase.infrastructure.utils.SpringResolveManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Transaction;

/**
 * redis實現簡單的分布式鎖
 *
 * @author 
 * @since 
 */
@Component
@Scope("prototype")
public class RedisLock {

    // region 屬性

    /**
     * 緩存
     */
    @Autowired
    protected Cacher cacher;
    /**
     * 默認等待時間
     */
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
    /**
     * 鎖鍵
     */
    private String lockKey = "edi.redis.lock";
    /**
     * 鎖超時時間,防止線程在入鎖以后,無限的執行等待
     */
    private int expireMsecs = 60 * 1000;
    /**
     * 鎖等待時間,防止線程飢餓
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    // endregion

    // region 構造器

    /**
     * 構造器
     */
    public RedisLock() {
    }

    /**
     * 構造器
     *
     * @param lockKey 鎖鍵
     */
    public RedisLock(String lockKey) {
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 構造器
     *
     * @param lockKey      鎖鍵
     * @param timeoutMsecs 鎖等待時間
     */
    public RedisLock(String lockKey, int timeoutMsecs) {
        this(lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * 構造器
     *
     * @param lockKey      鎖鍵
     * @param timeoutMsecs 鎖等待時間
     * @param expireMsecs  鎖超時時間
     */
    public RedisLock(String lockKey, int timeoutMsecs, int expireMsecs) {
        this(lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    // endregion

    // region 鎖具體方法

    /**
     * 獲得 lock.
     * 實現思路: 主要是使用了redis 的setnx命令,緩存了鎖.
     * reids緩存的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這里把過期時間放在value了,沒有時間上設置其超時時間)
     * 執行過程:
     * 1.通過setnx嘗試設置某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖
     * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout > 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            // 鎖到期時間
            String expiresStr = String.valueOf(expires);
            if (this.cacher.setNx(this.lockKey, expiresStr) == 1) {
                this.locked = true;
                return true;
            }
            // redis里的時間
            String currentValueStr = this.cacher.get(this.lockKey);
            // 判斷是否為空,不為空的情況下,如果被其他線程設置了值,則第二個條件判斷是過不去的
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                // 獲取上一個鎖到期時間,並設置現在的鎖到期時間,只有一個線程才能獲取上一個線上的設置時間,因為jedis.getSet是同步的
                String oldValueStr = this.cacher.getSet(this.lockKey, expiresStr);
                // 防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這里達不到效果,這里值會被覆蓋,但是因為什么相差了很少的時間,所以可以接受
                // [分布式的情況下]:如過這個時候,多個線程恰好都到了這里,但是只有一個線程的設置值和當前值相同,他才有權利獲取鎖
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
        }
        return false;
    }

    /**
     * Acqurired lock release.
     */
    public synchronized void unlock() {
        if (this.locked) {
            this.cacher.delete(this.lockKey);
            this.locked = false;
        }
    }

    // endregion

    // region 獲取RedisLock

    /**
     * 獲取RedisLock
     *
     * @param lockKey 鎖鍵
     * @return RedisLock
     */
    public static RedisLock get(String lockKey) {
        Object[] obj = new Object[] { lockKey };
        return SpringResolveManager.resolve(RedisLock.class, obj);
    }

    /**
     * 獲取RedisLock
     *
     * @param lockKey      鎖鍵
     * @param timeoutMsecs 鎖等待時間
     * @return RedisLock
     */
    public static RedisLock get(String lockKey, int timeoutMsecs) {
        Object[] obj = new Object[] { lockKey, timeoutMsecs };
        return SpringResolveManager.resolve(RedisLock.class, obj);
    }

    /**
     * 獲取RedisLock
     *
     * @param lockKey      鎖鍵
     * @param timeoutMsecs 鎖等待時間
     * @param expireMsecs  鎖超時時間
     * @return RedisLock
     */
    public static RedisLock get(String lockKey, int timeoutMsecs, int expireMsecs) {
        Object[] obj = new Object[] { lockKey, timeoutMsecs, expireMsecs };
        return SpringResolveManager.resolve(RedisLock.class, obj);
    }

    // endregion

}
View Code

具體使用

        // 加鎖獲取緩存
        RedisLock redisLock = RedisLock.get(this.key);
        try {
            if (redisLock.lock()) {
                     //做自己的業務
                }
            }
        } catch (Exception ex) {
             //記錄異常日志
        } finally {
             //釋放鎖
            redisLock.unlock();
        }
View Code

 

 

方案三 :基於Zookeeper的分布式鎖

利用節點名稱的唯一性來實現獨占鎖

    ZooKeeper機制規定同一個目錄下只能有一個唯一的文件名,zookeeper上的一個znode看作是一把鎖,通過createznode的方式來實現。所有客戶端都去創建/lock/${lock_name}_lock節點,最終成功創建的那個客戶端也即擁有了這把鎖,創建失敗的可以選擇監聽繼續等待,還是放棄拋出異常實現獨占鎖。

ZK具體實現分布式鎖,可以看

https://www.cnblogs.com/lijiasnong/p/9952494.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM