分布式鎖場景 在分布式環境下多個操作需要以原子的方式執行 首先啟一個springboot項目,再引入redis依賴包:
<!-- https://mvnrepository.com/artifa ... -starter-data-redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.2.RELEASE</version> </dependency> 以下是一個扣減庫存的接口作為例子:
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock") public Stirng deductStock() { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } return "end"; } } 1.單實例應用場景 以上代碼使用JMeter壓測工具進行調用,設置參數為: Number Of Threads[users]:100 Ramp Up Period[in seconds]:0 Loop Count:2 用單個web調用,結果出現並發問題: <ignore_js_op> 解決方案:加入同步鎖(synchronized)
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock") public Stirng deductStock() { synchronized(this) { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } return "end"; } } }
2.多實例分布式場景 以上代碼,比如有多個應用程序,用nginx做負載均衡,進行同時調用壓測 兩個程序存在同樣的扣減,出現並發現象。 第一個應用扣減結果顯示: <ignore_js_op> 第二個應用扣減結果顯示: <ignore_js_op> 解決方案:redis的setnx方法(可參考SETNX的api) 多個線程setnx調用時,有且僅有一個線程會拿到這把鎖,所以拿到鎖的執行業務代碼,最后釋放掉鎖,代碼如下:
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock") public Stirng deductStock() { String lockkey = "lockkey"; Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,"lockvalue");//jedis.setnx if(!result) { return ""; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } springRedisTemplate.delete(lockkey); return "end"; } } 調用200次,壓測結果顯示還是有問題,只減掉了一部分: <ignore_js_op> 這時,加大壓測次數,結果正常了: 第一個應用扣減結果顯示: <ignore_js_op> 第二個應用扣減結果顯示: <ignore_js_op> 這個只是因為加大了調用次數,執行業務代碼需要一點時間,這段時間拒絕了很多等待獲取鎖的請求。但是,還是有問題,假如redis服務掛掉了,拋出異常了,這時鎖不會被釋放掉,出現死鎖問題,可以添加try catch處理,代碼如下:
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock") public Stirng deductStock() { String lockkey = "lockkey"; try{ Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,"lockvalue");//jedis.setnx if(!result) { return ""; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } }finally{ springRedisTemplate.delete(lockkey); } return "end"; } } 這時,Redis服務掛掉導致死鎖的問題解決了,但是,如果服務器果宕機了,又會導致鎖不能被釋放的現象,所以可以設置超時時間為10s,代碼如下:
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock") public Stirng deductStock() { String lockkey = "lockkey"; try{ Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,"lockvalue",10,TimeUnit.SECONDS);//jedis.setnx //Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,"lockvalue");//jedis.setnx //stringRedisTemplate.expire(lockkey,10,TimeUnit.SECONDS); if(!result) { return ""; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } }finally{ springRedisTemplate.delete(lockkey); } return "end"; } } 這時,如果有一個線程執行需要15s,當執行到10s時第二個線程進來拿到這把鎖,會出現多個線程拿到同一把鎖執行,在第一個線程執行完時會釋放掉第二個線程的鎖,以此類推…就會導致鎖的永久失效。所以,只能自己釋放自己的鎖,可以給當前線程取一個名字,代碼如下:
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock") public Stirng deductStock() { String lockkey = "lockkey"; String clientId = UUID.randomUUID().toString(); try{ Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,clientId ,10,TimeUnit.SECONDS);//jedis.setnx //Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,"lockvalue");//jedis.setnx //stringRedisTemplate.expire(lockkey,10,TimeUnit.SECONDS); if(!result) { return ""; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } }finally{ springRedisTemplate.delete(lockkey); } return "end"; } }
永久失效的問題解決了,但是,如果第一個線程執行15s,還是會存在多個線程擁有同一把鎖的現象。所以,需要續期超時時間,當一個線程執行5s后對超時時間進行續期都10s,就可以解決了,續期設置可以借助redission工具。
Redission使用 Redission分布式鎖實現原理: <ignore_js_op> pom.xml
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency>
Application.java啟動類
@bean public Redission redission { //此為單機模式 Config config = new Config(); config.useSingleServer().setAddress("redis://120.0.0.1:6379").setDatabase(0); return (Redission)Redission.creat(config); }
最終解決以上所有問題的代碼如下:
@RestController public class IndexController { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private Redissionredission;
@RequestMapping("/deduct_stock") public Stirng deductStock() { String lockkey = "lockkey"; //String clientId = UUID.randomUUID().toString(); RLock lock = redission.getLock(); try{ //Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,clientId ,10,TimeUnit.SECONDS);//jedis.setnx //Boolean result = stringRedisTemplate.opsForValue.setIfAbsent(lockkey,"lockvalue");//jedis.setnx //stringRedisTemplate.expire(lockkey,10,TimeUnit.SECONDS); //加鎖:redission默認超時時間為30s,每10s續期一次,也可以自己設置時間 lock.lock(60,TimeUnit.SECONDS); int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get(key) if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue.set("stock",realStock+"");//jedis.set(key,value) System.out.println(扣減成功,剩余庫存:" + realStock + ""); } else { System.out.println(扣減失敗,庫存不足!" ); } }finally{ lock.unlock(); //springRedisTemplate.delete(lockkey); } return "end"; } } 高並發分布式鎖的問題得到解決。 |
|