原文鏈接:https://blog.csdn.net/sinat_25295611/article/details/80420086
https://www.cnblogs.com/yangzhilong/p/7605807.html
業務場景:在電商項目中,往往會有這樣的一個功能設計,當用戶下單后一段時間沒有付款,系統就會在超時后關閉該訂單。
通常我們會做一個定時任務每分鍾來檢查前半小時的訂單,將沒有付款的訂單列表查詢出來,然后對訂單中的商品進行庫存的恢復,然后將該訂單設置為無效。
比如我們這里使用Spring Schedule
的方式做一個定時任務:
注:打開Spring Schedule 的自動注解掃描,在Spring配置中添加
<task:annotation-driven/>
@Component @Slf4j public class CloseOrderTask { @Autowired private IOrderService iOrderService; @Scheduled(cron = "0 */1 * * * ? ") public void closeOrderTaskV1() { log.info("定時任務啟動"); //執行關閉訂單的操作 iOrderService.closeOrder(); log.info("定時任務結束"); } }
在單服務器下這樣執行並沒有問題,但是隨着業務量的增多,勢必會演進成集群模式,在同一時刻有多個服務執行一個定時任務就會帶來問題,首先是服務器資源的浪費,同時會帶來業務邏輯的混亂,如果定時任務是做的數據庫操作將會帶來很大的風險。
Redis分布式鎖
下面分析一下分布式情況下定時任務的解決方案
通常使用Redis作為分布式鎖來解決這類問題,Redis分布式鎖流程如下:
Redis分布式鎖v1版本:
//注意:以下為了測試方便,定時時間都設置為10s @Scheduled(cron = "0/10 * * * * ? ") public void closeOrderTaskV1() { log.info("定時任務啟動"); long lockTime = 5000;//5秒 Long lockKeyResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTime)); //如果獲得了分布式鎖,執行關單業務 if (lockKeyResult != null && lockKeyResult.intValue() == 1) { closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK); }else { log.info("沒有獲得分布式鎖"); } log.info("定時任務結束================================"); } //關閉訂單,並釋放鎖 private void closeOrder(String lockName) { RedisShardedPoolUtil.expire(lockName,50); //鎖住50秒 log.info("線程{} 獲取鎖 {}",Thread.currentThread().getName(),lockName); //模擬執行關單操作 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //主動關閉鎖 RedisShardedPoolUtil.del(lockName); log.info("線程{} 釋放鎖 {}",Thread.currentThread().getName(),lockName); }
(由於我電腦配置比較低,開2個IDEA進程調試會比較卡,所以一個項目在IDEA調試,另外一個打成war放在tomcat運行,打包命令mvn clean package -Dmaven.test.skip=true -Pdev
)
tomcat1調試日志
tomcat2日志
此時分布式鎖已經生效,在集群環境下不會同時出現2個任務同時執行的情況,但是這時又引出了另外一個問題,
我們的邏輯是先setnx
獲取分布式鎖(此時該鎖沒有設置過期時間,即不會過期),然后expire
設置過期鎖過期時間,如果在獲取鎖和設置過期時間之間,服務器(tomcat)掛了就會出現鎖永遠都不會過期的情況!
- 在正常關閉tomcat的情況下(shutdown),我們可以通過@PreDestory執行刪除鎖邏輯,如下
@PreDestroy public void delCloseLock(){ RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK); log.info("Tomcat shut down 釋放鎖 {}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK); }
-
在tomcat被kill或意外終止時,以上方法並不管用
Redis分布式鎖v2版本 :
我們將
setnx
未獲取到鎖的情況進行重新設計,為的是防止v1
版本死鎖的產生,當第一次未獲取到鎖時,取出lockKey
中存放的過期時間,與當前時間進行對比,若已超時則通過getset
操作重置獲取鎖並更新過期時間,若第一次取出時未達到過期時間,說明還在上次任務執行的有效時間范圍內,可能就需要等這一段時間,通常過期時間設置為2~5秒,不會太長。
以上則是在超時的基礎上防止死鎖的產生,以下為代碼實現:
//注意:以下為了測試方便,定時時間都設置為10s
@Scheduled(cron = "0/10 * * * * ? ")
public void closeOrderTaskV2() {
log.info("定時任務啟動");
long lockTime = 5000; //5s
Long lockKeyResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTime));
//如果獲得了分布式鎖,執行關單業務
if (lockKeyResult != null && lockKeyResult.intValue() == 1) {
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}else {
String lockValue1 = RedisShardedPoolUtil.get(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
//查到鎖的值並與當前時間比較檢查其是否已經超時,若超時則可以重新獲取鎖
if (lockValue1 != null && System.currentTimeMillis() > Long.valueOf(lockValue1)) {
//通過用當前時間戳getset操作會給對應的key設置新的值並返回舊值,這是一個原子操作
//redis返回nil,則說明該值已經無效
String lockValue2 = RedisShardedPoolUtil.getSet(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTime));
if (lockValue2 == null || StringUtils.equals(lockValue1, lockValue2)) {
//獲取鎖成功
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
log.info("沒有獲得分布式鎖:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
}
log.info("沒有獲得分布式鎖:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
log.info("定時任務結束================================");
}
至此,我們的這個分布式鎖是沒有問題了。
下面介紹一下使用Redisson
這個框架來實現分布式鎖。
Redisson實現分布式鎖
Redisson是架設在Redis基礎上的一個Java駐內存數據網格(In-Memory Data Grid) ,其功能十分強大,解決很多分布式架構中的問題,附上其GitHub的WIKI地址:https://github.com/redisson/redisson/wiki
官方文檔:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
增加tryLock方法,建議后面去掉DistributedLocker接口和其實現類,直接在RedissLockUtil中注入RedissonClient實現類(簡單但會丟失接口帶來的靈活性)。
1、引用redisson的pom
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.5.0</version> </dependency>
2、定義Lock的接口定義類
import java.util.concurrent.TimeUnit; import org.redisson.api.RLock; public interface DistributedLocker { RLock lock(String lockKey); RLock lock(String lockKey, int timeout); RLock lock(String lockKey, TimeUnit unit, int timeout); boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime); void unlock(String lockKey); void unlock(RLock lock); }
3、Lock接口實現類
import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import java.util.concurrent.TimeUnit; public class RedissonDistributedLocker implements DistributedLocker { private RedissonClient redissonClient; @Override public RLock lock(String lockKey) { RLock lock = redissonClient.getLock(lockKey); lock.lock(); return lock; } @Override public RLock lock(String lockKey, int leaseTime) { RLock lock = redissonClient.getLock(lockKey); lock.lock(leaseTime, TimeUnit.SECONDS); return lock; } @Override public RLock lock(String lockKey, TimeUnit unit ,int timeout) { RLock lock = redissonClient.getLock(lockKey); lock.lock(timeout, unit); return lock; } @Override public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) { RLock lock = redissonClient.getLock(lockKey); try { return lock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { return false; } } @Override public void unlock(String lockKey) { RLock lock = redissonClient.getLock(lockKey); lock.unlock(); } @Override public void unlock(RLock lock) { lock.unlock(); } public void setRedissonClient(RedissonClient redissonClient) { this.redissonClient = redissonClient; } }
4、redisson屬性裝配類
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "redisson") @ConditionalOnProperty("redisson.password") public class RedissonProperties { private int timeout = 3000; private String address; private String password; private int database = 0; private int connectionPoolSize = 64; private int connectionMinimumIdleSize=10; private int slaveConnectionPoolSize = 250; private int masterConnectionPoolSize = 250; private String[] sentinelAddresses; private String masterName; public int getTimeout() { return timeout; } public void setTimeout(int timeout) { this.timeout = timeout; } public int getSlaveConnectionPoolSize() { return slaveConnectionPoolSize; } public void setSlaveConnectionPoolSize(int slaveConnectionPoolSize) { this.slaveConnectionPoolSize = slaveConnectionPoolSize; } public int getMasterConnectionPoolSize() { return masterConnectionPoolSize; } public void setMasterConnectionPoolSize(int masterConnectionPoolSize) { this.masterConnectionPoolSize = masterConnectionPoolSize; } public String[] getSentinelAddresses() { return sentinelAddresses; } public void setSentinelAddresses(String sentinelAddresses) { this.sentinelAddresses = sentinelAddresses.split(","); } public String getMasterName() { return masterName; } public void setMasterName(String masterName) { this.masterName = masterName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public int getConnectionPoolSize() { return connectionPoolSize; } public void setConnectionPoolSize(int connectionPoolSize) { this.connectionPoolSize = connectionPoolSize; } public int getConnectionMinimumIdleSize() { return connectionMinimumIdleSize; } public void setConnectionMinimumIdleSize(int connectionMinimumIdleSize) { this.connectionMinimumIdleSize = connectionMinimumIdleSize; } public int getDatabase() { return database; } public void setDatabase(int database) { this.database = database; } public void setSentinelAddresses(String[] sentinelAddresses) { this.sentinelAddresses = sentinelAddresses; } }
5、SpringBoot自動裝配類
import org.apache.commons.lang3.StringUtils; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SentinelServersConfig; import org.redisson.config.SingleServerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.longge.lock.DistributedLocker; import com.longge.lock.RedissonDistributedLocker; import com.longge.lock.RedissonProperties; import com.longge.utils.RedissLockUtil; @Configuration @ConditionalOnClass(Config.class) @EnableConfigurationProperties(RedissonProperties.class) public class RedissonAutoConfiguration { @Autowired private RedissonProperties redssionProperties; /** * 哨兵模式自動裝配 * @return */ @Bean @ConditionalOnProperty(name="redisson.master-name") RedissonClient redissonSentinel() { Config config = new Config(); SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress(redssionProperties.getSentinelAddresses()) .setMasterName(redssionProperties.getMasterName()) .setTimeout(redssionProperties.getTimeout()) .setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize()) .setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize()); if(StringUtils.isNotBlank(redssionProperties.getPassword())) { serverConfig.setPassword(redssionProperties.getPassword()); } return Redisson.create(config); } /** * 單機模式自動裝配 * @return */ @Bean @ConditionalOnProperty(name="redisson.address") RedissonClient redissonSingle() { Config config = new Config(); SingleServerConfig serverConfig = config.useSingleServer() .setAddress(redssionProperties.getAddress()) .setTimeout(redssionProperties.getTimeout()) .setConnectionPoolSize(redssionProperties.getConnectionPoolSize()) .setConnectionMinimumIdleSize(redssionProperties.getConnectionMinimumIdleSize()); if(StringUtils.isNotBlank(redssionProperties.getPassword())) { serverConfig.setPassword(redssionProperties.getPassword()); } return Redisson.create(config); } /** * 裝配locker類,並將實例注入到RedissLockUtil中 * @return */ @Bean DistributedLocker distributedLocker(RedissonClient redissonClient) { DistributedLocker locker = new RedissonDistributedLocker(); locker.setRedissonClient(redissonClient); RedissLockUtil.setLocker(locker); return locker; } }
6、Lock幫助類
import java.util.concurrent.TimeUnit; import org.redisson.api.RLock; import DistributedLocker; /** * redis分布式鎖幫助類 * @author yangzhilong * */ public class RedissLockUtil { private static DistributedLocker redissLock; public static void setLocker(DistributedLocker locker) { redissLock = locker; } /** * 加鎖 * @param lockKey * @return */ public static RLock lock(String lockKey) { return redissLock.lock(lockKey); } /** * 釋放鎖 * @param lockKey */ public static void unlock(String lockKey) { redissLock.unlock(lockKey); } /** * 釋放鎖 * @param lock */ public static void unlock(RLock lock) { redissLock.unlock(lock); } /** * 帶超時的鎖 * @param lockKey * @param timeout 超時時間 單位:秒 */ public static RLock lock(String lockKey, int timeout) { return redissLock.lock(lockKey, timeout); } /** * 帶超時的鎖 * @param lockKey * @param unit 時間單位 * @param timeout 超時時間 */ public static RLock lock(String lockKey, TimeUnit unit ,int timeout) { return redissLock.lock(lockKey, unit, timeout); } /** * 嘗試獲取鎖 * @param lockKey * @param waitTime 最多等待時間 * @param leaseTime 上鎖后自動釋放鎖時間 * @return */ public static boolean tryLock(String lockKey, int waitTime, int leaseTime) { return redissLock.tryLock(lockKey, TimeUnit.SECONDS, waitTime, leaseTime); } /** * 嘗試獲取鎖 * @param lockKey * @param unit 時間單位 * @param waitTime 最多等待時間 * @param leaseTime 上鎖后自動釋放鎖時間 * @return */ public static boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) { return redissLock.tryLock(lockKey, unit, waitTime, leaseTime); } }
屬性文件實例:
1、單機模式
# redisson lock redisson.address=redis://10.18.75.115:6379 redisson.password=
這里如果不加redis://前綴會報URI構建錯誤,
Caused by: java.net.URISyntaxException: Illegal character in scheme name at index 0
其次,在redis進行連接的時候如果不對密碼進行空判斷,會出現AUTH校驗失敗的情況。
Caused by: org.redisson.client.RedisException: ERR Client sent AUTH, but no password is set. channel
2、哨兵模式
redisson.master-name=mymaster redisson.password=xxxx redisson.sentinel-addresses=10.47.91.83:26379,10.47.91.83:26380,10.47.91.83:26381
更多的配置信息可以去官網查看
初始化完成之后就可以來寫分布式鎖了,使用完Redisson
實現分布鎖之后就會發現一切是那么的簡便:
//使用Redisson實現分布式鎖 @Scheduled(cron = "0/10 * * * * ? ") public void closeOrderTaskV3() { log.info("定時任務啟動"); RLock lock = redissonManager.getRedisson().getLock(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK); boolean getLock = false; try { //todo 若任務執行時間過短,則有可能在等鎖的過程中2個服務任務都會獲取到鎖,這與實際需要的功能不一致,故需要將waitTime設置為0 if (getLock = lock.tryLock(0, 5, TimeUnit.SECONDS)) { int hour = Integer.parseInt(PropertiesUtil.getProperty("close.redis.lock.time","2")); iOrderService.closeOrder(hour); } else { log.info("Redisson分布式鎖沒有獲取到鎖:{},ThreadName :{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName()); } } catch (InterruptedException e) { log.error("Redisson 獲取分布式鎖異常",e); }finally { if (!getLock) { return; } lock.unlock(); log.info("Redisson分布式鎖釋放鎖:{},ThreadName :{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName()); } }
以上就是Redisson的分布式鎖實現代碼,下面來分析一下:
1.RLock lock = redissonManager.getRedisson().getLock(String lockName);
RLock
繼承自java.util.concurrent.locks.Lock
,可以將其理解為一個重入鎖,需要手動加鎖和釋放鎖
來看它其中的一個方法:tryLock(long waitTime, long leaseTime, TimeUnit unit)
2.getLock = lock.tryLock(0, 5, TimeUnit.SECONDS)
通過tryLock()
的參數可以看出,在獲取該鎖時如果被其他線程先拿到鎖就會進入等待,等待waitTime
時間,如果還沒用機會獲取到鎖就放棄,返回false;若獲得了鎖,除非是調用unlock
釋放,那么會一直持有鎖,直到超過leaseTime
指定的時間。
以上就是Redisson實現分布式鎖的核心方法,有人可能要問,那怎么確定拿的是同一把鎖,分布式鎖在哪?
這就是Redisson的強大之處,其底層還是使用的Redis來作分布式鎖,在我們的RedissonManager
中已經指定了Redis實例,Redisson會進行托管,其原理與我們手動實現Redis分布式鎖類似。