Redis: 用redis實現分布式鎖,秒殺案例(轉)


  分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分布式鎖;3. 基於ZooKeeper的分布式鎖。本篇博客將介紹第二種方式,基於Redis實現分布式鎖。雖然網上已經有各種介紹Redis分布式鎖實現的博客,然而他們的實現卻有着各種各樣的問題,為了避免誤人子弟,本篇將介紹如何正確地實現Redis分布式鎖。 

  首先,為了確保分布式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:

  1. 互斥性。在任意時刻,只有一個客戶端能持有鎖。

  2. 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。

  3. 具有容錯性。只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。

  4. 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。 


 分布式鎖的簡單實現代碼:

package com.gdut.redis.lock.test1;

import java.util.Collections;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class DistributedLock {
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;

    private static void validParam(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
        if (null == jedisPool) {
            throw new IllegalArgumentException("jedisPool obj is null");
        }

        if (null == lockKey || "".equals(lockKey)) {
            throw new IllegalArgumentException("lock key  is blank");
        }

        if (null == requestId || "".equals(requestId)) {
            throw new IllegalArgumentException("requestId is blank");
        }

        if (expireTime < 0) {
            throw new IllegalArgumentException("expireTime is not allowed less zero");
        }
    }

    /**
     * 
     * @param jedis
     * @param lockKey
     * @param requestId
     * @param expireTime
     * @return
     */
    public boolean tryLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {

        validParam(jedisPool, lockKey, requestId, expireTime);

        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }

        return false;
    }

    /**
     * 
     * @param jedis
     * @param lockKey
     * @param requestId
     * @param expireTime
     */
    public void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {

        validParam(jedisPool, lockKey, requestId, expireTime);

        while (true) {
            if (tryLock(jedisPool, lockKey, requestId, expireTime)) {
                System.out.println("lock  "+ Thread.currentThread().getName()+ " requestId:" + requestId);
                return;
            }
        }
    }

    /**
     * 
     * @param jedis
     * @param lockKey
     * @param requestId
     * @return
     */
    public boolean unLock(JedisPool jedisPool, String lockKey, String requestId) {

        validParam(jedisPool, lockKey, requestId, 1);

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            Object result = jedis.eval(script, Collections.singletonList(lockKey),
                    Collections.singletonList(requestId));

            if (RELEASE_SUCCESS.equals(result)) {
                System.out.println("unlock  "+ Thread.currentThread().getName()+ " requestId:" + requestId);
                return true;
            }

        } catch (Exception e) {
            throw e;
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }

        return false;

    }

}

 說明:String redis.clients.jedis.Jedis.set(String key, String value, String nxxx, String expx, int time)  方法參數說明

  • 其中前面兩個是key,value值;
  • nxxx為模式,這里我們設置為NX,意思是說如果key不存在則插入該key對應的value並返回OK,否者什么都不做返回null;
  • 參數expx這里我們設置為PX,意思是設置key的過期時間為time 毫秒

  通過tryLock方法嘗試獲取鎖,內部是具體調用Redis的set方法,多個線程同時調用tryLock時候會同時調用set方法,但是set方法本身是保證原子性的,對應同一個key來說,多個線程調用set方法時候只有一個線程返回OK,其它線程因為key已經存在會返回null,所以返回OK的線程就相當與獲取到了鎖,其它返回null的線程則相當於獲取鎖失敗。

  另外這里我們要保證value(requestId)值唯一是為了保證只有獲取到鎖的線程才能釋放鎖,這個下面釋放鎖時候會講解。

  通過lock 方法讓使用tryLock獲取鎖失敗的線程本地自旋轉重試獲取鎖,這類似JUC里面的CAS。

  Redis有一個叫做eval的函數,支持Lua腳本執行,並且能夠保證腳本執行的原子性,也就是在執行腳本期間,其它執行redis命令的線程都會被阻塞。這里解鎖時候使用下面腳本:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end 

  其中keys[1]為unLock方法傳遞的key,argv[1]為unLock方法傳遞的requestId;腳本redis.call(‘get’, KEYS[1])的作用是獲取key對應的value值,這里會返回通過Lock方法傳遞的requetId, 然后看當前傳遞的RequestId是否等於key對應的值,等於則說明當前要釋放鎖的線程就是獲取鎖的線程,則繼續執行redis.call(‘del’, KEYS[1])腳本,刪除key對應的值。


 測試剛才實現的分布式鎖

  例子中使用50個線程模擬秒殺一個商品,使用–運算符來實現商品減少,從結果有序性就可以看出是否為加鎖狀態。

  模擬秒殺服務,在其中配置了jedis線程池,在初始化的時候傳給分布式鎖,供其使用。

package com.gdut.redis.lock.test1;

import java.util.UUID;

import com.gdut.redis.lock.test1.DistributedLock;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Service1 {
    private static JedisPool pool = null;
    private DistributedLock lock = new DistributedLock();
    
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // 設置最大連接數
        config.setMaxTotal(500);
        // 設置最大空閑數
        config.setMaxIdle(100);
        // 設置最大等待時間
        config.setMaxWaitMillis(1000 * 100);
        // 在borrow一個jedis實例時,是否需要驗證,若為true,則所有jedis實例均是可用的
        config.setTestOnBorrow(true);
        pool = new JedisPool(config, "127.0.0.1", 6379, 300000);
    }
    
    public void seckill() throws InterruptedException {
        String requestId = UUID.randomUUID().toString();
        lock.lock(pool, "resource", requestId, 3000);
        lock.unLock(pool, "resource", requestId);
    }
}

模擬線程進行秒殺服務:

package com.gdut.redis.lock.test1;

import com.gdut.redis.lock.test1.Service1;

public class TaskThread extends Thread {
    private Service1 service;
    
    public TaskThread(Service1 service) {
        this.service = service;
    }
    
    @Override
    public void run() {
        try {    
            synchronized (this) {
                service.seckill();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        Service1 service = new Service1();
        for (int i = 0; i < 400; i++) {
            TaskThread thread = new TaskThread(service);
            thread.start();
        }
    }

}

console結果:

一共800行輸出,lock 和unlock的輸出都是400行,表示400個線程都獲得了鎖和釋放了鎖


 總結:

  本文使用redis單實例結合redis的set方法和eval函數實現了一個簡單的分布式鎖,但是這個實現還是明顯有問題的。雖然使用set方法設置了超時時間,以避免線程獲取到鎖后redis掛了后鎖沒有被釋放的情況,但是超時時間設置為多少合適那?如果設置太小,可能會存在線程獲取鎖后執行業務邏輯時間大於鎖超時時間,那么就會存在邏輯還沒執行完,鎖已經因為超時自動釋放了,而其他線程可能獲取到鎖,那么之前獲取鎖的線程的業務邏輯的執行就沒有保證原子性。

  另外還有一個問題是Lock方法里面是自旋調用tryLock進行重試,這就會導致像JUC中的AtomicLong一樣,在高並發下多個線程競爭同一個資源時候造成大量線程占用cpu進行重試操作。這時候其實可以隨機生成一個等待時間,等時間到后在進行重試,以減少潛在的同時對一個資源進行競爭的並發量。

 

資料:http://ifeve.com/redis-distributedlock/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM