
上面的代碼更新庫存的數據,存在多線程的問題,第一種方法使用synchronized關鍵字修飾的語句塊代碼,但是性能較低,並且還是存在問題的
在分布式的場景下,當前庫存系統部署在多個tomcat上,即使加了同步鎖,也會存在問題,一個線程訪問tomcat1,另外一個線程同時訪問tomcat2,兩個都是進行減少庫存操作也是存在問題的,synchronized同步不能跨jvm

上面的代碼在一個jvm進程下面解決多線程是沒有問題的,但是在分布式環境下部署多個tomcat下部署多個庫存微服務,使用synchronized是存在問題的
我們可以使用下面的架構來進行測試,測試上面的代碼不正確

nginx負載代碼后面的兩個tomcat

8080和8090就是nginx反向代理兩個tomcat
nginx的配置如下:
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
log_format json '{"@timestamp":"$time_iso8601",'
'"remote_addr":"$remote_addr",'
'"status":$status,'
'"bodysize":$body_bytes_sent,'
'"referer":"$http_referer",'
'"ua":"$http_user_agent",'
'"handletime":$request_time,'
'"url":"$uri"}';
access_log logs/access.log;
access_log logs/access.json.log json;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
#引入自定義配置文件
include reverse-procy.conf;
upstream redislock{
server 127.0.0.1:6666 weight=1;
server 127.0.0.1:7777 weight=1;
}
server {
listen 8088;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / { root html; index index.html index.htm; proxy_pass http://redislock; }
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
}
# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;
# location / {
# root html;
# index index.html index.htm;
# }
#}
# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
# location / {
# root html;
# index index.html index.htm;
# }
#}
}

第二個參數0表示同一時刻發起200次請求,配置成5,表示在5秒內會總共發器200次請求 ,配置成1,表示1秒鍾之內發起200次請求
在端口為7777這台機器上打印結果如下

在端口為6666這台機器上打印結果如下

我們來看下上面具體的代碼配置
兩台機器上存在相同的記錄,說明存在兩台機器對同一個庫存進行操作的情況,說明上面的代碼存在問題,我們可以使用redis的senx命令來解決

分布式鎖要注意解決下面的幾個問題:
1、釋放鎖其實只需要把鎖的key刪除即可,使用del xxx指令。不過,仔細思考,如果在我們執行del之前,
服務突然宕機,那么鎖豈不是永遠無法刪除了?!
為了避免因服務宕機引起鎖無法釋放問題,我們可以在獲取鎖的時候,給鎖加一個有效時間,當時間超
出時,就會自動釋放鎖,這樣就不會死鎖了
並且要保證鎖被刪除,要放在try finally中

步驟如下:
1、通過set命令設置鎖
2、判斷返回結果是否是OK
1)Nil,獲取失敗,結束或重試(自旋鎖)
2)OK,獲取鎖成功
執行業務
釋放鎖,DEL 刪除key即可
3、異常情況,服務宕機。超時時間EX結束,會自動釋放鎖
2、大家思考一下,釋放鎖就是用DEL語句把鎖對應的key給刪除,有沒有這么一種可能性:
1. 3個進程:A和B和C,在執行任務,並爭搶鎖,此時A獲取了鎖,並設置自動過期時間為10s
2. A開始執行業務,因為某種原因,業務阻塞,耗時超過了10秒,此時鎖自動釋放了
3. B恰好此時開始嘗試獲取鎖,因為鎖已經自動釋放,成功獲取鎖
4. A此時業務執行完畢,執行釋放鎖邏輯(刪除key),於是B的鎖被釋放了,而B其實還在執行業務
5. 此時進程C嘗試獲取鎖,也成功了,因為A把B的鎖刪除了。
問題出現了:B和C同時獲取了鎖,違反了互斥性!
如何解決這個問題呢?我們應該在刪除鎖之前,判斷這個鎖是否是自己設置的鎖,如果不是(例如自己
的鎖已經超時釋放),那么就不要刪除了。
那么問題來了:如何得知當前獲取鎖的是不是自己呢?
我們可以在set 鎖時,存入當前線程的唯一標識!刪除鎖前,判斷下里面的值是不是與自己標識釋放一
致,如果不一致,說明不是自己的鎖,就不要刪除了。 這里通過線程id來實現

package com.itheima.security.distributed.uaa; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Administrator * @version 1.0 **/ @RestController public class OrderController { @Autowired Environment environment; @Autowired StringRedisTemplate redisTemplate; @GetMapping(value = "/deduce_stock") public String deduce_stock(){ String thread_id = UUID.randomUUID().toString(); String product_id = "001"; try{ //商品名稱,對同一個商品進行減少庫存的操作 Boolean setIfAbsent = redisTemplate.opsForValue().setIfAbsent(product_id,thread_id, 10, TimeUnit.MINUTES); if(!setIfAbsent){ return "庫存正在被操作,請稍等"; } int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int real_stock = stock -1; redisTemplate.opsForValue().set("stock", real_stock+""); System.out.println(environment.getProperty("local.server.port")+"扣減庫存成功,剩余庫存為:"+real_stock); }else{ System.out.println("扣減庫存失敗,庫存不足:"); } }finally{ //釋放鎖,不是自己的鎖,不能刪除掉 if(thread_id.equalsIgnoreCase(redisTemplate.opsForValue().get(product_id))){ System.out.println(environment.getProperty("local.server.port")+"扣減庫存成功,釋放鎖:"); redisTemplate.delete(product_id); } } return "end"; } @RequestMapping(value = "/aa") public String deduce_stock1(){ return "end" +environment.getProperty("local.server.port"); } }
上面的問題還存在一個問題,上面的鎖不是可重入鎖,如果我們在獲取鎖以后,執行代碼的過程中,再次嘗試獲取鎖,執行setnx肯定會失敗,因為鎖已經存在
了。這樣有可能導致死鎖,這樣的鎖就是不可重入的。
如何解決呢?
當然是想辦法改造成可重入鎖。
3.4.1.重入鎖
什么叫做可重入鎖呢?
可重入鎖,也叫做遞歸鎖,指的是在同一線程內,外層函數獲得鎖之后,內層遞歸函數仍然可以獲
取到該鎖。換一種說法:同一個線程再次進入同步代碼時,可以使用自己已獲取到的鎖。
可重入鎖可以避免因同一線程中多次獲取鎖而導致死鎖發生。
那么,如何實現可重入鎖呢?
獲取鎖:首先嘗試獲取鎖,如果獲取失敗,判斷這個鎖是否是自己的,如果是則允許再次獲取,
而且必須記錄重復獲取鎖的次數。
釋放鎖:釋放鎖不能直接刪除了,因為鎖是可重入的,如果鎖進入了多次,在最內層直接刪除鎖,
導致外部的業務在沒有鎖的情況下執行,會有安全問題。因此必須獲取鎖時累計重入的次數,釋
放時則減去重入次數,如果減到0,則可以刪除鎖.
因此,存儲在鎖中的信息就必須包含:key、線程標識、重入次數。不能再使用簡單的key-value結構,
這里推薦使用hash結構:
key:lock
hashKey:線程信息
hashValue:重入次數,默認1
可以使用hset命令
需要用到的一些Redis命令包括:
EXISTS key:判斷一個Key是否存在
HEXISTS key field:判斷一個hash的field是否存在
HSET key field value :給一個hash的field設置一個值
HINCRBY key field increment:給一個hash的field值增加指定數值
EXPIRE key seconds:給一個key設置過期時間
DEL key:刪除指定key
下面我們假設鎖的key為“ lock ”,hashKey是當前線程的id:“ threadId ”,鎖自動釋放時間假設為20
獲取鎖的步驟:
1、判斷lock是否存在 EXISTS lock
存在,說明有人獲取鎖了,下面判斷是不是自己的鎖
判斷當前線程id作為hashKey是否存在: HEXISTS lock threadId
不存在,說明鎖已經有了,且不是自己獲取的,鎖獲取失敗,end
存在,說明是自己獲取的鎖,重入次數+1: HINCRBY lock threadId 1 ,去到步驟3
2、不存在,說明可以獲取鎖, HSET key threadId 1
3、設置鎖自動釋放時間, EXPIRE lock 20
釋放鎖的步驟:
1、判斷當前線程id作為hashKey是否存在: HEXISTS lock threadId
不存在,說明鎖已經失效,不用管了
存在,說明鎖還在,重入次數減1: HINCRBY lock threadId -1 ,獲取新的重入次數
2、判斷重入次數是否為0:
為0,說明鎖全部釋放,刪除key: DEL lock
大於0,說明鎖還在使用,重置有效時間: EXPIRE lock 20



面探討的Redis鎖實現方案都忽略了一個非常重要的問題:原子性問題。無論是獲取鎖,還是釋放鎖
的過程,都是有多行Redis指令來完成的,如果不能保證這些Redis命令執行的原子性,則整個過程都是
不安全的。
而Redis中支持以Lua腳本來運行多行命令,並且保證整個腳本運行的原子性。
接下來,我們分幾塊來學習Lua腳本的使用:
Redis中如何執行Lua腳本
Lua腳本的基本語法
編寫上述分布式鎖對應的Lua腳本
redis使用lua腳本


我們來看下整個工程的代碼

lock.lua
local key = KEYS[1] local threadId = ARGV[1] local releaseTime = ARGV[2] if(redis.call('exists', key) == 0) then redis.call('hset', key, threadId, '1') redis.call('expire', key, releaseTime) return 1 end if(redis.call('hexists', key, threadId) == 1) then redis.call('hincrby', key, threadId, '1') redis.call('expire', key, releaseTime) return 1 end return 0
unlock.lua
local key = KEYS[1] local threadId = ARGV[1] local releaseTime = ARGV[2] if (redis.call('HEXISTS', key, threadId) == 0) then return nil end local count = redis.call('HINCRBY', key, threadId, -1) if (count > 0) then redis.call('EXPIRE', key, releaseTime) return nil else redis.call('DEL', key) return nil end
注意lua腳本中不能存在中文
RedisLock.java
package cn.itcast.demo.lock; /** * @author 虎哥 */ public interface RedisLock { /** * 獲取鎖 * @param releaseTime 鎖的自動釋放時間 * @return 獲取鎖是否成功 */ boolean tryLock(long releaseTime); /** * 釋放鎖 */ void unlock(); }
ReentrantRedisLock.java
package cn.itcast.demo.lock; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.scripting.support.ResourceScriptSource; import java.util.Collections; import java.util.UUID; public class ReentrantRedisLock implements RedisLock { private StringRedisTemplate redisTemplate; /** * 設定好鎖對應的 key */ private String key; /** * 存入的線程信息的前綴,防止與其它JVM中線程信息沖突 */ private final String ID_PREFIX = UUID.randomUUID().toString(); public ReentrantRedisLock(StringRedisTemplate redisTemplate, String key) { this.redisTemplate = redisTemplate; this.key = key; } private static final DefaultRedisScript<Long> LOCK_SCRIPT; private static final DefaultRedisScript<Object> UNLOCK_SCRIPT; static { // 加載釋放鎖的腳本 LOCK_SCRIPT = new DefaultRedisScript<>(); LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua"))); LOCK_SCRIPT.setResultType(Long.class); // 加載釋放鎖的腳本 UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua"))); } // 鎖釋放時間 private String releaseTime; public boolean tryLock(long releaseTime) { // 記錄釋放時間 this.releaseTime = String.valueOf(releaseTime); // 執行腳本 Long result = redisTemplate.execute( LOCK_SCRIPT, Collections.singletonList(key), ID_PREFIX + Thread.currentThread().getId(), this.releaseTime); // 判斷結果 return result != null && result.intValue() == 1; } @Override public void unlock() { // 執行腳本 redisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(key), ID_PREFIX + Thread.currentThread().getId(), this.releaseTime); } }
RedisLockFactory
package cn.itcast.demo.lock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; /** * @author 虎哥 */ @Component public class RedisLockFactory { @Autowired private StringRedisTemplate redisTemplate; public RedisLock getReentrantLock(String key){ return new ReentrantRedisLock(redisTemplate, key); } }
我們通過Spring提供的RedisTemplate來操作lua腳本, RedisTemplate 中提供了一個方法,用來執行Lua腳本:
我們定義一個定時任務,模擬清理訂單的任務:
OrderController
package cn.itcast.demo.lock; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Administrator * @version 1.0 **/ @RestController public class OrderController { @Autowired Environment environment; @Autowired private RedisLockFactory factory; @GetMapping(value = "/deduce_stock") public String deduce_stock() throws InterruptedException{ // 獲取鎖對象 RedisLock lock = factory.getReentrantLock("lock1"); // 嘗試獲取鎖 boolean isLock = lock.tryLock(50); if(!isLock){ // 獲取鎖失敗 return "error"; } try{ clearOrder(); }finally{ lock.unlock(); } return "end"; } @RequestMapping(value = "/aa") public String deduce_stock1(){ return "end" +environment.getProperty("local.server.port"); } public void clearOrder() throws InterruptedException { System.out.println("開始清理訂單"); Thread.sleep(500); System.out.println("開始恢復庫存"); } }
上面的代碼已經解決了分布式鎖的問題,但是在集群的環境下還存在問題
單點的redis無法保證高可用,因此一般我們都會給redis搭建主從集群。但是,主從集群無法保證分布式
鎖的高可用特性。
在Redis官網上,也對這種單點故障做了說明:
在這種場景(主從結構)中存在明顯的競態:
1. 客戶端A從master獲取到鎖
2. 在master將鎖同步到slave之前,master宕掉了。
3. slave節點被晉級為master節點
4. 客戶端B取得了同一個資源被客戶端A已經獲取到的另外一個鎖。安全失效!
有時候程序就是這么巧,比如說正好一個節點掛掉的時候,多個客戶端同時取到了鎖。如果你可以
接受這種小概率錯誤,那用這個基於復制的方案就完全沒有問題。
因此,Redis的作者又給出了一種新的算法來解決整個高可用問題,即Redlock算法,摘抄了算法的介紹
如下: 我們可以采用看門狗(watch dog)解決鎖超時問題,/開啟一個任務,這個任務在 獲取鎖之后10秒后,重
新向redis發起請求,重置有效期,重新執行expire
3.7.Redission
如果按照Redlock算法來實現分布式鎖,加上各種安全控制,代碼會比較復雜。而開源的Redission框架
就幫我們實現了各種基於Redis的分布式鎖,包括Redlock鎖。
1)依賴
使用起來非常方便,首先引入依賴:
2)配置
然后通過Java配置的方式,設置Redis的地址,構建RedissionClient客戶端:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.10.6</version> </dependency>
2)配置
然后通過Java配置的方式,設置Redis的地址,構建RedissionClient客戶端:
/** * @author 虎哥 */ @Configuration3)常用API介紹: RedissClient中定義了常見的鎖: 獲取鎖對象后,可以通過 tryLock() 方法獲取鎖: 有3個重載的方法,可以控制鎖是否需要重試來獲取: 三個參數:獲取鎖,設置鎖等待時間 waitTime 、釋放時間 leaseTime ,時間單位 unit 。 如果獲取鎖失敗后,會在 waitTime 減去獲取鎖用時的剩余時間段內繼續嘗試獲取鎖,如果依 然獲取失敗,則認為獲取鎖失敗; 獲取鎖后,如果超過 leaseTime 未釋放,為避免死鎖會自動釋放。 兩個參數:獲取鎖,設置鎖等待時間 time 、時間單位 unit 。釋放時間 leaseTime 按照默認的30s 空參:獲取鎖, waitTime 默認0s,即獲取鎖失敗不重試, leaseTime 默認30s 任務執行完畢,使用 unlock() 方法釋放鎖: public class RedisConfig { @Bean public RedissonClient redissonClient() { // 配置類 Config config = new Config(); // 添加redis地址,這里添加了單點的地址,也可以使用config.useClusterServers()添加集群地 址 config.useSingleServer() .setAddress("redis://192.168.150.101:6379"); // 創建客戶端 return Redisson.create(config); } }
4)完整案例
使用Redission來代替我們之前自定義鎖的測試案例:
LockDemoApplication
package cn.itcast.demo; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling @SpringBootApplication public class LockDemoApplication { public static void main(String[] args) { SpringApplication.run(LockDemoApplication.class, args); } public class RedisConfig { @Bean public RedissonClient redissonClient() { // 配置類 Config config = new Config(); // 添加redis地址,這里添加了單點的地址,也可以使用config.useClusterServers()添加集群地 config.useSingleServer() .setAddress("redis://127.0.0.1:6379"); // 創建客戶端 return Redisson.create(config); } } }
OrderController
package cn.itcast.demo.lock; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Administrator * @version 1.0 **/ @RestController public class OrderController { @Autowired Environment environment; @Autowired RedissonClient redissonClient; @Autowired private RedisLockFactory factory; @GetMapping(value = "/deduce_stock") public String deduce_stock() throws InterruptedException{ // 獲取鎖對象 RLock lock = redissonClient.getLock("lock"); // 嘗試獲取鎖 boolean isLock = lock.tryLock(); if(!isLock){ // 獲取鎖失敗 return "error"; } try{ clearOrder(); }finally{ lock.unlock(); } return "end"; } @RequestMapping(value = "/aa") public String deduce_stock1(){ return "end" +environment.getProperty("local.server.port"); } public void clearOrder() throws InterruptedException { System.out.println("開始清理訂單"); Thread.sleep(500); System.out.println("開始恢復庫存"); } }
上面的代碼就能夠解決在集群環境下分布式鎖失效的問題
pom.xml文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.3.RELEASE</version> </parent> <groupId>cn.itcast.demo</groupId> <artifactId>lock-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>lock-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.10.6</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
第二種解決方案,使用數據庫的行級別所來解決

下面我們假設鎖的key為“ lock ”,hashKey是當前線程的id:“ threadId ”,鎖自動釋放時間假設為20
獲取鎖的步驟:
1、判斷lock是否存在 EXISTS lock
存在,說明有人獲取鎖了,下面判斷是不是自己的鎖
判斷當前線程id作為hashKey是否存在: HEXISTS lock threadId
不存在,說明鎖已經有了,且不是自己獲取的,鎖獲取失敗,end
存在,說明是自己獲取的鎖,重入次數+1: HINCRBY lock threadId 1 ,去到步驟3
2、不存在,說明可以獲取鎖, HSET key threadId 1
3、設置鎖自動釋放時間, EXPIRE lock 20
釋放鎖的步驟:
1、判斷當前線程id作為hashKey是否存在: HEXISTS lock threadId
不存在,說明鎖已經失效,不用管了
存在,說明鎖還在,重入次數減1: HINCRBY lock threadId -1 ,獲取新的重入次數
2、判斷重入次數是否為0:
為0,說明鎖全部釋放,刪除key: DEL lock
| 序號 | 命令及描述 |
| 1 | EVAL script numkeys key [key ...] arg [arg ...] 執行 Lua 腳本。 |
| 2 | EVALSHA sha1 numkeys key [key ...] arg [arg ...] 執行 Lua 腳本。 |
| 3 | SCRIPT EXISTS script [script ...] 查看指定的腳本是否已經被保存在緩存當中。 |
| 4 | SCRIPT FLUSH 從腳本緩存中移除所有腳本。 |
| 5 | SCRIPT KILL 殺死當前正在運行的 Lua 腳本。 |
| 6 | SCRIPT LOAD script 將腳本 script 添加到腳本緩存中,但並不立即執行這個腳本。 |
大於0,說明鎖還在使用,重置有效時間: EXPIRE lock 20
