分布式鎖原理及實現(轉)


什么是分布式鎖?  

控制分布式架構中多個模塊訪問的優先級

要介紹分布式鎖,首先要提到與分布式鎖相對應的是線程鎖、進程鎖。

  線程鎖:主要用來給方法、代碼塊加鎖。當某個方法或代碼使用鎖,在同一時刻僅有一個線程執行該方法或該代碼段。線程鎖只在同一JVM中有效果,因為線程鎖的實現在根本上是依靠線程之間共享內存實現的,比如synchronized是共享對象頭,顯示鎖Lock是共享某個變量(state)。

  進程鎖:為了控制同一操作系統中多個進程訪問某個共享資源,因為進程具有獨立性,各個進程無法訪問其他進程的資源,因此無法通過synchronized等線程鎖實現進程鎖。

  分布式鎖:當多個進程不在同一個系統中,用分布式鎖控制多個進程對資源的訪問。


什么情況下需要使用分布式鎖

 比如:

   高並發下爭奪共享資源,比如秒殺對於庫存這種共享資源需要用到分布式鎖,如果不用分布式鎖很可能造成超賣。

分布式鎖也是鎖

  • 在單體應用的時候,如果多個線程要訪問共享資源的時候,我們通常線程間加鎖的機制,在某一個時刻,只有一個線程可以對這個資源進行操作,其他線程需要等待鎖的釋放,Java中也有一些處理鎖的機制,比如synchronized。

  • 而到了分布式的環境中,當某個資源可以被多個系統訪問使用到的時候,為了保證大家訪問這個數據是一致性的,那么就要求再同一個時刻,只能被一個系統使用,這時候線程之間的鎖機制就無法起到作用了,因為分布式環境中,系統是會部署到不同的機器上面的,那么就需要【分布式鎖】了。

什么時候需要使用分布式鎖

 總結來看,當有多個客戶端需要訪問並操作同一個資源,還需要保持這個資源一致性的時候,就需要使用【分布式鎖】,讓多客戶端互斥的對共享資源進行訪問。

 舉個例子來說明一下:

  • 有多個批處理任務,兩台機器同時處理,如果不加任何控制的話,很有可能同一個批處理被兩台機器分別處理一遍;如果使用分布式鎖,在領取任務的時候,一個任務只會被一台機器領到,這樣就不會造成任務的重復執行;
  • 再多思考一些,如果A/B兩台機器,任務1被A機器領取到進行處理,在處理到一半的時候,A機器掛掉了,那么這個批處理任務也就無法順利執行了,除非A機器可以恢復。 

這時候就可以知道分布式鎖需要做哪些工作了

  • 排他性:在同一時間只會有一個客戶端能獲取到鎖,其它客戶端無法同時獲取;
  • 避免死鎖:鎖在一段時間內有效,超過這個時間后會被釋放(正常釋放或異常釋放);
  • 高可用:獲取或釋放鎖的機制必須高可用且性能佳。

使用場景

  當你的后端服務是以集群形式存在的時候,是一定需要分布式鎖的。集群與分布式不同,而這里的分布式與分布式鎖也不是同一回事兒。集群可以指多台服務器實現了同樣的需求,比如有三台Tomcat,都負責查詢模塊;而分布式指多台服務器各自不同的功能點,多台功能的整合對外是一個完整的服務,比如一台Tomcat負責查詢,一台負責下單。

  說回集群,當后端集群要去訪問同一個資源的時候,就需要對該資源加鎖,保證同一時刻只能有一個對象來修改該資源數據,如果不加鎖會導致什么情況呢?

舉一個例子:

  有兩個線程(分別叫T1,T2)做的都是同樣的事情,拿到一個叫做A的資源,然后對其進行+1操作。由於線程之間是不會互相通信的,於是就有可能出現下面這種情況:

    T1拿到A,讀入內存,此時A值為T;

    T2拿到A,讀入內存,此時A值為T;

    T1進行+1操作,此時A實際值為T+1;

    T2進行+1操作,此時A的實際值仍然為T+1;

  然而,此時A經過兩個線程執行+1操作,應該為T+2才對的,所以可以看出,如果沒有分布式鎖,就會出現數據不一致的問題。如果是上面這種簡單的計算還好,如果是你的銀行賬戶,沒用分布式鎖,此時有兩個人給你打錢,結果只有其中一個人的到賬了,另一個人的被作為無主錢財被銀行充公了,肯定是不行的吧。

  所以,保障數據一致性和准確性就是分布式鎖的重要性。


 分布式解決方案

  針對分布式鎖的實現,目前比較常用的有以下幾種方案:

    1:基於數據庫實現分布式鎖 

    2:基於緩存(redis,memcached,tair)實現分布式鎖

      3:基於Zookeeper實現分布式鎖

在分析這幾種實現方案之前我們先來想一下,我們需要的分布式鎖應該是怎么樣的?(這里以方法鎖為例,資源鎖同理)

  • 可以保證在分布式部署的應用集群中,同一個方法在同一時間只能被一台機器上的一個線程執行。
  • 這把鎖要是一把可重入鎖(避免死鎖)
  • 這把鎖最好是一把阻塞鎖(根據業務需求考慮要不要這條)
  • 有高可用的獲取鎖和釋放鎖功能
  • 獲取鎖和釋放鎖的性能要好

基於數據庫實現分布式鎖

基於數據庫表

  要實現分布式鎖,最簡單的方式可能就是直接創建一張鎖表,然后通過操作該表中的數據來實現了。

  當我們要鎖住某個方法或資源時,我們就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄。

  創建這樣一張數據庫表:

CREATE TABLE `database_lock` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `resource` int NOT NULL COMMENT '鎖定的資源',
    `description` varchar(1024) NOT NULL DEFAULT "" COMMENT '描述',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uiq_idx_resource` (`resource`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='數據庫分布式鎖表';

 當我們想要獲得鎖時,可以插入一條數據:

INSERT INTO database_lock(resource, description) VALUES (1, 'lock');

注意:在表database_lock中,resource字段做了唯一性約束,這樣如果有多個請求同時提交到數據庫的話,數據庫可以保證只有一個操作可以成功(其它的會報錯:ERROR 1062 (23000): Duplicate entry ‘1’ for key ‘uiq_idx_resource’),那么那么我們就可以認為操作成功的那個請求獲得了鎖。

當需要釋放鎖的時,可以刪除這條數據:

DELETE FROM database_lock WHERE resource=1;

這種實現方式非常的簡單,但是需要注意以下幾點:

(1)這種鎖沒有失效時間,一旦釋放鎖的操作失敗就會導致鎖記錄一直在數據庫中,其它線程無法獲得鎖。這個缺陷也很好解決,比如可以做一個定時任務去定時清理。
(2)這種鎖的可靠性依賴於數據庫。建議設置備庫,避免單點,進一步提高可靠性。
(3)這種鎖是非阻塞的,因為插入數據失敗之后會直接報錯,想要獲得鎖就需要再次操作。如果需要阻塞式的,可以弄個for循環、while循環之類的,直至INSERT成功再返回。
(4)這種鎖也是非可重入的,因為同一個線程在沒有釋放鎖之前無法再次獲得鎖,因為數據庫中已經存在同一份記錄了。想要實現可重入鎖,可以在數據庫中添加一些字段,比如獲得鎖的主機信息、線程信息(5)等,那么在再次獲得鎖的時候可以先查詢數據,如果當前的主機信息和線程信息等能被查到的話,可以直接把鎖分配給它。

樂觀鎖  

  顧名思義,系統認為數據的更新在大多數情況下是不會產生沖突的,只在數據庫更新操作提交的時候才對數據作沖突檢測。如果檢測的結果出現了與預期數據不一致的情況,則返回失敗信息。

  樂觀鎖大多數是基於數據版本(version)的記錄機制實現的。何謂數據版本號?即為數據增加一個版本標識,在基於數據庫表的版本解決方案中,一般是通過為數據庫表添加一個 “version”字段來實現讀取出數據時,將此版本號一同讀出,之后更新時,對此版本號加1。在更新過程中,會對版本號進行比較,如果是一致的,沒有發生改變,則會成功執行本次操作;如果版本號不一致,則會更新失敗。

  為了更好的理解數據庫樂觀鎖在實際項目中的使用,這里就列舉一個典型的電商庫存的例子。一個電商平台都會存在商品的庫存,當用戶進行購買的時候就會對庫存進行操作(庫存減1代表已經賣出了一件)。我們將這個庫存模型用下面的一張表optimistic_lock來表述,參考如下:

CREATE TABLE `optimistic_lock` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `resource` int NOT NULL COMMENT '鎖定的資源',
    `version` int NOT NULL COMMENT '版本信息',
    `created_at` datetime COMMENT '創建時間',
    `updated_at` datetime COMMENT '更新時間',
    `deleted_at` datetime COMMENT '刪除時間', 
    PRIMARY KEY (`id`),
    UNIQUE KEY `uiq_idx_resource` (`resource`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='數據庫分布式鎖表';

其中:id表示主鍵;resource表示具體操作的資源,在這里也就是特指庫存;version表示版本號。

在使用樂觀鎖之前要確保表中有相應的數據,比如:

INSERT INTO optimistic_lock(resource, version, created_at, updated_at) VALUES(20, 1, CURTIME(), CURTIME());

 如果只是一個線程進行操作,數據庫本身就能保證操作的正確性。主要步驟如下:

  STEP1 - 獲取資源:SELECT resource FROM optimistic_lock WHERE id = 1
  STEP2 - 執行業務邏輯
  STEP3 - 更新資源:UPDATE optimistic_lock SET resource = resource -1 WHERE id = 1

  然而在並發的情況下就會產生一些意想不到的問題:比如兩個線程同時購買一件商品,在數據庫層面實際操作應該是庫存(resource)減2,但是由於是高並發的情況,第一個線程執行之后(執行了STEP1、STEP2但是還沒有完成STEP3),第二個線程在購買相同的商品(執行STEP1),此時查詢出的庫存並沒有完成減1的動作,那么最終會導致2個線程購買的商品卻出現庫存只減1的情況。

 在引入了version字段之后,那么具體的操作就會演變成下面的內容:

  STEP1 - 獲取資源: SELECT resource, version FROM optimistic_lock WHERE id = 1
  STEP2 - 執行業務邏輯
  STEP3 - 更新資源:UPDATE optimistic_lock SET resource = resource -1, version = version + 1 WHERE id = 1 AND version = oldVersion

  其實,借助更新時間戳(updated_at)也可以實現樂觀鎖,和采用version字段的方式相似:更新操作執行前線獲取記錄當前的更新時間,在提交更新時,檢測當前更新時間是否與更新開始時獲取的更新時間戳相等。

  樂觀鎖的優點比較明顯,由於在檢測數據沖突時並不依賴數據庫本身的鎖機制,不會影響請求的性能,當產生並發且並發量較小的時候只有少部分請求會失敗。缺點是需要對表的設計增加額外的字段,增加了數據庫的冗余,另外,當應用並發量高的時候,version值在頻繁變化,則會導致大量請求失敗,影響系統的可用性。我們通過上述sql語句還可以看到,數據庫鎖都是作用於同一行數據記錄上,這就導致一個明顯的缺點,在一些特殊場景,如大促、秒殺等活動開展的時候,大量的請求同時請求同一條記錄的行鎖,會對數據庫產生很大的寫壓力。所以綜合數據庫樂觀鎖的優缺點,樂觀鎖比較適合並發量不高,並且寫操作不頻繁的場景。

 

悲觀鎖

  除了可以通過增刪操作數據庫表中的記錄以外,我們還可以借助數據庫中自帶的鎖來實現分布式鎖。在查詢語句后面增加FOR UPDATE,數據庫會在查詢過程中給數據庫表增加悲觀鎖,也稱排他鎖。當某條記錄被加上悲觀鎖之后,其它線程也就無法再改行上增加悲觀鎖。

  悲觀鎖,與樂觀鎖相反,總是假設最壞的情況,它認為數據的更新在大多數情況下是會產生沖突的。

  在使用悲觀鎖的同時,我們需要注意一下鎖的級別。MySQL InnoDB引起在加鎖的時候,只有明確地指定主鍵(或索引)的才會執行行鎖 (只鎖住被選取的數據),否則MySQL 將會執行表鎖(將整個數據表單給鎖住)。

  在使用悲觀鎖時,我們必須關閉MySQL數據庫的自動提交屬性(參考下面的示例),因為MySQL默認使用autocommit模式,也就是說,當你執行一個更新操作后,MySQL會立刻將結果進行提交。

mysql> SET AUTOCOMMIT = 0;
Query OK, 0 rows affected (0.00 sec)

  這樣在使用FOR UPDATE獲得鎖之后可以執行相應的業務邏輯,執行完之后再使用COMMIT來釋放鎖。

  我們不妨沿用前面的database_lock表來具體表述一下用法。假設有一線程A需要獲得鎖並執行相應的操作,那么它的具體步驟如下:

    STEP1 - 獲取鎖:SELECT * FROM database_lock WHERE id = 1 FOR UPDATE;。
    STEP2 - 執行業務邏輯。
    STEP3 - 釋放鎖:COMMIT。
  

  如果另一個線程B在線程A釋放鎖之前執行STEP1,那么它會被阻塞,直至線程A釋放鎖之后才能繼續。注意,如果線程A長時間未釋放鎖,那么線程B會報錯,參考如下(lock wait time可以通過innodb_lock_wait_timeout來進行配置):

ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction

 上面的示例中演示了指定主鍵並且能查詢到數據的過程(觸發行鎖),如果查不到數據那么也就無從“鎖”起了。

 如果未指定主鍵(或者索引)且能查詢到數據,那么就會觸發表鎖,比如STEP1改為執行(這里的version只是當做一個普通的字段來使用,與上面的樂觀鎖無關):

SELECT * FROM database_lock WHERE description='lock' FOR UPDATE;

 或者主鍵不明確也會觸發表鎖,又比如STEP1改為執行:

SELECT * FROM database_lock WHERE id>0 FOR UPDATE;

  注意,雖然我們可以顯示使用行級鎖(指定可查詢的主鍵或索引),但是MySQL會對查詢進行優化,即便在條件中使用了索引字段,但是否真的使用索引來檢索數據是由MySQL通過判斷不同執行計划的代價來決定的,如果MySQL認為全表掃描效率更高,比如對一些很小的表,它有可能不會使用索引,在這種情況下InnoDB將使用表鎖,而不是行鎖。

  在悲觀鎖中,每一次行數據的訪問都是獨占的,只有當正在訪問該行數據的請求事務提交以后,其他請求才能依次訪問該數據,否則將阻塞等待鎖的獲取。悲觀鎖可以嚴格保證數據訪問的安全。但是缺點也明顯,即每次請求都會額外產生加鎖的開銷且未獲取到鎖的請求將會阻塞等待鎖的獲取,在高並發環境下,容易造成大量請求阻塞,影響系統可用性。另外,悲觀鎖使用不當還可能產生死鎖的情況。

 還有一個問題,就是我們要使用排他鎖來進行分布式鎖的lock,那么一個排他鎖長時間不提交,就會占用數據庫連接。一旦類似的連接變得多了,就可能把數據庫連接池撐爆

總結

  總結一下使用數據庫來實現分布式鎖的方式,這兩種方式都是依賴數據庫的一張表,一種是通過表中的記錄的存在情況確定當前是否有鎖存在,另外一種是通過數據庫的排他鎖來實現分布式鎖。

數據庫實現分布式鎖的優點

  直接借助數據庫,容易理解。

數據庫實現分布式鎖的缺點

  會有各種各樣的問題,在解決問題的過程中會使整個方案變得越來越復雜。

  操作數據庫需要一定的開銷,性能問題需要考慮。

  使用數據庫的行級鎖並不一定靠譜,尤其是當我們的鎖表並不大的時候。


 二:基於Zookeeper實現分布式鎖

 ZooKeeper是一個為分布式應用提供一致性服務的開源組件,它內部是一個分層的文件系統目錄樹結構,規定同一個目錄下只能有一個唯一文件名。基於ZooKeeper實現分布式鎖的步驟如下:

(1)創建一個目錄mylock;
(2)線程A想獲取鎖就在mylock目錄下創建臨時順序節點;
(3)獲取mylock目錄下所有的子節點,然后獲取比自己小的兄弟節點,如果不存在,則說明當前線程順序號最小,獲得鎖;
(4)線程B獲取所有節點,判斷自己不是最小節點,設置監聽比自己次小的節點;
(5)線程A處理完,刪除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。

可以直接使用zookeeper第三方庫Curator客戶端,這個客戶端中封裝了一個可重入的鎖服務。

  

Curator提供的InterProcessMutex是分布式鎖的實現。acquire方法用戶獲取鎖,release方法用於釋放鎖。

  使用ZK實現的分布式鎖好像完全符合了本文開頭我們對一個分布式鎖的所有期望。但是,其實並不是,Zookeeper實現的分布式鎖其實存在一個缺點,那就是性能上可能並沒有緩存服務那么高。因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀瞬時節點來實現鎖功能。ZK中創建和刪除節點只能通過Leader服務器來執行,然后將數據同不到所有的Follower機器上。

  其實,使用Zookeeper也有可能帶來並發問題,只是並不常見而已。考慮這樣的情況,由於網絡抖動,客戶端可ZK集群的session連接斷了,那么zk以為客戶端掛了,就會刪除臨時節點,這時候其他客戶端就可以獲取到分布式鎖了。就可能產生並發問題。這個問題不常見是因為zk有重試機制,一旦zk集群檢測不到客戶端的心跳,就會重試,Curator客戶端支持多種重試策略。多次重試之后還不行的話才會刪除臨時節點。(所以,選擇一個合適的重試策略也比較重要,要在鎖的粒度和並發之間找一個平衡。)

 

也可以使用創建臨時節點的方式實現分布式鎖,例子:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperLock implements Watcher {

    private ZooKeeper zk = null;
    private String rootLockNode;            // 鎖的根節點  locks
    private String lockName;                // 競爭資源,用來生成子節點名稱  test1
    private String currentLock;             // 當前鎖
    private String waitLock;                // 等待的鎖(前一個鎖)
    private CountDownLatch countDownLatch;  // 計數器(用來在加鎖失敗時阻塞加鎖線程)
    private int sessionTimeout = 30000;     // 超時時間
    
    // 1. 構造器中創建ZK鏈接,創建鎖的根節點
    public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
        this.rootLockNode = rootLockNode;
        this.lockName = lockName;
        try {
            // 創建連接,zkAddress格式為:IP:PORT
            zk = new ZooKeeper(zkAddress,this.sessionTimeout,this);
            // 檢測鎖的根節點是否存在,不存在則創建
            Stat stat = zk.exists(rootLockNode,false);
            if (null == stat) {
                zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    // 2. 加鎖方法,先嘗試加鎖,不能加鎖則等待上一個鎖的釋放
    public boolean lock() {
        if (this.tryLock()) {
            System.out.println("線程【" + Thread.currentThread().getName() + "】加鎖(" + this.currentLock + ")成功!");
            return true;
        } else {
            return waitOtherLock(this.waitLock, this.sessionTimeout);
        }
    }
    
    public boolean tryLock() {
        // 分隔符
        String split = "_lock_";
        if (this.lockName.contains("_lock_")) {
            throw new RuntimeException("lockName can't contains '_lock_' ");
        }
        try {
            // 創建鎖節點(臨時有序節點)
            this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("線程【" + Thread.currentThread().getName() 
                        + "】創建鎖節點(" + this.currentLock + ")成功,開始競爭...");
            // 取所有子節點
            List<String> nodes = zk.getChildren(this.rootLockNode, false);
            // 取所有競爭lockName的鎖
            List<String> lockNodes = new ArrayList<String>();
            for (String nodeName : nodes) {
                if (nodeName.split(split)[0].equals(this.lockName)) {
                    lockNodes.add(nodeName);
                }
            }
            Collections.sort(lockNodes);
            // 取最小節點與當前鎖節點比對加鎖
            String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
            if (this.currentLock.equals(currentLockPath)) {
                return true;
            }
            // 加鎖失敗,設置前一節點為等待鎖節點
            String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
            int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
            this.waitLock = lockNodes.get(preNodeIndex);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitOtherLock(String waitLock, int sessionTimeout) {
        boolean islock = false;
        try {
            // 監聽等待鎖節點
            String waitLockNode = this.rootLockNode + "/" + waitLock;
            Stat stat = zk.exists(waitLockNode,true); // watcher等待的節點
            if (null != stat) {
                System.out.println("線程【" + Thread.currentThread().getName() 
                            + "】鎖(" + this.currentLock + ")加鎖失敗,等待鎖(" + waitLockNode + ")釋放...");
                // 設置計數器,使用計數器阻塞線程
                this.countDownLatch = new CountDownLatch(1);
                islock = this.countDownLatch.await(sessionTimeout,TimeUnit.MILLISECONDS);
                this.countDownLatch = null;
                if (islock) {
                    System.out.println("線程【" + Thread.currentThread().getName() + "】鎖(" 
                                + this.currentLock + ")加鎖成功,鎖(" + waitLockNode + ")已經釋放");
                } else {
                    System.out.println("線程【" + Thread.currentThread().getName() + "】鎖(" 
                                + this.currentLock + ")加鎖失敗...");
                }
            } else {
                islock = true;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return islock;
    }
    
    // 3. 釋放鎖
    public void unlock() throws InterruptedException {
        try {
            Stat stat = zk.exists(this.currentLock,false);
            if (null != stat) {
                System.out.println("線程【" + Thread.currentThread().getName() + "】釋放鎖 " + this.currentLock);
                zk.delete(this.currentLock, -1);
                this.currentLock = null;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } finally {
            zk.close();
        }
    }
    
    // 4. 監聽器回調
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
            // 計數器減一,恢復線程操作
            this.countDownLatch.countDown();
        }
    }

}
public class Test {
    public static void doSomething() {
        System.out.println("線程【" + Thread.currentThread().getName() + "】正在運行...");
    }

    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            public void run() {
                ZooKeeperLock lock = null;
                lock = new ZooKeeperLock("127.0.0.1:2181","/locks", "test1");
                if (lock.lock()) {    // 獲取zookeeper的節點鎖
                    doSomething();
                    try {
                        Thread.sleep(1000);
                        lock.unlock();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}

console結果:

 

 


 三:基於緩存實現分布式鎖

   相比較於基於數據庫實現分布式鎖的方案來說,基於緩存來實現在性能方面會表現的更好一點。而且很多緩存是可以集群部署的,可以解決單點問題。

  目前有很多成熟的緩存產品,包括Redis,memcached以及我們公司內部的Tair。

  這里以Tair為例來分析下使用緩存實現分布式鎖的方案。關於Redis和memcached在網絡上有很多相關的文章,並且也有一些成熟的框架及算法可以直接使用。  

基於Tair的實現分布式鎖其實和Redis類似,其中主要的實現方式是使用TairManager.put方法來實現。

以上實現方式同樣存在幾個問題:

1、這把鎖沒有失效時間,一旦解鎖操作失敗,就會導致鎖記錄一直在tair中,其他線程無法再獲得到鎖。

2、這把鎖只能是非阻塞的,無論成功還是失敗都直接返回。

3、這把鎖是非重入的,一個線程獲得鎖之后,在釋放鎖之前,無法再次獲得該鎖,因為使用到的key在tair中已經存在。無法再執行put操作。

當然,同樣有方式可以解決。

  • 沒有失效時間?tair的put方法支持傳入失效時間,到達時間之后數據會自動刪除。
  • 非阻塞?while重復執行。
  • 非可重入?在一個線程獲取到鎖之后,把當前主機信息和線程信息保存起來,下次再獲取之前先檢查自己是不是當前鎖的擁有者。

  但是,失效時間我設置多長時間為好?如何設置的失效時間太短,方法沒等執行完,鎖就自動釋放了,那么就會產生並發問題。如果設置的時間太長,其他獲取鎖的線程就可能要平白的多等一段時間。這個問題使用數據庫實現分布式鎖同樣存在

總結

  可以使用緩存來代替數據庫來實現分布式鎖,這個可以提供更好的性能,同時,很多緩存服務都是集群部署的,可以避免單點問題。並且很多緩存服務都提供了可以用來實現分布式鎖的方法,比如Tair的put方法,redis的setnx方法等。並且,這些緩存服務也都提供了對數據的過期自動刪除的支持,可以直接設置超時時間來控制鎖的釋放。

使用緩存實現分布式鎖的優點

  性能好,實現起來較為方便。

使用緩存實現分布式鎖的缺點

  通過超時時間來控制鎖的失效時間並不是十分的靠譜。


 三種方案的比較

  上面幾種方式,哪種方式都無法做到完美。就像CAP一樣,在復雜性、可靠性、性能等方面無法同時滿足,所以,根據不同的應用場景選擇最適合自己的才是王道。

  從理解的難易程度角度(從低到高)

    數據庫 > 緩存 > Zookeeper

  從實現的復雜性角度(從低到高)

    Zookeeper >= 緩存 > 數據庫

  從性能角度(從高到低)

    緩存 > Zookeeper >= 數據庫

  從可靠性角度(從高到低)

    Zookeeper > 緩存 > 數據庫

 

轉自:http://www.hollischuang.com/archives/1716


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM