本案例主要講解
Redis
實現分布式鎖的兩種實現方式:Jedis
實現、Redisson
實現。網上關於這方面講解太多了,Van自認為文筆沒他們好,還是用示例代碼說明。
一、jedis
實現
該方案只考慮
Redis
單機部署的場景
1.1 加鎖
1.1.1 原理
jedis.set(String key, String value, String nxxx, String expx, int time)
key
: 使用key
來當鎖,因為key
是唯一的;value
: 我傳的是唯一值(UUID
),很多童鞋可能不明白,有key
作為鎖不就夠了嗎,為什么還要用到value
?原因是分布式鎖要滿足解鈴還須系鈴人:通過給value
賦值為requestId
,我們就知道這把鎖是哪個請求加的了,在解鎖的時候要驗證value
值,不能誤解鎖;nxxx
: 這個參數我填的是NX
,意思是SET IF NOT EXIST
,即當key
不存在時,我們進行set
操作;若key
已經存在,則不做任何操作;expx
: 這個參數我傳的是PX
,意思是我們要給這個key
加一個過期的設置,具體時間由第五個參數決定;time
: 與第四個參數相呼應,代表key
的過期時間。
1.1.2 小結
set()
加入了NX
參數,可以保證如果已有key
存在,則函數不會調用成功,也就是只有一個客戶端能持有鎖,滿足互斥性;- 其次,由於我們對鎖設置了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即
key
被刪除),不會發生死鎖; - 最后,因為我們將
value
賦值為requestId
,代表加鎖的客戶端請求標識,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端。
1.2 釋放鎖
釋放鎖時需要驗證
value
值,也就是說我們在獲取鎖的時候需要設置一個value
,不能直接用del key
這種粗暴的方式,因為直接del key
任何客戶端都可以進行解鎖了,所以解鎖時,我們需要判斷鎖是否是自己的(基於value
值來判斷)
- 首先,寫了一個簡單
Lua
腳本代碼,作用是:獲取鎖對應的value
值,檢查是否與requestId
相等,如果相等則刪除鎖(解鎖); - 然后,將
Lua
代碼傳到jedis.eval()
方法里,並使參數KEYS[1]
賦值為lockKey
,ARGV[1]
賦值為requestId
。eval()
方法是將Lua
代碼交給Redis服務端執行。
1.3 案例(家庭多人領取獎勵的場景)
這里放出的是關鍵代碼,詳細可運行的代碼可至文末地址下載示例代碼。
1.3.1 准備
該案例模擬家庭內多人通過領取一個獎勵,但是只能有一個人能領取成功,不能重復領取(之前做過獎勵模塊的需求)
family_reward_record
表
CREATE TABLE `family_reward_record` ( `id` bigint(10) NOT NULL AUTO_INCREMENT COMMENT '主鍵id', `family_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品名稱', `reward_type` int(10) NOT NULL DEFAULT '1' COMMENT '商品庫存數量', `state` int(1) NOT NULL DEFAULT '0' COMMENT '商品狀態', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '入庫時間', `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=270 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='家庭領取獎勵表(家庭內多人只能有一個人能領取成功,不能重復領取)';
application.yml
spring:
datasource:
url: jdbc:mysql://47.98.178.84:3306/dev
username: dev
password: password
driver-class-name: com.mysql.jdbc.Driver
redis:
host: 47.98.178.84
port: 6379
password: password
timeout: 2000
# mybatis
mybatis:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: cn.van.mybatis.demo.entity
1.3.2 核心實現
- Jedis 單機配置類 -
RedisConfig.java
@Configuration public class RedisConfig extends CachingConfigurerSupport { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.timeout}") private int timeout; @Bean public JedisPool redisPoolFactory() { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); if (StringUtils.isEmpty(password)) { return new JedisPool(jedisPoolConfig, host, port, timeout); } return new JedisPool(jedisPoolConfig, host, port, timeout, password); } @Bean(name = "redisTemplate") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.ANY); objectMapper.enableDefaultTyping(DefaultTyping.NON_FINAL); Jackson2JsonRedisSerializer<Object> jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); jsonRedisSerializer.setObjectMapper(objectMapper); redisTemplate.setDefaultSerializer(jsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
- 分布式鎖工具類 -
RedisDistributedLock.java
@Component public class RedisDistributedLock { /** * 成功獲取鎖標示 */ private static final String LOCK_SUCCESS = "OK"; /** * 成功解鎖標示 */ private static final Long RELEASE_SUCCESS = 1L; @Autowired private JedisPool jedisPool; /** * redis 數據存儲過期時間 */ final int expireTime = 500; /** * 嘗試獲取分布式鎖 * @param lockKey 鎖 * @param lockValue 請求標識 * @return 是否獲取成功 */ public boolean tryLock(String lockKey, String lockValue) { Jedis jedis = null; try{ jedis = jedisPool.getResource(); String result = jedis.set(lockKey, lockValue, "NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } } finally { if(jedis != null){ jedis.close(); } } return false; } /** * 釋放分布式鎖 * @param lockKey 鎖 * @param lockValue 請求標識 * @return 是否釋放成功 */ public boolean unLock(String lockKey, String lockValue) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue)); if (RELEASE_SUCCESS.equals(result)) { return true; } } finally { if(jedis != null){ jedis.close(); } } return false; } }
- 不加鎖時:模擬
familyId = 1
的家庭同時領取獎勵
@Override public HttpResult receiveAward() { Long familyId = 1L; Map<String, Object> params = new HashMap<String, Object>(16); params.put("familyId", familyId); params.put("rewardType", 1); int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params); if (count == 0) { FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now()); int num = familyRewardRecordMapper.insert(recordDO); if (num == 1) { return HttpResult.success(); } return HttpResult.failure(-1, "記錄插入失敗"); } return HttpResult.success("該記錄已存在"); }
- 加鎖的實現:模擬
familyId = 2
的家庭同時領取獎勵
@Override public HttpResult receiveAwardLock() { Long familyId = 2L; Map<String, Object> params = new HashMap<String, Object>(16); params.put("familyId", familyId); params.put("rewardType", 1); int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params); if (count == 0) { // 沒有記錄則創建領取記錄 FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now()); // 分布式鎖的key(familyId + rewardType) String lockKey = recordDO.getFamilyId() + "_" + recordDO.getRewardType(); // 分布式鎖的value(唯一值) String lockValue = createUUID(); boolean lockStatus = redisLock.tryLock(lockKey, lockValue); // 鎖被占用 if (!lockStatus) { log.info("鎖已經占用了"); return HttpResult.failure(-1,"失敗"); } // 不管多個請求,加鎖之后,只會有一個請求能拿到鎖,進行插入操作 log.info("拿到了鎖,當前時刻:{}",System.currentTimeMillis()); int num = familyRewardRecordMapper.insert(recordDO); if (num != 1) { log.info("數據插入失敗!"); return HttpResult.failure(-1, "數據插入失敗!"); } log.info("數據插入成功!准備解鎖..."); boolean unLockState = redisLock.unLock(lockKey,lockValue); if (!unLockState) { log.info("解鎖失敗!"); return HttpResult.failure(-1, "解鎖失敗!"); } log.info("解鎖成功!"); return HttpResult.success(); } log.info("該記錄已存在"); return HttpResult.success("該記錄已存在"); } private String createUUID() { UUID uuid = UUID.randomUUID(); String str = uuid.toString().replace("-", "_"); return str; }
1.3.3 測試
我采用的是
JMeter
工具進行測試,加鎖和不加鎖的情況都設置成:五次並發請求。
1.3.3.1 不加鎖
/** * 家庭成員領取獎勵(不加鎖) * @return */ @PostMapping("/receiveAward") public HttpResult receiveAward() { return redisLockService.receiveAward(); }
- 請求方式:
POST
- 請求地址:http://localhost:8080/redisLock/receiveAward
- 返回結果:插入了五條記錄
1.3.3.2 加鎖
/** * 家庭成員領取獎勵(加鎖) * @return */ @PostMapping("/receiveAwardLock") public HttpResult receiveAwardLock() { return redisLockService.receiveAwardLock(); }
- 請求方式:
POST
- 請求地址:http://localhost:8080/redisLock/receiveAwardLock
- 返回結果:只插入了一條記錄
通過對比,說明分布式鎖起作用了。
1.4 小結
我上家使用的就是這種加鎖方式,看上去很OK,實際上在Redis
集群的時候會出現問題,比如:
A
客戶端在Redis
的master
節點上拿到了鎖,但是這個加鎖的key
還沒有同步到slave
節點,master
故障,發生故障轉移,一個slave
節點升級為master
節點,B
客戶端也可以獲取同個key
的鎖,但客戶端A
也已經拿到鎖了,這就導致多個客戶端都拿到鎖。
正因為如此,Redis
作者antirez
基於分布式環境下提出了一種更高級的分布式鎖的實現方式:Redlock
。
二、Redlock
實現
2.1 原理
antirez
提出的Redlock
算法大概是這樣的:
在Redis
的分布式環境中,我們假設有N
個Redis master
。這些節點完全互相獨立,不存在主從復制或者其他集群協調機制。我們確保將在N
個實例上使用與在Redis
單實例下相同方法獲取和釋放鎖。現在我們假設有5
個Redis master
節點,同時我們需要在5
台服務器上面運行這些Redis
實例,這樣保證他們不會同時都宕掉。
2.1.1 加鎖
為了取到鎖,客戶端應該執行以下操作(RedLock
算法加鎖步驟):
- 獲取當前
Unix
時間,以毫秒為單位; - 依次嘗試從
5
個實例,使用相同的key
和具有唯一性的value
(例如UUID
)獲取鎖。當向Redis
請求獲取鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10
秒,則超時時間應該在5-50
毫秒之間。這樣可以避免服務器端Redis
已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis
實例請求獲取鎖; - 客戶端使用當前時間減去開始獲取鎖時間(步驟
1
記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(N/2+1
,這里是3
個節點)的Redis
節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功; - 如果取到了鎖,
key
的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3
計算的結果)。 - 如果因為某些原因,獲取鎖失敗(沒有在至少
N/2+1
個Redis
實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的Redis
實例上進行解鎖(即便某些Redis
實例根本就沒有加鎖成功,防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。
2.1.2 解鎖
向所有的Redis
實例發送釋放鎖命令即可,不用關心之前有沒有從Redis
實例成功獲取到鎖.
2.2 案例(商品超賣為例)
這部分以最常見的案例:搶購時的商品超賣(庫存數減少為負數)為例
2.2.1 准備
good
表
CREATE TABLE `good` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵id', `good_name` varchar(255) NOT NULL COMMENT '商品名稱', `good_counts` int(255) NOT NULL COMMENT '商品庫存', `create_time` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '創建時間', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='商品表'; -- 插入兩條測試數據 INSERT INTO `good` VALUES (1, '哇哈哈', 5, '2019-09-20 17:39:04'); INSERT INTO `good` VALUES (2, '衛龍', 5, '2019-09-20 17:39:06');
- 配置文件跟上面一樣
2.2.2 核心實現
Redisson
配置類RedissonConfig.java
我這里配置的是單機,更多配置詳見https://github.com/redisson/redisson/wiki/配置
@Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; /** * RedissonClient,單機模式 * @return * @throws IOException */ @Bean public RedissonClient redissonSentinel() { //支持單機,主從,哨兵,集群等模式,此為單機模式 Config config = new Config(); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setPassword(password); return Redisson.create(config); } }
- 不加鎖時
@Override public HttpResult saleGoods(){ // 以指定goodId = 1:哇哈哈為例 Long goodId = 1L; GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId); int goodStock = goodDO.getGoodCounts(); if (goodStock >= 1) { goodMapper.saleOneGood(goodId); } return HttpResult.success(); }
- 加鎖
@Override public HttpResult saleGoodsLock(){ // 以指定goodId = 2:衛龍為例 Long goodId = 2L; GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId); int goodStock = goodDO.getGoodCounts(); String key = goodDO.getGoodName(); log.info("{}剩余總庫存,{}件", key,goodStock); // 將商品的實時庫存放在redis 中,便於讀取 stringRedisTemplate.opsForValue().set(key, Integer.toString(goodStock)); // redisson 鎖 的key String lockKey = goodDO.getId() +"_" + key; RLock lock = redissonClient.getLock(lockKey); // 設置60秒自動釋放鎖 (默認是30秒自動過期) lock.lock(60, TimeUnit.SECONDS); // 此步開始,串行銷售 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key)); // 如果緩存中庫存量大於1,可以繼續銷售 if (stock >= 1) { goodDO.setGoodCounts(stock - 1); int num = goodMapper.saleOneGood(goodId); if (num == 1) { // 減庫存成功,將緩存同步 stringRedisTemplate.opsForValue().set(key,Integer.toString((stock-1))); } log.info("{},當前庫存,{}件", key,stock); } lock.unlock(); return HttpResult.success(); }
2.3 測試
采用的是
JMeter
工具進行測試,初始化的時候兩個商品的庫存設置都是:5
;所以這里加鎖和不加鎖的情況都設置成:十次並發請求。
2.3.1 不加鎖
/** * 售賣商品(不加鎖) * @return */ @PostMapping("/saleGoods") public HttpResult saleGoods() { return redisLockService.saleGoods(); }
- 請求方式:
POST
- 請求地址:http://localhost:8080/redisLock/saleGoods
- 返回結果:
id =1
的商品庫存減為-5
2.3.2 加鎖
/** * 售賣商品(加鎖) * @return */ @PostMapping("/saleGoodsLock") public HttpResult saleGoodsLock() { return redisLockService.saleGoodsLock(); }
- 請求方式:
POST
- 請求地址:http://localhost:8080/redisLock/saleGoodsLock
- 返回結果:
id =1
的商品庫存減為0
2.3.3 小結
通過2.3.1
和2.3.2
的結果對比很明顯:前者出現了超賣情況,庫存數賣到了-5
,這是決不允許的;而加了鎖的情況后,庫存只會減少到0
,便不再銷售。
三、總結
再次說明:以上代碼不全,如需嘗試,請前往Van 的 Github 查看完整示例代碼
第一種基於Redis
的分布式鎖並不適合用於生產環境。Redisson
可用於生產環境。當然,分布式的選擇還有Zookeeper
的選項,Van后續會整理出來供大家參考。
3.1 示例源碼地址
https://github.com/vanDusty/SpringBoot-Home/tree/master/springboot-demo-lock/redis-lock