Java分布式鎖之數據庫方式實現


之前的文章《Java分布式鎖實現》中列舉了分布式鎖的3種實現方式,分別是基於數據庫實現,基於緩存實現和基於zookeeper實現。三種實現方式各有可取之處,本篇文章就詳細講解一下Java分布式鎖之基於數據庫的實現方式,也是最簡單最易理解的實現方式。

首先,先來闡述下“鎖”的概念,鎖作為一種安全防御工具,既能上鎖防止別人打開,又能讓持有鑰匙的人打開鎖,這是鎖的基本功能。那再來說一下“分布式鎖”,分布式鎖是在分布式系統(多個獨立運行系統)內的鎖,相對來說,這把鎖的安全級別以及作用范圍更大,所以從設計上就要考慮更多東西。

現在來說,怎么基於數據庫實現這把分布式鎖。其實說白了就是,把鎖作為數據資源存入數據庫,當持有這把鎖的訪問者來決定是否開鎖。

以下詳細講解了在多個應用服務里,怎樣用數據庫去實現分布式鎖。

結合案例:

1.客戶app取出交易(同一個客戶在某一個時間點只能對某種資產做取現操作)

2.交易重試補償(交易過程服務宕機,掃描重試補償)

一、數據庫的設計

數據庫鎖表的表結構如下:

 
field type comment
ID bigint 主鍵
OUTER_SERIAL_NO varchar 流水號
CUST_NO char 客戶號
SOURCE_CODE varchar 鎖操作
THREAD_NO varchar 線程號
STATUS char 鎖狀態
REMARK varchar 備注
CREATED_AT timestamp 創建時間
UPDATED_AT timestamp 更新時間

作為鎖的必要屬性有5個:系統流水號,客戶號,鎖操作,線程號和鎖狀態,下面來解釋一下每種屬性

流水號:鎖的具體指向,比如可以是產品,可以是交易流水號(后面會說到交易同步鎖、交易補償鎖的使用方式)

客戶號:客戶的唯一標識

鎖操作:客戶的某種操作,比如客戶取現操作,取現補償重試操作

線程號:當前操作線程的線程號,比如取當前線程的uuid

鎖狀態:P處理中,F失敗,Y成功

二、代碼設計

代碼的目錄結構如下: 

主要貼一下鎖操作的核心代碼實現:

鎖接口定義:DbLockManager.java

/**
 * 鎖接口 <br>
 * 
 * @Author fugaoyang
 *
 */
public interface DbLockManager {

    /**
     * 加鎖
     */
    boolean lock(String outerSerialNo, String custNo, LockSource source);

    /**
     * 解鎖
     */
    void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus);

}
View Code

鎖接口實現類:DbLockManagerImpl.java

/**
 * 
 * 數據庫鎖實現<br>
 * 
 * @author fugaoyang
 *
 */
@Service
public class DbLockManagerImpl implements DbLockManager {

    private final Logger LOG = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private DbSyncLockMapper lockMapper;

    @Transactional
    public boolean lock(String outerSerialNo, String custNo, LockSource source) {

        boolean isLock = false;
        TradeSyncLock lock = null;
        try {
            lock = lockMapper.find(outerSerialNo, custNo, source.getCode());

            if (null == lock) {
                lock = new TradeSyncLock();
                createLock(lock, outerSerialNo, custNo, source);

                int num = lockMapper.insert(lock);
                if (num == 1) {
                    isLock = true;
                }

                LOG.info(ThreadLogUtils.getLogPrefix() + "加入鎖,客戶號[{}],鎖類型[{}]", custNo, source.getCode());
                return isLock;
            }

            // 根據交易類型進行加鎖
            isLock = switchSynsLock(lock, source);
            LOG.info(ThreadLogUtils.getLogPrefix() + "更新鎖,客戶號[{}],鎖類型[{}]", custNo, source.getCode());

        } catch (Exception e) {
            LOG.error(ThreadLogUtils.getLogPrefix() + "交易加鎖異常, 客戶號:" + custNo, e);
        }
        return isLock;
    }

    @Transactional
    public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) {

        try {
            TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode());

            if (null != lock) {
                lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(),
                        ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid());
            }

            LOG.info(ThreadLogUtils.getLogPrefix() + "釋放鎖,客戶號[{}],鎖類型[{}]", custNo, source.getCode());
        } catch (Exception e) {
            LOG.error(ThreadLogUtils.getLogPrefix() + "釋放鎖異常, 客戶號:{}", custNo, e);
        }
    }

    /**
     * 匹配加鎖
     */
    private boolean switchSynsLock(TradeSyncLock lock, LockSource source) {
        boolean isLock = false;

        switch (source) {
        case WITHDRAW:
            ;
            isLock = tradeSynsLock(lock);
            break;
        case WITHDRAW_RETRY:
            ;
            isLock = retrySynsLock(lock);
            break;
        default:
            ;
        }
        return isLock;
    }

    /**
     * 交易同步鎖
     */
    private boolean tradeSynsLock(TradeSyncLock lock) {
        // 處理中的不加鎖,即不執行交易操作
        if (LockStatus.P.getName().equals(lock.getStatus())) {
            return false;
        }

        int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(),
                ThreadLogUtils.getCurrThreadUuid(), null);
        if (num == 1) {
            return true;
        }
        return false;
    }

    /**
     * 補償同步鎖
     */
    private boolean retrySynsLock(TradeSyncLock lock) {
        // 處理中或處理完成的不加鎖,即不執行補償操作
        if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) {
            return false;
        }

        int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(),
                ThreadLogUtils.getCurrThreadUuid(), null);
        if (num == 1) {
            return true;
        }
        return false;
    }

    private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) {
        lock.setOuterSerialNo(outerSerialNo);
        lock.setCustNo(custNo);
        lock.setSourceCode(source.getCode());
        lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid());
        lock.setStatus(LockStatus.P.getName());
        lock.setRemark(source.getDesc());
    }

}
View Code

獲取當前線程號以及打印uuid工具類ThreadLogUtils.Java

/**
 * 
 * 線程處理<br>
 * 
 * @author fugaoyang
 *
 */
public class ThreadLogUtils {

    private static ThreadLogUtils instance = null;

    private ThreadLogUtils() {
        setInstance(this);
    }

    // 初始化標志
    private static final Object __noop = new Object();
    private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() {
        @Override
        protected Object initialValue() {
            return null;
        }
    };

    // 當前線程的UUID信息,主要用於打印日志;
    private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return UUID.randomUUID().toString()/* .toUpperCase() */;
        }
    };

    private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return UUIDGenerator.getUuid();
        }
    };

    public static void clear(Boolean isNew) {
        if (isNew) {

            currLogUuid.remove();

            __flag.remove();

            currThreadUuid.remove();

        }
    }

    public static String getCurrLogUuid() {
        if (!isInitialized()) {
            throw new IllegalStateException("TLS未初始化");
        }

        return currLogUuid.get();
    }

    public static String getCurrThreadUuid() {
        return currThreadUuid.get();
    }

    public static void clearCurrThreadUuid() {
        currThreadUuid.remove();
    }

    public static String getLogPrefix() {
        if (!isInitialized()) {
            return "";
        }

        return "<uuid=" + getCurrLogUuid() + ">";
    }

    private static boolean isInitialized() {
        return __flag.get() != null;
    }

    /**
     * 初始化上下文,如果已經初始化則返回false,否則返回true<br/>
     *
     * @return
     */
    public static boolean initialize() {
        if (isInitialized()) {
            return false;
        }

        __flag.set(__noop);
        return true;
    }

    private static void setInstance(ThreadLogUtils instance) {
        ThreadLogUtils.instance = instance;
    }

    public static ThreadLogUtils getInstance() {
        return instance;
    }

}
View Code

兩種鎖的實現的大致思路如下:

1.交易同步鎖

當一個客戶在app取現,第一次進入時,會插入一條當前線程,狀態是P,操作是取現的鎖,取現成功后根據當前線程號會更新成功;

當一個客戶同時多個取現操作時,只有一個取現操作會加鎖成功,其它會加鎖失敗;

當一個客戶已經在取現中,這時數據庫已經有一條狀態P的鎖,該客戶同時又做了取現,這個取現動作會嘗試加鎖而退出;

2.交易重試補償鎖

1.當一個客戶取現加鎖成功,因調用第三方支付接口超時時,后台會對該筆交易重新發起重試打款操作,這時會新加一條當前交易流水號,當前線程號,狀態是P,操作是取現重試的鎖,重試的支付結果是成功的話,更新該條鎖數據為Y狀態,否則更新該條數據為F狀態;

2.當重試支付失敗后,再去重試打款時,發現鎖的狀態是F,這時把F更新為P,繼續重試,根據重試結果更新鎖狀態。

上面實現的是一個最基本的數據庫分布式鎖,滿足的並發量也是基於數據庫所能扛得住的,性能基本可以滿足普通的交易量。

后續可以優化的部分:

1.當一個用戶同時多次獲取lock時,因為目前是用的樂觀鎖,只會有一個加鎖成功,可以優化成加入while(true)循環獲取lock,當失敗次數到達指定次數時退出,當前的操作結束。

2.當鎖表數據量隨着時間增大時,可以考慮按用戶對鎖表進行分表分庫,以減小數據庫方面的壓力。

3.對鎖的操作可以抽象出來,作為抽象實現,比如具體的取現操作只關心取現這個業務實現。

 

因為時間有限,寫的比較倉促,希望大家有問題可以提出,相互探討~~

完整示例代碼后續會更新到github。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM