import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import java.util.Random; import java.util.concurrent.TimeUnit; public class RedisLock implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class); public static final String REDIS_LOCK = "RedisLock:"; private static final long DEFAULT_WAIT_LOCK_TIME_OUT = 60;//60s 有慢sql,超時時間設置長一點 private static final long DEFAULT_EXPIRE = 80;//80s 有慢sql,超時時間設置長一點 private String key; private RedisTemplate redisTemplate; public RedisLock(RedisTemplate redisTemplate,String key) { this.redisTemplate = redisTemplate; this.key = key; } /** * 等待鎖的時間,單位為s * * @param key * @param timeout s * @param seconds */ public boolean lock(String key, long timeout, TimeUnit seconds) { String lockKey = generateLockKey(key); long nanoWaitForLock = seconds.toNanos(timeout); long start = System.nanoTime(); try { while ((System.nanoTime() - start) < nanoWaitForLock) { if (redisTemplate.getConnectionFactory().getConnection().setNX(lockKey.getBytes(), new byte[0])) { redisTemplate.expire(lockKey, DEFAULT_EXPIRE, TimeUnit.SECONDS);//暫設置為80s過期,防止異常中斷鎖未釋放 if (LOGGER.isDebugEnabled()) { LOGGER.debug("add RedisLock[{}].{}", key, Thread.currentThread()); } return true; } TimeUnit.MILLISECONDS.sleep(1000 + new Random().nextInt(100));//加隨機時間防止活鎖 } } catch (Exception e) { LOGGER.error("{}", e.getMessage(), e); unlock(); } return false; } public void unlock() { try { String lockKey = generateLockKey(key); RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); connection.del(lockKey.getBytes()); connection.del(key.getBytes()); connection.close(); } catch (Exception e) { LOGGER.error("{}", e.getMessage(), e); } } private String generateLockKey(String key) { return String.format(REDIS_LOCK + "%s", key); } public boolean lock() { return lock(key, DEFAULT_WAIT_LOCK_TIME_OUT, TimeUnit.SECONDS); } @Override public void close(){ try { String lockKey = generateLockKey(key); if (LOGGER.isDebugEnabled()) { LOGGER.debug("release RedisLock[" + lockKey + "]."); } RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); connection.del(lockKey.getBytes()); connection.close(); } catch (Exception e) { LOGGER.error("{}", e.getMessage(), e); } } }
在高並發的使用場景下,如何讓redis里的數據盡量保持一致,可以采用分布式鎖。以分布式鎖的方式來保證對臨界資源的互斥讀寫。
redis使用緩存作為分布式鎖,性能非常強勁,在一些不錯的硬件上,redis可以每秒執行10w次,內網延遲不超過1ms,足夠滿足絕大部分應用的鎖定需求。
redis常用的分布式鎖的實現方式:
一、setbit / getbit
用索引號為0的第一個比特位來表示鎖定狀態,其中:0表示未獲得鎖,1表示已獲得鎖。
優勢:簡單;
劣勢:競態條件(race condition),死鎖。
獲得鎖的過程至少需要兩步:先getbit判斷,后setbit上鎖。由於不是原子操作,因此可能存在競態條件;如果一個客戶端使用setbit獲取到鎖,然后沒來得及釋放crash掉了,那么其他在等待的客戶端將永遠無法獲得該鎖,進而形成了死鎖。所以這種形式不太適合實現分布式鎖。
二、setnx / del / getset
redis官網有一篇文章專門談論了實現分布式鎖的話題。基本的原則是:采用setnx嘗試獲取鎖並判斷是否獲得了鎖,setnx設置的值是它想占用鎖的時間(預估):
- 如返回1,則該客戶端獲得鎖,把lock.foo的鍵值設置為時間值表示該鍵已被鎖定,該客戶端最后可以通過DEL lock.foo來釋放該鎖。
- 如返回0,表明該鎖已被其他客戶端取得,這時我們可以先返回或進行重試等對方完成或等待鎖超時。
通過del刪除key來釋放鎖。某個想獲得鎖的客戶端,先采用setnx嘗試獲取鎖,如果獲取失敗了,那么會通過get命令來獲得鎖的過期時間以判斷該鎖的占用是否過期。如果跟當前時間對比,發現過期,那么先執行del,然后執行setnx獲取鎖。如果整個流程就這樣,可能會產生死鎖,請參考下面的執行序列:
所以,在高並發的場景下,如果檢測到鎖過期,不能簡單地進行del並嘗試通過setnx獲得鎖。我們可以通過getset命令來避免這個問題。來看看,如果存在一個用戶user4,它通過調用getset命令如何避免這種情況的發生:
getset設置的過期時間跟上面的setnx設置的相同:
如果該命令返回的結果跟上一步通過get獲得的過期時間一致,則說明這兩步之間,沒有新的客戶端搶占了鎖,則該客戶端即獲得鎖。如果該命令返回的結果跟上一步通過get獲得的過期時間不一致,則該鎖可能已被其他客戶端搶先獲得,則本次獲取鎖失敗。
這種實現方式得益於getset命令的原子性,從而有效得避免了競態條件。並且,通過將比對鎖的過期時間作為獲取鎖邏輯的一部分,從而避免了死鎖。
三、setnx / del / expire
這是使用最多的實現方式:setnx的目的同上,用來實現嘗試獲取鎖以及判斷是否獲取到鎖的原子性,del刪除key來釋放鎖,與上面不同的是,使用redis自帶的expire命令來防止死鎖(可能出現某個客戶端獲得了鎖,但是crash了,永不釋放導致死鎖)。這算是一種比較簡單但粗暴的實現方式:因為,不管實際的情況如何,當你設置expire之后,它一定會在那個時間點刪除key。如何當時某個客戶端已獲得了鎖,正在執行臨界區內的代碼,但執行時間超過了expire的時間,將會導致另一個正在競爭該鎖的客戶端也獲得了該鎖,這個問題下面還會談到。
我們來看一下宿舍鎖的簡單實現很簡單:
通過一個while(true),在當前線程上進行阻塞等待,並通過一個計數器進行自減操作,防止永久等待。
http://www.cnblogs.com/moonandstar08/p/5682822.html
多節點的部署中,對鎖的控制,參考:
http://www.jeffkit.info/2011/07/1000/
直接貼上代碼實現,同上一篇文章一樣,都是基於AOP
定義注解,標志切入點:
package com.ns.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface RedisLock { /** * redis的key * @return */ String value(); /** * 持鎖時間,單位毫秒,默認一分鍾 */ long keepMills() default 60000; /** * 當獲取失敗時候動作 */ LockFailAction action() default LockFailAction.GIVEUP; public enum LockFailAction{ /** * 放棄 */ GIVEUP, /** * 繼續 */ CONTINUE; } /** * 睡眠時間,設置GIVEUP忽略此項 * @return */ long sleepMills() default 1000; }
切面實現:
package com.redis.aop; import java.lang.reflect.Method; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.ns.annotation.RedisLock; import com.ns.annotation.RedisLock.LockFailAction; import com.ns.redis.dao.base.BaseRedisDao; @Aspect public class RedisLockAspect extends BaseRedisDao<String, Long>{ private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class); //execution(* com.ns..*(*,..)) and @within(com.ns.annotation.RedisLock) @Pointcut("execution(* com.ns..*(..)) && @annotation(com.ns.annotation.RedisLock)") private void lockPoint(){} @Around("lockPoint()") public Object arround(ProceedingJoinPoint pjp) throws Throwable{ MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); RedisLock lockInfo = method.getAnnotation(RedisLock.class); boolean lock = false; Object obj = null; while(!lock){ long timestamp = System.currentTimeMillis()+lockInfo.keepMills(); lock = setNX(lockInfo.value(), timestamp); //得到鎖,已過期並且成功設置后舊的時間戳依然是過期的,可以認為獲取到了鎖(成功設置防止鎖競爭) long now = System.currentTimeMillis(); if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){ //得到鎖,執行方法,釋放鎖 log.info("得到鎖..."); obj = pjp.proceed(); //不加這一行,對於只能執行一次的定時任務,時間差上不能保證另一個一定正好放棄 if(lockInfo.action().equals(LockFailAction.CONTINUE)){ delete(lockInfo.value()); } }else{ if(lockInfo.action().equals(LockFailAction.CONTINUE)){ log.info("稍后重新請求鎖..."); Thread.currentThread().sleep(lockInfo.sleepMills()); }else{ log.info("放棄鎖..."); break; } } } return obj; } public boolean setNX(String key,Long value){ return valueOperations.setIfAbsent(key, value); } public long getLock(String key){ return valueOperations.get(key); } public Long getSet(String key,Long value){ return valueOperations.getAndSet(key, value); } public void releaseLock(String key){ delete(key); } }
Python的一個實現
LOCK_TIMEOUT = 3 lock = 0 lock_timeout = 0 lock_key = 'lock.foo' # 獲取鎖 while lock != 1: now = int(time.time()) lock_timeout = now + LOCK_TIMEOUT + 1 lock = redis_client.setnx(lock_key, lock_timeout) if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)): break else: time.sleep(0.001) # 已獲得鎖 do_job() # 釋放鎖 now = int(time.time()) if now < lock_timeout: redis_client.delete(lock_key)
http://blog.csdn.net/lihao21/article/details/49104695
以上有些代碼只符合我現在的項目場景,根據實際需要進行調整
http://www.tuicool.com/articles/EzaM7by