SpringBoot使用RedisTemplate+Lua腳本實現Redis分布式鎖
問題:定時任務部署在多台Tomcat上,因此到達指定的定時時間時,多台機器上的定時器可能會同時啟動,造成重復數據或者程序異常等問題。
//發送消息,不能重復發送
@Scheduled(cron = "0 0/15 * * * ? ")
public void sendMsg(String userId) {
}
項目部署在Tom 1 ,Tom 2
如何控制只有一個Tomcat在同一時刻執行任務
使用分布式鎖來控制,誰搶到了鎖就讓誰執行。
一、基於Redis實現分布式鎖
package cn.pconline.pcloud.base.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Description 使用RedisTemplate+Lua腳本實現Redis分布式鎖
* @Author jie.zhao
* @Date 2019/11/19 11:46
*/
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate template;
private static final Long RELEASE_SUCCESS = 1L;
private static final long DEFAULT_TIMEOUT = 1000 * 10;
private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
/**
* 嘗試獲取鎖 立即返回
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lock(String key, String value, long timeout) {
return template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
}
/**
* 以阻塞方式的獲取鎖
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lockBlock(String key, String value, long timeout) {
long start = System.currentTimeMillis();
while (true) {
//檢測是否超時
if (System.currentTimeMillis() - start > timeout) {
return false;
}
//執行set命令
//1
Boolean absent = template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
//其實沒必要判NULL,這里是為了程序的嚴謹而加的邏輯
if (absent == null) {
return false;
}
//是否成功獲取鎖
if (absent) {
return true;
}
}
}
public boolean unlock(String key, String value) {
// 使用Lua腳本:先判斷是否是自己設置的鎖,再執行刪除
// 使用lua腳本刪除redis中匹配value的key,可以避免由於方法執行時間過長而redis鎖自動過期失效的時候誤刪其他線程的鎖
// spring自帶的執行腳本方法中,集群模式直接拋出不支持執行腳本的異常EvalSha is not supported in cluster environment.,所以只能拿到原redis的connection來執行腳本
List<String> keys = new ArrayList<>();
keys.add(key);
List<String> args = new ArrayList<>();
args.add(value);
Long result = template.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和單機模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
// 單機模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
}
});
//返回最終結果
return RELEASE_SUCCESS.equals(result);
}
}
使用方法:
@Scheduled(cron = "0 0/15 * * * ? ")
public void sendExamTemplateMsg() {
if (redisLock.lock(RedisKey.REDIS_JOB_SEND_KEY, RedisKey.REDIS_JOB_SEND_VALUE, 1000 * 60)) {
//....
log.info("定時輪詢考試安排通知結束 \t" + new Date());
} else {
log.info("定時輪詢考試安排,未獲取到鎖其他應用正在執行 \t" + new Date());
}
}
二、分布式鎖的要求
- 互斥性。在任意時刻,只有一個客戶端能持有鎖。
- 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
- 具有容錯性。只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。
- 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
三、存在的問題
注意!!!! 該加鎖方法僅針對單實例 Redis 可實現分布式加鎖,或者使用場景少的業務。
原因對於 Redis 集群會有一定幾率出現問題
例如:當進程1對master節點寫入了鎖,此時master節點宕機,slave節點提升為master而剛剛寫入master的鎖還未同步,此時進程2也將能夠獲取鎖成功,此時必然會導致數據不同步問題。還有另一個問題即: key 超時之后業務並沒有執行完畢但卻自動釋放鎖了,這樣就會導致並發問題。
如果需要更加健壯的Redis集群分布式鎖,推薦使用Redisson。