背景:我們系統有一個下傳單據接口由於上游推送重復單據[產生異步任務],消費任務的時候是多線程並發執行,導致我們的數據庫有很多重復的臟數據,數據庫由於業務原因無法加唯一性索引。
解決方案:使用redis的setnx命令實現分布式鎖。
原理:setnx---> 這種加鎖的思路是,如果 key 不存在,將 key 設置為 value,返回true。如果 key 已存在,則 SETNX 不做任何動作,返回false。
鎖的實現
- 鎖的key為目標數據的唯一鍵,value為鎖的期望超時時間點;
-
首先進行一次setnx命令,嘗試獲取鎖,如果獲取成功,則設置鎖的最終超時時間(以防在當前進程獲取鎖后奔潰導致鎖無法釋放);如果獲取鎖失敗,則檢查當前的鎖是否超時,如果發現沒有超時,則獲取鎖失敗;如果發現鎖已經超時(即鎖的超時時間小於等於當前時間),則再次嘗試獲取鎖,取到后判斷下當前的超時時間和之前的超時時間是否相等,如果相等則說明當前的客戶端是排隊等待的線程里的第一個嘗試獲取鎖的,讓它獲取成功即可。
1 package com.atlas.sys.service; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.data.redis.core.RedisTemplate; 6 import org.springframework.stereotype.Component; 7 8 import javax.annotation.Resource; 9 import java.time.Instant; 10 import java.util.concurrent.TimeUnit; 11 12 /** 13 * @program: atlas-parent 14 * @description: 分布式鎖 15 * @author: siming.wang 16 * @create: 2018-12-14 14:15 17 **/ 18 @Component 19 public class RedisDistributionLock { 20 21 private static final Logger logger = LoggerFactory.getLogger(RedisDistributionLock.class); 22 23 //key的TTL,一天 24 private static final int finalDefaultTTLwithKey = 100; 25 26 //鎖默認超時時間,20秒 27 private static final long defaultExpireTime = 10 * 1000; 28 29 private static final boolean Success = true; 30 31 @Resource( name = "redisTemplate") 32 private RedisTemplate<String, String> redisTemplateForGeneralize; 33 34 /** 35 * 加鎖,鎖默認超時時間20秒 36 * @param resource 37 * @return 38 */ 39 public boolean lock(String resource) { 40 return this.lock(resource, defaultExpireTime); 41 } 42 43 /** 44 * 加鎖,同時設置鎖超時時間 45 * @param key 分布式鎖的key 46 * @param expireTime 單位是ms 47 * @return 48 */ 49 public boolean lock(String key, long expireTime) { 50 51 logger.debug("redis lock debug, start. key:[{}], expireTime:[{}]",key,expireTime); 52 long now = Instant.now().toEpochMilli(); 53 long lockExpireTime = now + expireTime; 54 55 //setnx 56 boolean executeResult = redisTemplateForGeneralize.opsForValue().setIfAbsent(key,String.valueOf(lockExpireTime)); 57 logger.debug("redis lock debug, setnx. key:[{}], expireTime:[{}], executeResult:[{}]", key, expireTime,executeResult); 58 59 //取鎖成功,為key設置expire 60 if (executeResult == Success) { 61 redisTemplateForGeneralize.expire(key,finalDefaultTTLwithKey, TimeUnit.SECONDS); 62 return true; 63 } 64 //沒有取到鎖,繼續流程 65 else { 66 Object valueFromRedis = this.getKeyWithRetry(key, 3); 67 // 避免獲取鎖失敗,同時對方釋放鎖后,造成NPE 68 if (valueFromRedis != null) { 69 //已存在的鎖超時時間 70 long oldExpireTime = Long.parseLong((String) valueFromRedis); 71 logger.debug("redis lock debug, key already seted. key:[{}], oldExpireTime:[{}]", key, oldExpireTime); 72 //鎖過期時間小於當前時間,鎖已經超時,重新取鎖 73 if (oldExpireTime <= now) { 74 logger.debug("redis lock debug, lock time expired. key:[{}], oldExpireTime:[{}], now:[{}]", key, oldExpireTime, now); 75 String valueFromRedis2 = redisTemplateForGeneralize.opsForValue().getAndSet(key, String.valueOf(lockExpireTime)); 76 long currentExpireTime = Long.parseLong(valueFromRedis2); 77 //判斷currentExpireTime與oldExpireTime是否相等 78 if (currentExpireTime == oldExpireTime) { 79 //相等,則取鎖成功 80 logger.debug("redis lock debug, getSet. key:[{}], currentExpireTime:[{}], oldExpireTime:[{}], lockExpireTime:[{}]", key, currentExpireTime, oldExpireTime, lockExpireTime); 81 redisTemplateForGeneralize.expire(key, finalDefaultTTLwithKey, TimeUnit.SECONDS); 82 return true; 83 } else { 84 //不相等,取鎖失敗 85 return false; 86 } 87 } 88 } else { 89 logger.warn("redis lock,lock have been release. key:[{}]", key); 90 return false; 91 } 92 } 93 return false; 94 } 95 96 private Object getKeyWithRetry(String key, int retryTimes) { 97 int failTime = 0; 98 while (failTime < retryTimes) { 99 try { 100 return redisTemplateForGeneralize.opsForValue().get(key); 101 } catch (Exception e) { 102 failTime++; 103 if (failTime >= retryTimes) { 104 throw e; 105 } 106 } 107 } 108 return null; 109 } 110 111 /** 112 * 解鎖 113 * @param key 114 * @return 115 */ 116 public boolean unlock(String key) { 117 logger.debug("redis unlock debug, start. resource:[{}].",key); 118 redisTemplateForGeneralize.delete(key); 119 return Success; 120 } 121 }
到這里就可以完成分布式鎖工具的搭建。
如果需要更優雅的實現方式,可以考慮用aop:
1 package com.atlas.sys.annoation; 2 3 import java.lang.annotation.ElementType; 4 import java.lang.annotation.Retention; 5 import java.lang.annotation.RetentionPolicy; 6 import java.lang.annotation.Target; 7 8 /** 9 * @program: atlas-parent 10 * @description: 11 * @author: siming.wang 12 * @create: 2018-12-14 14:21 13 **/ 14 @Retention(RetentionPolicy.RUNTIME) 15 @Target(ElementType.METHOD) 16 public @interface RedisLockAnnoation { 17 18 String keyPrefix() default ""; 19 20 /** 21 * 要鎖定的key中包含的屬性 22 */ 23 String[] keys() default {}; 24 25 /** 26 * 是否阻塞鎖; 27 * 1. true:獲取不到鎖,阻塞一定時間; 28 * 2. false:獲取不到鎖,立即返回 29 */ 30 boolean isSpin() default false; 31 32 /** 33 * 超時時間 34 */ 35 int expireTime() default 10000; 36 37 /** 38 * 等待時間 39 */ 40 int waitTime() default 50; 41 42 /** 43 * 獲取不到鎖的等待時間 44 */ 45 int retryTimes() default 20; 46 }
以及:
1 package com.atlas.sys.listener; 2 3 import com.atlas.model.download.DownloadReturnOther; 4 import com.atlas.sys.annoation.RedisLockAnnoation; 5 import com.atlas.sys.service.RedisDistributionLock; 6 import org.apache.commons.lang3.StringUtils; 7 import org.apache.commons.lang3.reflect.MethodUtils; 8 import org.aspectj.lang.ProceedingJoinPoint; 9 import org.aspectj.lang.annotation.Around; 10 import org.aspectj.lang.annotation.Aspect; 11 import org.aspectj.lang.reflect.MethodSignature; 12 import org.slf4j.Logger; 13 import org.slf4j.LoggerFactory; 14 import org.springframework.stereotype.Component; 15 16 import javax.annotation.Resource; 17 import java.lang.reflect.Method; 18 import java.lang.reflect.Parameter; 19 20 /** 21 * @program: atlas-parent 22 * @description: 23 * @author: siming.wang 24 * @create: 2018-12-14 14:22 25 **/ 26 @Component 27 @Aspect 28 public class RedisLockAdvice { 29 30 private static final Logger logger = LoggerFactory.getLogger(RedisLockAdvice.class); 31 32 @Resource 33 private RedisDistributionLock redisDistributionLock; 34 35 @Around("@annotation(redisLockAnnoation)") 36 // @Around("execution(* com.atlas.ibd.service.IbdAsnReceiptReturnService.insertIbdAsnReceipt())") 37 public Object processAround(ProceedingJoinPoint pjp,RedisLockAnnoation redisLockAnnoation) throws Throwable { 38 //獲取方法上的注解對象 39 String methodName = pjp.getSignature().getName(); 40 Class<?> classTarget = pjp.getTarget().getClass(); 41 Class<?>[] par = ((MethodSignature) pjp.getSignature()).getParameterTypes(); 42 Method objMethod = classTarget.getMethod(methodName, par); 43 redisLockAnnoation = objMethod.getDeclaredAnnotation(RedisLockAnnoation.class); 44 45 //拼裝分布式鎖的key 46 String[] keys = redisLockAnnoation.keys(); 47 Object[] args = pjp.getArgs(); 48 Object arg = args[0]; 49 StringBuilder temp = new StringBuilder(); 50 temp.append(redisLockAnnoation.keyPrefix()); 51 for (String key : keys) { 52 String getMethod = "get" + StringUtils.capitalize(key); 53 temp.append(MethodUtils.invokeExactMethod(arg, getMethod)).append("_"); 54 } 55 String redisKey = StringUtils.removeEnd(temp.toString(), "_"); 56 57 //執行分布式鎖的邏輯 58 if (redisLockAnnoation.isSpin()) { 59 //阻塞鎖 60 int lockRetryTime = 0; 61 try { 62 while (!redisDistributionLock.lock(redisKey, redisLockAnnoation.expireTime())) { 63 if (lockRetryTime++ > redisLockAnnoation.retryTimes()) { 64 logger.error("lock exception. key:{}, lockRetryTime:{}", redisKey, lockRetryTime); 65 throw new RuntimeException("lock exception. key:{"+redisKey+"}, lockRetryTime:{"+lockRetryTime+"}"); 66 } 67 } 68 return pjp.proceed(); 69 } finally { 70 redisDistributionLock.unlock(redisKey); 71 } 72 } else { 73 //非阻塞鎖 74 try { 75 if (!redisDistributionLock.lock(redisKey)) { 76 logger.error("lock exception. key:{}", redisKey); 77 throw new RuntimeException("lock exception. key:{"+redisKey+"}"); 78 } 79 return pjp.proceed(); 80 } finally { 81 // Thread.sleep(2000); 82 // redisDistributionLock.unlock(redisKey); 83 } 84 } 85 } 86 }
這樣在你需要用鎖的方法上使用這個注解,並根據你的參數值設置需要鎖定的key。
后記(自己項目所踩的坑):
由於需要在注解上拿到鎖key的值,所以我把注解放在了可以拿到鎖值的service層,controller拿不到鎖《或者拿鎖太繁雜》,這樣這個切面的環繞就是我的service,但是commit操作又是在controller層,所以把解鎖的代碼從aop拿掉了,直接在controller層commit之后再去unlock鎖

由於沒有實現按照請求來區分鎖是哪個線程加的,導致其他線程也可以解鎖,所以這里在catch異常處沒有unlock鎖,這樣會導致死鎖。
有一個方法(low):判斷exception的類型,是業務錯誤則需要把鎖給unlock,不是業務錯誤則說明是未獲取到鎖,則不允許釋放鎖。
還有另一種最優雅的實現方式是:解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。setnx第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成。
寫在最后:最后還是強烈推薦大家使用github上7k多星星的開源好項目:redisson https://github.com/redisson/redisson 關於redis相關的他都幫你實現了😄
