Redis分布式鎖—Redisson+RLock可重入鎖實現篇


前言

平時的工作中,由於生產環境中的項目是需要部署在多台服務器中的,所以經常會面臨解決分布式場景下數據一致性的問題,那么就需要引入分布式鎖來解決這一問題。

針對分布式鎖的實現,目前比較常用的就如下幾種方案:

  1. 基於數據庫實現分布式鎖
  2. 基於 Redis 實現分布式鎖 【本文】
  3. 基於 Zookeeper 實現分布式鎖

接下來這個系列文章會跟大家一塊探討這三種方案,本篇為 Redis 實現分布式鎖篇。

Redis分布式環境搭建推薦:基於Docker的Redis集群搭建

Redis分布式鎖一覽

說到 Redis 鎖,能搜到的,或者說常用的無非就下面這兩個:

  • setNX + Lua腳本
  • Redisson + RLock可重入鎖 【本文】

接下來我們一一探索這兩個的實現,本文為 Redisson + RLock可重入鎖 實現篇。

1、setNX+Lua實現方式

跳轉鏈接:https://www.cnblogs.com/niceyoo/p/13711149.html

2、Redisson介紹

Redisson 是 java 的 Redis 客戶端之一,是 Redis 官網推薦的 java 語言實現分布式鎖的項目。

Redisson 提供了一些 api 方便操作 Redis。因為本文主要以鎖為主,所以接下來我們主要關注鎖相關的類,以下是 Redisson 中提供的多樣化的鎖:

  • 可重入鎖(Reentrant Lock)
  • 公平鎖(Fair Lock)
  • 聯鎖(MultiLock)
  • 紅鎖(RedLock)
  • 讀寫鎖(ReadWriteLock)
  • 信號量(Semaphore) 等等

總之,管你了解不了解,反正 Redisson 就是提供了一堆鎖... 也是目前大部分公司使用 Redis 分布式鎖最常用的一種方式。

本文中 Redisson 分布式鎖的實現是基於 RLock 接口,而 RLock 鎖接口實現源碼主要是 RedissonLock 這個類,而源碼中加鎖、釋放鎖等操作都是使用 Lua 腳本來完成的,並且封裝的非常完善,開箱即用。

接下來主要以 Redisson 實現 RLock 可重入鎖為主。

代碼中實現過程

一起來看看在代碼中 Redisson 怎么實現分布式鎖的,然后再對具體的方法進行解釋。

源碼地址:https://github.com/niceyoo/redis-redlock

篇幅限制,文中代碼不全,請以上方源碼鏈接為主。

代碼大致邏輯:首先會涉及數據庫 2 個表,order2(訂單表)、stock(庫存表),controller層會提供一個創建訂單的接口,創建訂單之前,先獲取 RedLock 分布式鎖,獲取鎖成功后,在一個事務下減庫存,創建訂單;最后通過創建大於庫存的並發數模擬是否出現超賣的情況。

代碼環境:SpringBoot2.2.2.RELEASE + Spring Data JPA + Redisson

1)Maven 依賴 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>redis-redlock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-redlock</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>

        <!-- redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.11.1</version>
        </dependency>

        <!-- Gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>

        <!-- JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!-- Mysql Connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <!-- 數據庫連接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.20</version>
        </dependency>

        <!-- Hutool工具包 -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.6.8</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

redisson、MySQL 等相關依賴。

2)application.yml 配置文件
server:
  port: 6666
  servlet:
    context-path: /

spring:
  # 數據源
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/redis_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: 123456
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.jdbc.Driver
    logSlowSql: true
  jpa:
    # 顯示sql
    show-sql: false
    # 自動生成表結構
    generate-ddl: true
    hibernate:
      ddl-auto: update
  redis:
    redis:
      cluster:
        nodes: 10.211.55.4:6379, 10.211.55.4:6380, 10.211.55.4:6381
      lettuce:
        pool:
          min-idle: 0
          max-idle: 8
          max-active: 20

# 日志
logging:
  # 輸出級別
  level:
    root: info
  file:
    # 指定路徑
    path: redis-logs
    # 最大保存天數
    max-history: 7
    # 每個文件最大大小
    max-size: 5MB

配置redis,指定數據庫地址。

3)Redisson配置類 RedissonConfig.java
/**
 * redisson配置類
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
       Config config = new Config();
       config.useClusterServers()
               .setScanInterval(2000)
               .addNodeAddress("redis://10.211.55.4:6379""redis://redis://10.211.55.4:6380")
               .addNodeAddress("redis://redis://10.211.55.4:6381");
       RedissonClient redisson = Redisson.create(config);
       return redisson;
    }

}
4)StockServerImpl 庫存實現類,其他參考源碼
import com.example.redisredlock.bean.Stock;
import com.example.redisredlock.dao.StockDao;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@Transactional
public class StockServerImpl implements StockService {

    @Autowired
    private StockDao stockDao;

    @Override
    public StockDao getRepository() {
        return stockDao;
    }

    /**
     * 減庫存
     *
     * @param productId
     * @return
     */
    @Override
    public boolean decrease(String productId) {
        Stock one = stockDao.getOne(productId);
        int stockNum = one.getStockNum() - 1;
        one.setStockNum(stockNum);
        stockDao.saveAndFlush(one);
        return true;
    }
}

庫存實現類,就一個接口,完成對庫存的-1操作。

5)OrderServerImpl 訂單實現類(核心代碼)
package com.example.redisredlock.server.impl;

import com.example.redisredlock.bean.Order;
import com.example.redisredlock.dao.OrderDao;
import com.example.redisredlock.server.OrderServer;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
@Transactional
public class OrderServerImpl implements OrderServer {

    /**
     * 庫存service
     */
    @Resource
    private StockService stockService;

    /**
     * 訂單order dao
     */
    @Resource
    private OrderDao orderDao;

    @Override
    public OrderDao getRepository() {
        return orderDao;
    }

    @Resource
    private RedissonClient redissonClient;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean createOrder(String userId, String productId) {

        /**  如果不加鎖,必然超賣 **/
        RLock lock = redissonClient.getLock("stock:" + productId);

        try {
            lock.lock(10, TimeUnit.SECONDS);

            int stock = stockService.get(productId).getStockNum();
            log.info("剩余庫存:{}", stock);
            if (stock <= 0) {
                return false;
            }

            String orderNo = UUID.randomUUID().toString().replace("-""").toUpperCase();
       /** 減庫存操作 **/
            if (stockService.decrease(productId)) {
                Order order = new Order();
                order.setUserId(userId);
                order.setProductId(productId);
                order.setOrderNo(orderNo);
                Date now = new Date();
                order.setCreateTime(now);
                order.setUpdateTime(now);
                orderDao.save(order);
                return true;
            }

        } catch (Exception ex) {
            log.error("下單失敗", ex);
        } finally {
            lock.unlock();
        }

        return false;
    }
}
6)Order 訂單實體類
@Data
@Entity
@Table(name = "order2")
public class Order extends BaseEntity {

    private static final long serialVersionUID = 1L;

    /**
     * 訂單編號
     */
    private String orderNo;

    /**
     * 下單用戶id
     */
    private String userId;

    /**
     * 產品id
     */
    private String productId;

}
7)Stock 庫存實體類
@Data
@Entity
@Table(name = "stock")
public class Stock extends BaseEntity {

    private static final long serialVersionUID = 1L;

    /**
     * 用產品id,設置為庫存id
     */

    /**
     * 庫存數量
     */
    private Integer stockNum;

}
8)OrderController 訂單接口
package com.example.redisredlock.controller;

import com.example.redisredlock.bean.Order;
import com.example.redisredlock.server.OrderServer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

/**
 * @author niceyoo
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private OrderServer orderServer;

    @PostMapping("/createOrder")
    public boolean createOrder(Order order) {
        return orderServer.createOrder(order.getUserId(), order.getProductId());
    }

}

表結構說明及接口測試部分

因為項目中使用 Spring Data JPA,所以會自動創建數據庫表結構,大致為:

stock(庫存表)

id(商品id) stock_num(庫存數量) create_time(創建時間) update_time(更新時間)
1234 100 xxxx xxxx

order2(訂單表)

id(訂單id) order_no(訂單號) user_id(用戶id) product_id(商品id)
xxxx xxxx xxxx 1234

如下是詳細表結構+數據:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for order2
-- ----------------------------
DROP TABLE IF EXISTS `order2`;
CREATE TABLE `order2` (
  `id` varchar(64) NOT NULL,
  `create_time` datetime(6) DEFAULT NULL,
  `update_time` datetime(6) DEFAULT NULL,
  `order_no` varchar(255) DEFAULT NULL,
  `user_id` varchar(64) DEFAULT NULL,
  `product_id` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
  `id` varchar(255) NOT NULL,
  `create_time` datetime(6) DEFAULT NULL,
  `update_time` datetime(6) DEFAULT NULL,
  `stock_num` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of stock
-- ----------------------------
BEGIN;
INSERT INTO `stock` VALUES ('1234''2020-09-21 21:38:09.000000''2020-09-22 08:32:17.883000', 0);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

創建訂單的過程就是消耗庫存表 stock_num 的過程,如果沒有分布式鎖的情況下,在高並發下很容易出現商品超賣的情況,所以引入了分布式鎖的概念,如下是在庫存100,並發1000的情況下,測試超賣情況:

JMeter 模擬進程截圖

image-20200922091853399

JMeter 調用接口截圖

image-20200922092802604

stock 庫存表截圖

image-20200922092229658

訂單表截圖

WX20200922-092627@2x

加了鎖之后並沒有出現超賣情況。

核心代碼說明

整個 demo 核心代碼在創建訂單 createOrder() 加鎖的過程,如下:

@Override
@Transactional(rollbackFor = Exception.class)
public boolean createOrder(String userId, String productId) {

    //  如果不加鎖,必然超賣
    RLock lock = redissonClient.getLock("stock:" + productId);

    try {
        lock.lock(10, TimeUnit.SECONDS);

        int stock = stockService.get(productId).getStockNum();
        log.info("剩余庫存:{}", stock);
        if (stock <= 0) {
            return false;
        }

        String orderNo = UUID.randomUUID().toString().replace("-""").toUpperCase();

        if (stockService.decrease(productId)) {
            Order order = new Order();
            order.setUserId(userId);
            order.setProductId(productId);
            order.setOrderNo(orderNo);
            Date now = new Date();
            order.setCreateTime(now);
            order.setUpdateTime(now);
            orderDao.save(order);
            return true;
        }

    } catch (Exception ex) {
        log.error("下單失敗", ex);
    } finally {
        lock.unlock();
    }

    return false;
}

去除業務邏輯,加鎖框架結構為:

RLock lock = redissonClient.getLock("xxx");

lock.lock();

try {
    ...
} finally {
    lock.unlock();
}

關於 RedLock 中的方法

因為 RLock 本身繼承自 Lock 接口,如下分為兩部分展示:

public interface RLock extends Lock, RLockAsync {

    //----------------------Lock接口方法-----------------------

    /**
     * 加鎖 鎖的有效期默認30秒
     */
    void lock();
    
    /**
     * tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false .
     */
    boolean tryLock();
    
    /**
     * tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,
     * 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true
     *
     * @param time 等待時間
     * @param unit 時間單位 小時、分、秒、毫秒等
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    /**
     * 解鎖
     */
    void unlock();
    
    /**
     * 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
     * Thread.currentThread().interrupt(); 方法真正中斷該線程
     */
    void lockInterruptibly();

    //----------------------RLock接口方法-----------------------
    /**
     * 加鎖 上面是默認30秒這里可以手動設置鎖的有效時間
     *
     * @param leaseTime 鎖有效時間
     * @param unit      時間單位 小時、分、秒、毫秒等
     */
    void lock(long leaseTime, TimeUnit unit);
    
    /**
     * 這里比上面多一個參數,多添加一個鎖的有效時間
     *
     * @param waitTime  等待時間
     * @param leaseTime 鎖有效時間
     * @param unit      時間單位 小時、分、秒、毫秒等
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    
    /**
     * 檢驗該鎖是否被線程使用,如果被使用返回True
     */
    boolean isLocked();
    
    /**
     * 檢查當前線程是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前線程獲得此鎖,而不是此鎖是否被線程占有)
     * 這個比上面那個實用
     */
    boolean isHeldByCurrentThread();
    
    /**
     * 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間
     * @param leaseTime  鎖有效時間
     * @param unit       時間單位 小時、分、秒、毫秒等
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);  
}

1、加鎖

首先重點在 getLock() 方法,到底是怎么拿到分布式鎖的,我們點進該方法:

public RLock getLock(String name) {
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

調用 getLock() 方法后實際返回一個 RedissonLock 對象,此時就有點呼應了,文章前面提到的 Redisson 普通的鎖實現源碼主要是 RedissonLock 這個類,而源碼中加鎖、釋放鎖等操作都是使用 Lua 腳本來完成的,封裝的非常完善,開箱即用。

在 RedissonLock 對象中,主要實現 lock() 方法,而 lock() 方法主要調用 tryAcquire() 方法:

image-20200922094539784

tryAcquire() 方法又繼續調用 tryAcquireAsync() 方法:

image-20200922094802951

到這,由於 leaseTime == -1,於是又調用 tryLockInnerAsync()方法,感覺有點無限套娃那種感覺了...

咳咳,不過這個方法是最關鍵的了:

image-20200922100756048

這個方法就有點意思了,看到了一些熟悉的東西,還記得上一篇里的 Lua 腳本嗎?

我們來分析一下這個部分的 Lua 腳本:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
     "if (redis.call('exists', KEYS[1]) == 0) then " +
         "redis.call('hset', KEYS[1], ARGV[2], 1); " +
         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
     "end; " +
     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
     "end; " +
     "return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

腳本里,一共是有兩個參數 KEYS[1]、通過后面的參數可以得知: KEYS[1] 為 getName(),ARGV[2] 為 getLockName(threadId)。

假設傳遞加鎖參數時傳入的 name 值為 "niceyoo",

假設線程調用的 ID 為 thread-1,

假設 RedissonLock 類的成員變量 UUID 類型的 id 值為 32063ed-98522fc-80287ap,

結合 getLockName(threadId)) 方法:

protected String getLockName(long threadId) {
    return this.id + ":" + threadId;
}

即,KEYS[1] = niceyoo,ARGV[2] = 32063ed-98522fc-80287ap:thread-1

然后將假設值帶入語句中:

  1. 判斷是否存在名為 “niceyoo” 的 key;
  2. 如果沒有,則在其下設置一個字段為 “32063ed-98522fc-80287ap:thread-1”,值為 1 的鍵值對 ,並設置它的過期時間,也就是第一個 if 語句體;
  3. 如果存在,則進一步判斷 “32063ed-98522fc-80287ap:thread-1” 是否存在,若存在,則其值加 1,並重新設置過期時間,這個過程可以看做鎖重入;
  4. 返回 “niceyoo” 的生存時間(毫秒);

如果放在鎖這個場景下就是,key 表示當前線程名稱,argv 為當前獲得鎖的線程,所有競爭這把鎖的線程都要判斷這個 key 下有沒有自己的,也就是上邊那些 if 判斷,如果沒有就不能獲得鎖,如果有,則進入重入鎖,字段值+1。

2、解鎖

解鎖調用的是 unlockInnerAsync() 方法:

image-20200922103915505

該方法同樣還是調用的 Lua 腳本實現的。

同樣還是假設 name=niceyoo,假設線程 ID 是 thread-1

同理,我們可以得到:

KEYS[1] 是 getName(),即 KEYS[1]=niceyoo,

KEYS[2] 是 getChannelName(),即 KEYS[2]=redisson_lock__channel:{niceyoo},

ARGV[1] 是 LockPubSub.unlockMessage,即ARGV[1]=0,

ARGV[2] 是生存時間,

ARGV[3] 是 getLockName(threadId),即 ARGV[3]=32063ed-98522fc-80287ap:thread-1

因此,上面腳本的意思是:

  1. 判斷是否存在 name 為 “niceyoo” 的key;

  2. 如果不存在,向 Channel 中廣播一條消息,廣播的內容是0,並返回1

  3. 如果存在,進一步判斷字段 32063ed-98522fc-80287ap:thread-1 是否存在

  4. 若字段不存在,返回空,若字段存在,則字段值減1

  5. 若減完以后,字段值仍大於0,則返回0

  6. 減完后,若字段值小於或等於0,則廣播一條消息,廣播內容是0,並返回1;

可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的線程現在可以獲得鎖了。

3、加鎖解鎖小結

4、其他補充

4.1 lock() 方法

通常在獲得 RLock 時,需要調用 lock() 方法,那么設置過期時間跟不設置有啥區別:

RLock lock = redissonClient.getLock("xxx");

/*最常見的使用方法*/
lock.lock();

如果沒有設置過期時間,默認還是會有一個30秒的過期時間,等價於:

RLock lock = redissonClient.getLock("xxx");

/*支持過期解鎖,30秒之后自動釋放鎖,無須調用unlock方法手動解鎖*/
lock.lock(30, TimeUnit.SECONDS);
4.1 tryLock() 方法

有的小伙在在獲取分布式鎖時,使用的是 tryLock() 方法,跟 lock() 方法有啥區別:

RLock lock = redissonClient.getLock("xxx");

/*嘗試加鎖,最多等待10秒,上鎖以后10秒自動解鎖,返回true表示加鎖成功*/
if(lock.tryLock(10,10, TimeUnit.SECONDS)){
 xxx
}

首先我們來看一下 tryLock() 方法源碼:

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    //1、 獲取鎖同時獲取成功的情況下,和lock(...)方法是一樣的 直接返回True,獲取鎖False再往下走
    if (ttl == null) {
        return true;
    }
    //2、如果超過了嘗試獲取鎖的等待時間,當然返回false 了。
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }

    // 3、訂閱監聽redis消息,並且創建RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    //  阻塞等待subscribe的future的結果對象,如果subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,不再繼續申請鎖了。
    //  只有await返回true,才進入循環嘗試獲取鎖
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

   //4、如果沒有超過嘗試獲取鎖的等待時間,那么通過While一直獲取鎖。最終只會有兩種結果
    //1)、在等待時間內獲取鎖成功 返回true。2)等待時間結束了還沒有獲取到鎖那么返回false
    while (true) {
        long currentTime = System.currentTimeMillis();
        ttl = tryAcquire(leaseTime, unit, threadId);
        // 獲取鎖成功
        if (ttl == null) {
            return true;
        }
       //   獲取鎖失敗
        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
    }
}

tryLock() 方法是申請鎖並返回鎖有效期還剩的時間,如果為空說明鎖未被其他線程申請,那么就直接獲取鎖並返回,如果獲取到時間,則進入等待競爭邏輯。

tryLock() 方法一般用於特定滿足需求的場合,但不建議作為一般需求的分布式鎖,一般分布式鎖建議用 lock(long leaseTime, TimeUnit unit) 方法。因為從性能上考慮,在高並發情況下后者效率是前者的好幾倍。

Redis分布式鎖的缺點

在上一節中我們提到了 「setNX+Lua腳本」實現分布式鎖在集群模式下的缺陷,

我們再來回顧一下,通常我們為了實現 Redis 的高可用,一般都會搭建 Redis 的集群模式,比如給 Redis 節點掛載一個或多個 slave 從節點,然后采用哨兵模式進行主從切換。但由於 Redis 的主從模式是異步的,所以可能會在數據同步過程中,master 主節點宕機,slave 從節點來不及數據同步就被選舉為 master 主節點,從而導致數據丟失,大致過程如下:

  1. 用戶在 Redis 的 master 主節點上獲取了鎖;
  2. master 主節點宕機了,存儲鎖的 key 還沒有來得及同步到 slave 從節點上;
  3. slave 從節點升級為 master 主節點;
  4. 用戶從新的 master 主節點獲取到了對應同一個資源的鎖,同把鎖獲取兩次。

ok,然后為了解決這個問題,Redis 作者提出了 RedLock 算法,步驟如下(五步):

在下面的示例中,我們假設有 5 個完全獨立的 Redis Master 節點,他們分別運行在 5 台服務器中,可以保證他們不會同時宕機。

  1. 獲取當前 Unix 時間,以毫秒為單位。
  2. 依次嘗試從 N 個實例,使用相同的 key 和隨機值獲取鎖。在步驟 2,當向 Redis 設置鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為 10 秒,則超時時間應該在 5-50 毫秒之間。這樣可以避免服務器端 Redis 已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試另外一個 Redis 實例。
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟 1 記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(這里是 3 個節點)的 Redis 節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功。
  4. 如果取到了鎖,key 的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟 3 計算的結果)。
  5. 如果因為某些原因,獲取鎖失敗(沒有在至少 N/2+1 個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的 Redis 實例上進行解鎖(即便某些 Redis 實例根本就沒有加鎖成功)。

到這,基本看出來,只要是大多數的 Redis 節點可以正常工作,就可以保證 Redlock 的正常工作。這樣就可以解決前面單點 Redis 的情況下我們討論的節點掛掉,由於異步通信,導致鎖失效的問題。

但是細想后, Redlock 還是存在如下問題:

假設一共有5個Redis節點:A, B, C, D, E。設想發生了如下的事件序列:

  1. 客戶端1 成功鎖住了 A, B, C,獲取鎖成功(但 D 和 E 沒有鎖住)。
  2. 節點 C 崩潰重啟了,但客戶端1在 C 上加的鎖沒有持久化下來,丟失了。
  3. 節點 C 重啟后,客戶端2 鎖住了 C, D, E,獲取鎖成功。
  4. 這樣,客戶端1 和 客戶端2 同時獲得了鎖(針對同一資源)。

哎,還是不能解決故障重啟后帶來的鎖的安全性問題...

針對節點重后引發的鎖失效問題,Redis 作者又提出了 延遲重啟 的概念,大致就是說,一個節點崩潰后,不要立刻重啟他,而是等到一定的時間后再重啟,等待的時間應該大於鎖的過期時間,采用這種方式,就可以保證這個節點在重啟前所參與的鎖都過期,聽上去感覺 延遲重啟 解決了這個問題...

但是,還是有個問題,節點重啟后,在等待的時間內,這個節點對外是不工作的。那么如果大多數節點都掛了,進入了等待,就會導致系統的不可用,因為系統在過期時間內任何鎖都無法加鎖成功...

巴拉巴拉那么多,關於 Redis 分布式鎖的缺點顯然進入了一個無解的步驟,包括后來的 神仙打架事件(Redis 作者 antirez 和 分布式領域專家 Martin Kleppmann)...

總之,首先我們要明確使用分布式鎖的目的是什么?

無外乎就是保證同一時間內只有一個客戶端可以對共享資源進行操作,也就是共享資源的原子性操作。

總之,在 Redis 分布式鎖的實現上還有很多問題等待解決,我們需要認識到這些問題並清楚如何正確實現一個 Redis 分布式鎖,然后在工作中合理的選擇和正確的使用分布式鎖。

目前我們項目中也有在用分布式鎖,也有用到 Redis 實現分布式鎖的場景,然后有的小伙伴就可能問,啊,你們就不怕出現上邊提到的那種問題嗎~

其實實現分布式鎖,從中間件上來選,也有 Zookeeper 可選,並且 Zookeeper 可靠性比 Redis 強太多,但是效率是低了點,如果並發量不是特別大,追求可靠性,那么肯定首選 Zookeeper。

如果是為了效率,就首選 Redis 實現。

好了,之后一起探索 Zookeeper 實現分布式鎖。

Redis分布式鎖:https://www.cnblogs.com/niceyoo/category/1615830.html

最近換工作了,一切都是重新開始,新的系統、新的人、新的業務… 未來路很長,大家一起加油。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM