分布式鎖 & 分布式事務


分布式鎖

1、鎖:

單進程的系統中,存在多線程同時操作一個公共變量,此時需要加鎖對變量進行同步操作,保證多線程的操作線性執行消除並發修改。解決的是單進程中的多線程並發問題。

2、分布式鎖:

只要的應用場景是在集群模式的多個相同服務,可能會部署在不同機器上,解決進程間安全問題,防止多進程同時操作一個變量或者數據庫。解決的是多進程的並發問題。

 一、基於Redis實現

在Redis2.6.12版本之前,使用setnx命令設置key-value、使用expire命令設置key的過期時間獲取分布式鎖,使用del命令釋放分布式鎖,但是這種實現有如下一些問題: 

1. setnx命令設置完key-value后,還沒來得及使用expire命令設置過期時間,當前線程掛掉了,會導致當前線程設置的key一直有效,后續線程無法正常通過setnx獲取鎖,造成死鎖

這個問題很好解決,只因為這兩個不是一個原子操作。2.6.12之后的版本,set命令進行了增強SET key value [EX seconds] [PX milliseconds] [NX|XX]

  • EX second :設置鍵的過期時間為 second 秒。 SET key value EX second 效果等同於 SETEX key second value 。
  • PX millisecond :設置鍵的過期時間為 millisecond 毫秒。 SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。
  • NX :只在鍵不存在時,才對鍵進行設置操作。 SET key value NX 效果等同於 SETNX key value 。
  • XX :只在鍵已經存在時,才對鍵進行設置操作。

2. 在分布式環境下,進程A通過這種實現方式獲取到了鎖,但是在獲取到鎖之后,執行被阻塞,阻塞時間大於key超時時間導致該鎖失效;之后進程B獲取到該鎖,之后進程A恢復執行,執行完成后釋放該鎖,將會把進程B的鎖也釋放掉。也就是把他人的鎖釋放掉的問題,實際上還有另一個問題就是任務完成之前key失效的問題

這個問題也很好解決,只需要在value中存放一個唯一標識符,釋放的時候判斷是不是自己的標識符即可,如果是自己的,就可以釋放

3. 為了實現高可用,將會選擇主從復制機制,但是主從復制機制是異步的,會出現數據不同步的問題,可能導致多個機器的多個線程獲取到同一個鎖。

解決方案是不采用主從復制,使用RedLock算法(官方推薦)

1.獲取當前Unix時間,以毫秒為單位

2.依次嘗試從N個實例,使用相同的key和隨機值獲取鎖。當向Redis設置鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試另外一個Redis實例

3.客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(這里是3個節點)的Redis節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功

4.如果取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)

5.如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功)。

這里我提供兩種實現(測試的話新建一個SpringBoot工程,引入spring-boot-starter-data-redis依賴)

利用set

package com.example.demo.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.StringUtils;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 要求:
 * 1. 在分布式系統環境下,一個方法或者變量同一時間只能被一個線程操作
 * 2. 具備鎖失效機制,網絡中斷或宕機無法釋放鎖時,鎖必須被刪除,防止死鎖
 * 3. 具備阻塞鎖特性,即沒有獲取到鎖,則繼續等待獲取鎖
 * 4. 具備非阻塞鎖特性,即沒有獲取到鎖,則直接返回獲取鎖失敗
 * 5. 具備可重入特性,一個線程中可以多次獲取同一把鎖
 *
 * 問題:
 * 1. setnx后發生了異常而沒有設置過期時間,導致key一直有效,造成死鎖(SEX命令代替SEXNX+EXPIRE)
 * 2. 分布式環境下,進程A獲取到了鎖,但是執行時間過長,鎖已經自動失效了,此時進程B獲取到了鎖,然后進程A執行完畢,執行del命令,把鎖釋放掉,結果把進程B的也釋放了,導致不可預知的問題(可以把value設置成自己唯一標識符,這樣只能刪除自己的鎖)
 * 3. Redis集群中主從數據同步是從庫異步讀取主庫,會出現數據不同步的問題,這樣導致多個進程獲取同一把鎖(不采用主從復制,使用RedLock算法)
 * 4. 在任務執行完之前,key失效了怎么辦?(鎖獲取成功后,注冊一個定時任務,每隔一定時間(this.internalLockLeaseTime / 3L)就去續約)
 *
 * 擴展:Redis 集群沒有使用一致性hash, 而是引入了哈希槽的概念。
 * Reds 集群有16384個哈希槽,每個key通過CRC16校驗后對16384取模來決定放置哪個槽.集群的每個節點負責一部分hash槽。這種結構很容易添加或者刪除節點,並且無論是添加刪除或者修改某一個節點,都不會造成集群不可用的狀態。
 * 使用哈希槽的好處就在於可以方便的添加或移除節點。
 * 當需要增加節點時,只需要把其他節點的某些哈希槽挪到新節點就可以了;
 * 當需要移除節點時,只需要把移除節點上的哈希槽挪到其他節點就行了;
 *
 * 兩種實現:
 * 1. 利用set操作,設置失效時間。(問題:不好控制鎖的失效時間,可能在key失效之前,同步任務沒有完成)
 * 2. 利用setnx、get、getset操作,不設置失效時間,失效時間作為value存儲
 *
 *
 * SET key value [EX seconds] [PX milliseconds] [NX|XX]
 * EX-過期時間秒;PX-過期時間毫秒;NX-key不存在則設置成功;XX-key存在則設置成功
 *
 * GETSET key value(原子操作)
 * 將給定 key 的值設為 value ,並返回 key 的舊值(old value)。
 */
@Slf4j
public class RedisLockUtils {

    // key失效時間:60000毫秒
    private static final Long EXPIRE_TIME = 60L * 1000;

    // 線程等待時間:10000毫秒
    private static final Long WAIT_MAX_TIMEOUT = 10L * 1000;

    /**
     * 獲取鎖
     * @param redis     redis服務
     * @param key       key
     * @param ownerFlag 鎖擁有者標識,防止把其他人的釋放了
     * @param operation 業務邏輯
     * @param <T>
     * @return
     */
    public static <T> T executeOperation(StringRedisTemplate redis, String key, String ownerFlag, RedisLockUtils.Operation<T> operation) {
        validParam(redis, key, ownerFlag);
        long start = System.currentTimeMillis();
        boolean locked = false;
        T resultObj;

        try {
            Random rand = new Random();
            for (;;) {
                BoundValueOperations<String, String> ops = redis.boundValueOps(key);
                // 設置key
                if (ops.setIfAbsent(ownerFlag, EXPIRE_TIME, TimeUnit.SECONDS)) {
                    locked = true;
                    break;
                }
                // 超時
                if ((System.currentTimeMillis() - start) >= WAIT_MAX_TIMEOUT){
                    break;
                }
                // 獲取不到鎖,睡眠隨機時間
                int sleepTime = rand.nextInt(1001);  // 取值:0-1000毫秒
                Thread.sleep(sleepTime);
            }
            // 執行業務邏輯
            resultObj = operation.invoke(locked);

            if (log.isDebugEnabled()) {
                log.debug("executeOperation success");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 獲取鎖持有者的標識
            String v = redis.boundValueOps(key).get();
            log.info("release lock,value:{},ownerFlag:{}", v, ownerFlag);
            // 只能釋放自己的
            if (ownerFlag.equals(v)){
                redis.boundValueOps(key).expire(0L, TimeUnit.SECONDS);
            }
        }

        return resultObj;
    }

    private static void validParam(StringRedisTemplate redis, String key, String ownerFlag){
        if (redis == null){
            throw new RuntimeException("redisService can't be null");
        }
        if (StringUtils.isEmpty(key)){
            throw new RuntimeException("key can't be null");
        }
        if (StringUtils.isEmpty(ownerFlag)){
            throw new RuntimeException("ownerFlag can't be null");
        }
    }

    @FunctionalInterface
    public interface Operation<T>{
        T invoke(boolean locked);
    }
}

測試

package com.example.demo;

import com.example.demo.utils.RedisDistributeLock;
import com.example.demo.utils.RedisLockUtils;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.*;

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 共享變量
    private int share = 0;

    @Test
    void testOne() throws InterruptedException {
        // 線程數量
        int threadCount = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++){
            executorService.execute(() -> {
                try {
                    // 讓所有線程同時開始執行
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                Integer result = RedisLockUtils.executeOperation(stringRedisTemplate, "redis_lock", Thread.currentThread().getName(), locked -> {
                    if (locked) {
                        return share++;
                    }
                    return -1;
                });
                System.out.println(result);
                countDownLatch.countDown();
            });
        }
        // 為了等線程全部執行完畢才離開測試代碼塊
        countDownLatch.await();
        System.out.println("==================================");
    }
}

輸出:

利用setnx + get + getset

package com.example.demo.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Slf4j
public class RedisDistributeLock {

    /**
     * 鎖超時時間,防止線程在入鎖以后,無限的執行等待
     */
    private static int EXPIRE_TIMEOUT = 60 * 1000;

    /**
     * 鎖等待時間,防止線程飢餓
     */
    private static int WAIT_TIMEOUT = 10 * 1000;

    public static <T> T executeOperation(StringRedisTemplate redisService, String lockKey, RedisDistributeLock.Operation<T> operation) {
        T resultObj;
        long start = System.currentTimeMillis();
        Random rand = new Random();
        boolean locked = false;

        try {
            for (;;) {
                long expireTime = System.currentTimeMillis() + EXPIRE_TIMEOUT + 1;
                String expireValue = String.valueOf(expireTime); // 鎖到期時間

                // 1. setnx:嘗試獲取鎖
                if (redisService.boundValueOps(lockKey).setIfAbsent(expireValue)) {
                    // 設置成功,獲取到鎖
                    locked = true;
                    break;
                }

                // 2. get:獲取已占用的鎖設置的value【走到這,說明上一步中鎖被占用】
                String currentExpireValue = redisService.boundValueOps(lockKey).get(); // redis里的時間

                // redis里的時間小於當前時間【說明已經超時,其它線程可以搶占了】
                if (currentExpireValue != null && Long.parseLong(currentExpireValue) < System.currentTimeMillis()) {
                    // 3. getset:設置新時間,返回舊時間
                    String oldExpireValue = redisService.boundValueOps(lockKey).getAndSet(expireValue);

                    // 只有一個線程能成功getset,當get的返回值和getset的返回值一樣,才能獲取到鎖,如果不一致,則證明被其它線程獲取了
                    if (oldExpireValue != null && oldExpireValue.equals(currentExpireValue)) {
                        // getset成功,獲取到鎖
                        locked = true;
                        break;
                    }
                }

                // 4. 獲取鎖超時
                if ((System.currentTimeMillis() - start) >= WAIT_TIMEOUT){
                    break;
                }
                // 獲取不到鎖,睡眠隨機時間
                int sleepTime = rand.nextInt(1001);  // 取值:0-1000毫秒
                Thread.sleep(sleepTime);
            }

            // 執行業務邏輯
            resultObj = operation.invoke(locked);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 5. 最后,如果未超時,則刪除鎖
            if (System.currentTimeMillis() < Long.parseLong(redisService.boundValueOps(lockKey).get())) {
                redisService.boundValueOps(lockKey).expire(0L, TimeUnit.SECONDS);
            }
        }
        return resultObj;
    }
    
    @FunctionalInterface
    public interface Operation<T>{
        T invoke(boolean locked);
    }

}

測試

package com.example.demo;

import com.example.demo.utils.RedisDistributeLock;
import com.example.demo.utils.RedisLockUtils;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.*;

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 共享變量
    private int share = 0;

    @Test
    void testTwo() throws InterruptedException {
        // 線程數量
        int threadCount = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++){
            executorService.execute(() -> {
                try {
                    // 讓所有線程同時開始執行
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                Integer result = RedisDistributeLock.executeOperation(stringRedisTemplate, "redis_lock_time", locked -> {
                    if (locked) {
                        return share++;
                    }
                    return -1;
                });
                System.out.println(result);
                countDownLatch.countDown();
            });
        }
        // 為了等線程全部執行完畢才離開測試代碼塊
        countDownLatch.await();
        System.out.println("==================================");
    }

}

輸出:

二、基於ZooKeeper的實現方式

利用zookeeper規定同一個目錄下只能有一個唯一文件名的特性。

1. 創建一個目錄mylock; 

2. 線程A想獲取鎖就在mylock目錄下創建臨時順序節點;

3. 獲取mylock目錄下所有的子節點,然后獲取比自己小的兄弟節點,如果不存在,則說明當前線程順序號最小,獲得鎖;

4. 線程B獲取所有節點,判斷自己不是最小節點,設置監聽比自己次小的節點;

5. 線程A處理完,刪除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。

------------------------------------------------------------------------------------------------

另附:Apache的開源ZooKeeper客戶端Curator,提供了分布式鎖的實現 InterProcessMutex,acquire方法用於獲取鎖,release方法用於釋放鎖。

優點:具備高可用、可重入、阻塞鎖特性,可解決失效死鎖問題。

缺點:因為需要頻繁的創建和刪除節點,性能上不如Redis方式。

分布式事務

1、事務

解決一個會話過程中,上下文的修改對所有數據庫表的操作要么全部成功,要不全部失敗。所以應用在service層。解決的是一個會話中的操作的數據一致性。

2、分布式事務

比如一個轉賬操作:

(1)招行賬戶減少100

(2)建行賬戶增加100

這時候需要保證對兩個服務的操作全部成功或者全部回退。解決的是組合服務的數據操作的一致性問題。

ACID

數據庫事務的四大特性(ACID):原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)、持久性(Durability)。

⑴ 原子性(Atomicity)
一個事務內所有操作共同組成一個原子包,要么全部成功,要么全部失敗。

⑵ 一致性(Consistency)
數據庫事務的一致性就規定了事物提交前后,永遠只可能存在事物提交前的狀態和事物提交后的狀態,從一個一致性的狀態到另一個一致性狀態,而不可能出現中間的過程態。也就是說事物的執行結果是量子化狀態,而不是線性狀態。

⑶ 隔離性(Isolation)
事務的隔離性,基於原子性和一致性,因為事務是原子化,量子化的,所以,事務可以有多個原子包的形式並發執行,但是,每個事務互不干擾。

⑷ 持久性(Durability)
持久性,當一個事物提交之后,數據庫狀態永遠的發生了改變。

CAP

一個分布式系統不可能同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多滿足其中兩項。

一致性:指的是在分布式環境下,數據在多個副本之間是否能夠保持一致的特性。比如系統的數據副本分布在不同節點上,對第一個節點的數據進行更新后,其它的節點也應該得到相應的更新。

可用性:指的是系統提供的服務必須處於一直可用的狀態。

分區容錯性:分布式系統在遇到任何網絡分區故障的時候,仍然需要保證對外提供滿足一致性和可用性的服務,除非是整個網絡環境都發生了故障。

BASE

BASE理論:是Basically Available(基本可用),Soft state(軟狀態),Eventually consistent(最終一致性)三個短語的簡寫,其核心思想是,即使無法做到強一致性,每個應用也應該采用適當的方式來達到最終一致性。

基本可用:出現不可預知的故障,允許損失部分可用性。比如發生故障的時候比正常慢了1秒。

軟狀態:允許系統在不同的節點進行數據同步的過程中存在延時。

最終一致性:系統中所有副本,在經過一段時間的同步后,最終能達到一致性的狀態。

一致性解決方案

2PC(二階段提交)

  • 第一階段是表決階段,所有參與者都將本事務能否成功的信息反饋發給協調者;
  • 第二階段是執行階段,協調者根據所有參與者的反饋,通知所有參與者,步調一致地在所有分支上提交或者回滾;

3PC(三階段提交,2PC的改進版本)

  • CanCommit:
  1. 事務詢問:協調者向參與者發送一個包含事務內容的 canCommit 請求,詢問是否可以執行事務提交操作,並等待響應。
  2. 各參與者向協調者反饋事務詢問的響應,如果參與者認為自己可以順利執行事務,就返回 Yes,否則反饋 No 響應。
  • PreCommit

  協調者在得到所有參與者的響應之后,會根據結果執行2種操作:執行事務預提交,或者中斷事務。

  1. 執行事務預提交:

    首先協調者向所有參與者節點發出 preCommit 的請求,並等待響應。然后參與者受到 preCommit 請求后,會執行事務操作,並將結果返回。最后協調者得到了Ack響應,確定下一階段是否為提交或者是終止操作。

  2. 中斷事務也分為2個步驟:

    首先協調者向所有參與者節點發出 abort 請求 。然后參與者如果收到 abort 請求或者超時了,都會中斷事務。

  • do Commit

  1. 執行提交:

    首先協調者發送提交請求,並等待Ack 響應。然后參與者收到 doCommit 請求后,執行事務並反饋事務提交結果,向協調者發送 Ack 消息。最后協調者接收 Ack 消息后,完成事務。

  2. 中斷事務:

    中斷事務是因為出現了異常,比如協調者一方出現了問題,或者是協調者與參與者之間出現了故障。

    首先協調者向所有的參與者發送中斷請求。然后參與者接收到中斷請求后,會利用其在二階段記錄的 undo 信息來執行事務回滾操作,並釋放資源。接下來參與者在完成事務回滾之后,向協調者發送 Ack 消息。最后協調者接收到所有的 Ack 消息后,中斷事務。

TCC

TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作。

  • Try 階段主要是對業務系統做檢測及資源預留
  • Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,默認 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
  • Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。

可靠消息最終一致性

可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景,引入消息機制后,同步的事務操作變為基於消息執行的異步操作,避免分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。

比如:上游應用業務執行成功后發送消息,消息系統保證消息的傳遞,下游應用監聽消息並執行相關業務。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM