一個Redis實現的分布式鎖


 

 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class RedisLock implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class);
    public static final String REDIS_LOCK = "RedisLock:";


    private static final long DEFAULT_WAIT_LOCK_TIME_OUT = 60;//60s 有慢sql,超時時間設置長一點
    private static final long DEFAULT_EXPIRE = 80;//80s 有慢sql,超時時間設置長一點
    private String key;
    private RedisTemplate redisTemplate;

    public RedisLock(RedisTemplate redisTemplate,String key) {
        this.redisTemplate = redisTemplate;
        this.key = key;
    }

    /**
     * 等待鎖的時間,單位為s
     *
     * @param key
     * @param timeout s
     * @param seconds
     */
    public boolean lock(String key, long timeout, TimeUnit seconds) {
        String lockKey = generateLockKey(key);
        long nanoWaitForLock = seconds.toNanos(timeout);
        long start = System.nanoTime();

        try {
            while ((System.nanoTime() - start) < nanoWaitForLock) {
                if (redisTemplate.getConnectionFactory().getConnection().setNX(lockKey.getBytes(), new byte[0])) {
                    redisTemplate.expire(lockKey, DEFAULT_EXPIRE, TimeUnit.SECONDS);//暫設置為80s過期,防止異常中斷鎖未釋放
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("add RedisLock[{}].{}", key, Thread.currentThread());
                    }
                    return true;
                }
                TimeUnit.MILLISECONDS.sleep(1000 + new Random().nextInt(100));//加隨機時間防止活鎖
            }
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
            unlock();
        }
        return false;
    }

    public void unlock() {
        try {
            String lockKey = generateLockKey(key);
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            connection.del(lockKey.getBytes());
            connection.del(key.getBytes());
            connection.close();
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
        }
    }

    private String generateLockKey(String key) {
        return String.format(REDIS_LOCK + "%s", key);
    }

    public boolean lock() {
        return lock(key, DEFAULT_WAIT_LOCK_TIME_OUT, TimeUnit.SECONDS);
    }

    @Override
    public void close(){
        try {
            String lockKey = generateLockKey(key);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("release RedisLock[" + lockKey + "].");
            }
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            connection.del(lockKey.getBytes());
            connection.close();
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
        }
    }
}

 

 

在高並發的使用場景下,如何讓redis里的數據盡量保持一致,可以采用分布式鎖。以分布式鎖的方式來保證對臨界資源的互斥讀寫。

   redis使用緩存作為分布式鎖,性能非常強勁,在一些不錯的硬件上,redis可以每秒執行10w次,內網延遲不超過1ms,足夠滿足絕大部分應用的鎖定需求。

   redis常用的分布式鎖的實現方式:

一、setbit / getbit

   用索引號為0的第一個比特位來表示鎖定狀態,其中:0表示未獲得鎖,1表示已獲得鎖。

   優勢:簡單;

   劣勢:競態條件(race condition),死鎖。

   獲得鎖的過程至少需要兩步:先getbit判斷,后setbit上鎖。由於不是原子操作,因此可能存在競態條件;如果一個客戶端使用setbit獲取到鎖,然后沒來得及釋放crash掉了,那么其他在等待的客戶端將永遠無法獲得該鎖,進而形成了死鎖。所以這種形式不太適合實現分布式鎖。

二、setnx / del / getset

  redis官網有一篇文章專門談論了實現分布式鎖的話題。基本的原則是:采用setnx嘗試獲取鎖並判斷是否獲得了鎖,setnx設置的值是它想占用鎖的時間(預估):

  • 如返回1,則該客戶端獲得鎖,把lock.foo的鍵值設置為時間值表示該鍵已被鎖定,該客戶端最后可以通過DEL lock.foo來釋放該鎖。
  • 如返回0,表明該鎖已被其他客戶端取得,這時我們可以先返回或進行重試等對方完成或等待鎖超時。

  通過del刪除key來釋放鎖。某個想獲得鎖的客戶端,先采用setnx嘗試獲取鎖,如果獲取失敗了,那么會通過get命令來獲得鎖的過期時間以判斷該鎖的占用是否過期。如果跟當前時間對比,發現過期,那么先執行del,然后執行setnx獲取鎖。如果整個流程就這樣,可能會產生死鎖,請參考下面的執行序列:

   所以,在高並發的場景下,如果檢測到鎖過期,不能簡單地進行del並嘗試通過setnx獲得鎖。我們可以通過getset命令來避免這個問題。來看看,如果存在一個用戶user4,它通過調用getset命令如何避免這種情況的發生:

 getset設置的過期時間跟上面的setnx設置的相同:

   如果該命令返回的結果跟上一步通過get獲得的過期時間一致,則說明這兩步之間,沒有新的客戶端搶占了鎖,則該客戶端即獲得鎖。如果該命令返回的結果跟上一步通過get獲得的過期時間不一致,則該鎖可能已被其他客戶端搶先獲得,則本次獲取鎖失敗。

   這種實現方式得益於getset命令的原子性,從而有效得避免了競態條件。並且,通過將比對鎖的過期時間作為獲取鎖邏輯的一部分,從而避免了死鎖。

三、setnx / del / expire

   這是使用最多的實現方式:setnx的目的同上,用來實現嘗試獲取鎖以及判斷是否獲取到鎖的原子性,del刪除key來釋放鎖,與上面不同的是,使用redis自帶的expire命令來防止死鎖(可能出現某個客戶端獲得了鎖,但是crash了,永不釋放導致死鎖)。這算是一種比較簡單但粗暴的實現方式:因為,不管實際的情況如何,當你設置expire之后,它一定會在那個時間點刪除key。如何當時某個客戶端已獲得了鎖,正在執行臨界區內的代碼,但執行時間超過了expire的時間,將會導致另一個正在競爭該鎖的客戶端也獲得了該鎖,這個問題下面還會談到。

  我們來看一下宿舍鎖的簡單實現很簡單:

通過一個while(true),在當前線程上進行阻塞等待,並通過一個計數器進行自減操作,防止永久等待。 

http://www.cnblogs.com/moonandstar08/p/5682822.html

 

多節點的部署中,對鎖的控制,參考:

http://www.jeffkit.info/2011/07/1000/

直接貼上代碼實現,同上一篇文章一樣,都是基於AOP

定義注解,標志切入點:

package com.ns.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
    /**
     * redis的key
     * @return
     */
    String value();
    /**
     * 持鎖時間,單位毫秒,默認一分鍾
     */
    long keepMills() default 60000;
    /**
     * 當獲取失敗時候動作
     */
    LockFailAction action() default LockFailAction.GIVEUP;
    
    public enum LockFailAction{
        /**
         * 放棄
         */
        GIVEUP,
        /**
         * 繼續
         */
        CONTINUE;
    }
    /**
     * 睡眠時間,設置GIVEUP忽略此項
     * @return
     */
    long sleepMills() default 1000;
}

 

切面實現:

package com.redis.aop;
import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.ns.annotation.RedisLock;
import com.ns.annotation.RedisLock.LockFailAction;
import com.ns.redis.dao.base.BaseRedisDao;
@Aspect
public class RedisLockAspect extends BaseRedisDao<String, Long>{
  private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class);
  //execution(* com.ns..*(*,..)) and @within(com.ns.annotation.RedisLock)
  
  @Pointcut("execution(* com.ns..*(..)) && @annotation(com.ns.annotation.RedisLock)")
  private void lockPoint(){}
  @Around("lockPoint()")
  public Object arround(ProceedingJoinPoint pjp) throws Throwable{
    MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
    Method method = methodSignature.getMethod();
    RedisLock lockInfo = method.getAnnotation(RedisLock.class);
    boolean lock = false;
    Object obj = null;
    while(!lock){
      long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
      lock = setNX(lockInfo.value(), timestamp);
      //得到鎖,已過期並且成功設置后舊的時間戳依然是過期的,可以認為獲取到了鎖(成功設置防止鎖競爭)
      long now = System.currentTimeMillis();
      if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
        //得到鎖,執行方法,釋放鎖
        log.info("得到鎖...");
        obj = pjp.proceed();
        //不加這一行,對於只能執行一次的定時任務,時間差上不能保證另一個一定正好放棄
        if(lockInfo.action().equals(LockFailAction.CONTINUE)){
          delete(lockInfo.value());
        }
      }else{
        if(lockInfo.action().equals(LockFailAction.CONTINUE)){
          log.info("稍后重新請求鎖...");
          Thread.currentThread().sleep(lockInfo.sleepMills());
        }else{
          log.info("放棄鎖...");
          break;
        }
      }
    }
    return obj;
  }
  public boolean setNX(String key,Long value){
    return valueOperations.setIfAbsent(key, value);
  }
  public long getLock(String key){
    return valueOperations.get(key);
  }
  public Long getSet(String key,Long value){
    return valueOperations.getAndSet(key, value);
  }
  public void releaseLock(String key){
    delete(key);
  }
}

Python的一個實現

LOCK_TIMEOUT = 3
lock = 0
lock_timeout = 0
lock_key = 'lock.foo'

# 獲取鎖
while lock != 1:
    now = int(time.time())
    lock_timeout = now + LOCK_TIMEOUT + 1
    lock = redis_client.setnx(lock_key, lock_timeout)
    if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
        break
    else:
        time.sleep(0.001)

# 已獲得鎖
do_job()

# 釋放鎖
now = int(time.time())
if now < lock_timeout:
    redis_client.delete(lock_key)

http://blog.csdn.net/lihao21/article/details/49104695

以上有些代碼只符合我現在的項目場景,根據實際需要進行調整

http://www.tuicool.com/articles/EzaM7by

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM