業務場景:高並發場景下的減庫存代碼實現
方案一:使用JVM或JDK級別的鎖【synchronized】
問題:使用synchronized的加鎖,如果是單機環境的話沒有問題,但是對於集群/分布式環境則會出問題,對於跨tomcat就會鎖不住。
@RestController public class IndexControlelr { @Autowired private Redisson redisson; @Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping("/deduct_stock") public String deductStock() { //以下的代碼高並發場景下有問題 synchronized(this) { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //獲取redis值 jedis.setnx(key.value) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("", realStock + ""); //設置redis值 jedis.set(key.value) Sysytem.out.printLn("扣減成功,剩余庫存:" + realStock); } else { Sysytem.out.printLn("扣減失敗,庫存不足"); } } return "end"; } }
方案二:為了解決方案一的問題,使用redis分布式鎖的SETNX命令可以解決剛剛方案一的問題。
使用格式:setnx key value 將key的值設為value,當且僅當key不存在。若給定的key已存在,則SETNX不做任何操作。
問題:會出現死鎖,就是當程序執行一般,中間的代碼出現異常導致無法釋放這把鎖,此時就會出現死鎖的現象。
//使用redis分布式鎖 @RequestMapping("/deduct_stock") public String deductStock() { String lockKey = "lockKey"; boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"xl"); if(!result){ return "error_code"; }
//如果執行到這里下面的代碼拋異常則無法完成釋放鎖,死鎖產生 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //獲取redis值 jedis.setnx(key.value) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("", realStock + ""); //設置redis值 jedis.set(key.value) Sysytem.out.printLn("扣減成功,剩余庫存:" + realStock); } else { Sysytem.out.printLn("扣減失敗,庫存不足"); } //釋放鎖 stringRedisTemplate.delete(lockKey); return "end"; }
方案三:為了解決方案二的問題,設置key和操作時間+try ...catch...finally釋放鎖
問題:時間問題,高並發場景下此代碼將就可以用了,但是會出現自己加的鎖會被別人釋放掉。
@RequestMapping("/deduct_stock") public String deductStock() { String lockKey = "lockKey"; try { //解決:給鎖加一個超時時間, //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"xl"); //stringRedisTemplate.expire(lockKey,10,TimeUnit.SECONDS); //設置key和操作時間==保證原子性 將上面的兩行合並為下面的一行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"xl",10,TimeUnit.SECONDS); if(!result){ return "error_code"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //獲取redis值 jedis.setnx(key.value) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("", realStock + ""); //設置redis值 jedis.set(key.value) Sysytem.out.printLn("扣減成功,剩余庫存:" + realStock); } else { Sysytem.out.printLn("扣減失敗,庫存不足"); } //解決釋放鎖的問題使用finally釋放鎖 }funally { //釋放鎖 stringRedisTemplate.delete(lockKey); } return "end"; }
方案四:為了解決方案三的問題,生成UUID,將這個UUID設置到鎖對應的value里面,自己加的鎖只能自己釋放,並在后台啟動一個分線程每個一段時間(一般這個時間是你設置的超時時間的1/3,超時時間一般默認為30s)去檢查一下主線程是否還持有這把鎖,如果還持有這把鎖的話就把設置的超時時間順延30s
問題:代碼量太大了
@RequestMapping("/deduct_stock") public String deductStock() { String lockKey = "lockKey"; //生成一個UUID String clientId = UUID.randomUUID().toString(); try { //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"xl"); //stringRedisTemplate.expire(lockKey,10,TimeUnit.SECONDS); //將UUID設置到鎖對應的value里面 //這里的時間還是會存在問題==解決,在后台起一個分線程:起一個定時任務每10s(不超過30s,設置值的1/3時間)檢查一下主線程是否還持有這把鎖, //如果還持有把這個超時時間延長(重新設置30s),====>問題代碼量太大了 //解決方案:reddison框架底層的原理就是我們現在寫的這些邏輯。 //使用:pom直接引入依賴包即可,可以支持很多redis架構模式(主從,哨兵,高並發等) boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,clientId,30,TimeUnit.SECONDS); if(!result){ return "error_code"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //獲取redis值 jedis.setnx(key.value) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("", realStock + ""); //設置redis值 jedis.set(key.value) Sysytem.out.printLn("扣減成功,剩余庫存:" + realStock); } else { Sysytem.out.printLn("扣減失敗,庫存不足"); } //解決釋放鎖的問題使用finally釋放鎖 }funally { //判斷一下這把鎖是不是自己加的鎖(線程id) if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){ stringRedisTemplate.delete(lockKey); } } return "end"; } }
方案五:為了解決方案四的問題。使用Redission框架
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class,args); } @Bean public Redisson redisson() { Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:8769").setDatabase(0); return (Redisson) Redisson.create(config); } } ----------------------------------------------------------------------------------------------------- @RequestMapping("/deduct_stock") public String deductStock() { String lockKey = "lockKey"; RLock redissonLock = redisson.getLock(lockKey); try { //加鎖默認的超時時間30s redissonLock.lock(); //setIfAbsent(lockKey,clientId,30,TimeUnit.SECONDS); int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //獲取redis值 jedis.setnx(key.value) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("", realStock + ""); //設置redis值 jedis.set(key.value) Sysytem.out.printLn("扣減成功,剩余庫存:" + realStock); } else { Sysytem.out.printLn("扣減失敗,庫存不足"); } }funally { //釋放鎖 redissonLock.unlock(); } } return "end"; }
Redission 的源碼剖析
Lua腳本具有原子性,所以redis把上面的一整串字符串當作一條命令來執行,要么成功,要么失敗(解決了分布式一致性的問題)。
Redission分布式鎖實現原理
Redis集群架構一般是滿足AP,redis主節點獲取到鎖之后會立馬返回給客戶端。QPS理論上是10萬,但是一般達不到10萬,才幾萬。
zookeeper是CP:一致性,主節點獲取到鎖之后先同步給從節點,半數以上的從節點獲取到鎖之后再返回給客戶端。
****一般會使用zookeeper來做分布式鎖,redis做緩存,但是還是有很多的公司選擇使用redis來做分布式鎖,因為redis的性能高(對並發要求比較高的情況)。