1.redis的應用場景
- 商品秒殺
- 點贊等
現在有一個減少商品的場景,我們很容易能寫出其代碼
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
public String deductSt0ck(){
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
return "end";
}
}
但是有一個問題,該程序單機下線程不安全。不過可以解決:加鎖
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
public String deductSt0ck(){
synchronized(this){
if (!aBoolean){
return "error";
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
return "end";
}
}
}
加鎖解決了單機的線程安全的問題,但是在集群的情況下線程依舊不安全,因為集群的情況下有多個服務器同時運行那么依然會產生線程安全問題;

因為在同一時間有兩個jvm運行,其中一個jvm的鎖肯定不會影響另一個jvm。故此時就需要用到redis的分布式鎖。
2.redis的分布式鎖
Redis 是一個單進程單線程的非關系型數據庫,而 Redis 鎖的實質便是讓並行的多個線程在Redis 內部以串行的方式執行
redis的(SETNX key value)語句
- 將 key 的值設為 value ,當且僅當 key 不存在。
- 若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
故可以通過redis自身特性,及其setnx操作來實現其分布式鎖。有代碼
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
public String deductSt0ck(){
String lock = "lock";
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, "wf");
if (!aBoolean){
return "error";
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
redisTemplate.delete(lock);
return "end";
}
}
該方法可以規避在單機狀態下的安全問題但這個代碼依然有問題,當程序加鎖后,如果程序在解鎖前出現異常,導致方法退出就不會把加的鎖解開,會導致出現死鎖,可以使用try--finally解決異常的問題。 還有在加鎖后如果該程序突然掛斷那依然會形成死鎖,這個問題可以通過給key設置超時時間來解決
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
public String deductSt0ck(){
String lock = "lock";
String value = String.valueOf(UUID.randomUUID());
try {
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
if (!aBoolean){
return "error";
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
}finally {
redisTemplate.delete(lock);
}
return "end";
}
}
到這里,看似問題已經解決了,但是確實代碼在分布式高並發的情況下依然有問題,就是在加鎖后,如果代碼出現問題,導致在還沒有執行完成業務邏輯的情況下,達到了設置到超時時間,就會導致鎖失效,從而使其他的線程獲得執行權限,而在其他的線程加完鎖后,第一個線程突然執行完成釋放第二個線程加的鎖。使下一個線程獲得執行權限,加鎖又被第二個線程釋放/。這樣鏈式這些下去就會導致鎖失效。
這個問題可以通過每次給線程設置value值不同的鎖,在釋放鎖使判斷,如果value與設置到相同就釋放鎖,負責不釋放。
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
public String deductSt0ck(){
String lock = "lock";
String value = String.valueOf(UUID.randomUUID());
try {
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
if (!aBoolean){
return "error";
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
}finally {
if (value.equals(redisTemplate.opsForValue().get(lock))){
redisTemplate.delete(lock);
}
}
return "end";
}
}
到這里看似代碼已經完美了。但是上敘方案還有問題沒有解決,就是在第二個線程加鎖執行時,如果第一個線程的主邏輯沒有執行完成,那么就會導致兩個線程進行同時執行業務邏輯,此時線程依然不安全。而要解決該問題就需要在程序還未結束,鎖到達超時時間時延長程鎖的超時時間,及寫一個子線程,檢測如果只要程序沒有執行完成就不停的延長鎖的過期時間
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private Redisson redisson;
@RequestMapping("/redis2")
public String deductStock(){
String lock = "lock";
String value = String.valueOf(UUID.randomUUID());
try {
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
if (!aBoolean){
return "error";
}
new Thread(()->{
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
if (value.equals(redisTemplate.opsForValue().get(lock))){
redisTemplate.expire(lock,15,TimeUnit.SECONDS);
}
}
},5000,5000);
}).start();
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
}finally {
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (value.equals(redisTemplate.opsForValue().get(lock))){
redisTemplate.delete(lock);
}
}
return "ok";
}
}
3.通過redisson框架實現redis分布式鎖
@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
public String deductSt0ck(){
String lock = "lock";
RLock rLock = redisson.getLock(lock);
try {
rLock.lock(30,TimeUnit.SECONDS);
int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
if (stock>0){
stock--;
redisTemplate.opsForValue().set("a",stock+"");
System.out.println("扣除成功,剩余:" + stock);
}else {
System.out.println("扣除失敗,剩余:" + stock);
}
} finally {
rLock.unlock();
}
return "end";
}
}
redisson默認實現了以上所有的東西,

Redis 主從架構失效問題
以上架構還存在問題:有這樣一種場景,即線程1成功上鎖后,但是主服務器還未來的及復制到從服務器便發生了宕機,此時從服務器被選擇為主服務器,由於其沒有鎖記錄,其他線程便可以進行上鎖,此時便發生了線程安全問題。
Red Lock 算法也可以解決上面存在的問題,其算法思想為:*使用多台 Redis Master ,節點完全獨立,節點間不需要進行數據同步,因為 Master-Slave 架構一旦 Master 發生故障時數據沒有復制到 Slave,被選為 Master 的 Slave 就丟掉了鎖,另一個客戶端就可以再次拿到鎖,鎖通過 setNX(原子操作) 命令設置,在有效時間內當獲得鎖的數量大於 (n/2+1) 代表成功,失敗后需要向所有節點發送釋放鎖的消息。*
