redission分布式鎖實戰


為什么要引入分布式鎖?

分布式鎖是用來解決分布式或集群場景中的並發沖突的一種常用手段。

分布式鎖和傳統jvm中的synchronized、ReentrantLock有什么區別?

分布式鎖:解決分布式或集群場景下多個線程的並發競爭。(多進程多線程)

synchronized、ReentrantLock:只能解決單體應用中多個線程的並發競爭。(單進程多線程)

案例及問題分析

  • 創建springboot應用

1

點擊next,進入下一步

2

繼續點擊next,選擇springboot版本,然后finish創建完成。

  • pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.good.study</groupId>
    <artifactId>springboot-redis-lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-redis-lock</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

目錄結構如下:

3

resources目錄下,創建application.yml文件

server:
  port: 8080
spring:
  redis:
    database: 0
    timeout: 6000
    # redis密碼
    password: qwert321123
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 1000
    # 集群環境配置    
    cluster:
      nodes:
        - 192.168.124.136:7001
        - 192.168.124.136:7002
        - 192.168.124.136:7003
        - 192.168.124.137:7004
        - 192.168.124.137:7005
        - 192.168.124.137:7006

創建一個IndexController,內容如下:

@RestController
public class IndexController {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @GetMapping("/test1")
    public void test1() {
        synchronized (this) {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock -1;
     			// 提前將stock的值設置到redis,redis客戶端執行 set stock 50
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println("扣減成功,剩余庫存:" + realStock);
            } else {
                System.out.println("扣減失敗,庫存不足");
            }
        }
    }
}

思考:上面代碼是否會有問題?

如果應用只是部署在單機節點下,那么上面代碼確實是沒有什么問題的,但是如果是部署在多個機器節點下,那么問題就嚴重了。假設上面代碼部署在多個機器節點下,會出現什么問題呢?我們先來看一張圖:

5

從上圖可以看到,應用部署在web1和web2,加入了nginx來做負載均衡,提供給前端、移動端的地址也將會是nginx的地址,假如有兩個請求同時訪問nignx,第一個請求,到達了web1,第二個次請求到達了web2,兩次請求都是同一時刻到達的,通過上面代碼,同時獲取到的值都是50,那么就會出現庫存超扣的情況。

廢話不多說,來演示一下你就明白了。

首先配置nginx,配置如下:

#user  nobody;

worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;
	client_max_body_size 1024M;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

	upstream stocklock {
		server 127.0.0.1:8080 weight=1;
		server 127.0.0.1:9090 weight=1;
	}
	
    server {
        listen       80;
        server_name  127.0.0.1;
		
		proxy_set_header X-Forwarded-Host $host;
        proxy_set_header X-Forwarded-Server $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
		location / {
			proxy_pass http://stocklock;
			proxy_connect_timeout 600;
			proxy_read_timeout 600;
		}
    }

}

啟動應用

6

修改端口號為9090,再啟動第二個應用,但是要先做一下配置,如下圖所示:

7

點擊Edit Configurations進入設置

8

勾選Allow parallel run,然后點擊Apply,再次點擊ok。

9

兩個應用都啟動完成,然后啟動nginx,接下來,我們還需要借助一個工具:jmeter

10

點擊jmeter.bat啟動,關於jmeter使用,不做過多贅述。

創建一個線程組

11

這里設置的200,是指開啟200個線程進行壓測,設置的0,是指同一時刻進行訪問。

添加一個HTTP request ,壓測nginx的地址。

12

13

點擊運行,觀察java控制台。

14

再觀察第二個應用的日志

15

兩邊日志對比,同時出現了49,47,46,44,43,這就是上面所說的出現庫存超扣的問題。

代碼如何改進?

代碼又做了一下改進,具體代碼如下:

@GetMapping("/test2")
public void test2() {
	synchronized (this) {
		String stockKey = "product_1001";
		try {
            // ①.不存在key則設置,存在直接返回
			Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, "1");
			if (!result) {
				return;
			}
            // ②.設置過期時間
			redisTemplate.expire(stockKey, 10, TimeUnit.SECONDS);
			int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
			if (stock > 0) {
				int realStock = stock -1;
				redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("扣減成功,剩余庫存:" + realStock);
			} else {
				System.out.println("扣減失敗,庫存不足");
			}
		} finally {
            // ③.刪除key(釋放鎖)
			redisTemplate.delete(stockKey);
		}
	}
}

代碼看上去是沒什么問題,但是實際坑還是有的,①和②的過程中,如果出現網絡故障或者redis宕機了,那么第一個請求進來的獲取到鎖了,第二個請求進來時,發現這個key存在了,直接return了。③也是有問題的,如果第一個請求進來,代碼還沒執行完,第二個請求進來了,假如第二個請求的執行時間比第一個請求的執行時間快,第二個請求執行到了③,把第一個請求的key刪掉了,就會導致鎖失效。

又進一步做了改進,代碼如下:

@GetMapping("/test3")
public void test3() {
	synchronized (this) {
		String stockKey = "product_1001";
		String requestId = UUID.randomUUID().toString();
		try {
			Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, requestId, 10, TimeUnit.SECONDS);
			if (!result) {
				return;
			}
			
			int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
			if (stock > 0) {
				int realStock = stock -1;
				redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("扣減成功,剩余庫存:" + realStock);
			} else {
				System.out.println("扣減失敗,庫存不足");
			}
		} finally {
			if (requestId.equals(redisTemplate.opsForValue().get(stockKey))) {
				redisTemplate.delete(stockKey);
			}
		}
	}
}

其實做到這一步,已經算沒什么問題了,但是對於大型互聯網來說,還是會出現一些問題,具體是什么問題,大家可以自己思考一下,或者自己動手實踐一下。

更進一步的解決方案

其實現在很多開源框架已經解決了上面的問題,這個開源框架就是redission,沒聽過redission的小伙伴可以去官網看官方文檔:https://redisson.org/

  • pom文件添加redission依賴包
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.11.2</version>
        </dependency>
  • 配置RedissionProperties
@Component
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfigProperties {
    private String password;
    private cluster cluster;

    public static class cluster {
        private List<String> nodes;

        public List<String> getNodes() {
            return nodes;
        }

        public void setNodes(List<String> nodes) {
            this.nodes = nodes;
        }
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public RedisConfigProperties.cluster getCluster() {
        return cluster;
    }

    public void setCluster(RedisConfigProperties.cluster cluster) {
        this.cluster = cluster;
    }
}
  • 配置RedissionConfig
@Configuration
public class RedissonConfig {
    @Autowired
    private RedisConfigProperties redisConfigProperties;

    //添加redisson的bean
    @Bean
    public Redisson redisson() {
        //redisson版本是3.5,集群的ip前面要加上“redis://”,不然會報錯,3.2版本可不加
        List<String> clusterNodes = new ArrayList<>();
        for (int i = 0; i < redisConfigProperties.getCluster().getNodes().size(); i++) {
            clusterNodes.add("redis://" + redisConfigProperties.getCluster().getNodes().get(i));
        }
        Config config = new Config();
        ClusterServersConfig clusterServersConfig = config.useClusterServers()
                .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
        clusterServersConfig.setPassword(redisConfigProperties.getPassword());//設置密碼
        return (Redisson) Redisson.create(config);
    }
}
redission實現分布式鎖原理

19

加鎖機制

如果該客戶端面對的是一個redis cluster集群,他首先會根據hash節點選擇一台機器。

發送lua腳本到redis服務器上,腳本如下:

"if (redis.call('exists',KEYS[1])==0) then "+       --看有沒有鎖
	"redis.call('hset',KEYS[1],ARGV[2],1) ; "+       --無鎖 加鎖  
	"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+      
	"return nil; end ;" +
"if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+  --我加的鎖
		"redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+  --重入鎖
		"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+  
	"return nil; end ;" +
"return redis.call('pttl',KEYS[1]) ;"  --不能加鎖,返回鎖的時間

lua的作用:保證這段復雜業務邏輯執行的原子性。
lua的解釋:
KEYS[1]) : 加鎖的key
ARGV[1] : key的生存時間,默認為30秒
ARGV[2] : 加鎖的客戶端ID (UUID.randomUUID()) + “:” + threadId)
第一段if判斷語句,就是用“exists myLock”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就
進行加鎖。如何加鎖呢?很簡單,用下面的命令:

hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”這個客戶端對“myLock”這個鎖key完成了加
鎖。

接着會執行“pexpire myLock 30000”命令,設置myLock這個鎖key的生存時間是30秒。

鎖互斥機制

如果客戶端2來嘗試加鎖,執行了同樣的一段lua腳本,第一個if判斷會執行“exists myLock”,發現myLock這個鎖key已經存在了。

接着第二個if判斷,判斷一下,myLock鎖key的hash數據結構中,是否包含客戶端2的ID,但是明顯不
是的,因為那里包含的是客戶端1的ID。

所以,客戶端2會獲取到pttl myLock返回的一個數字,這個數字代表了myLock這個鎖key的剩余生存時
間。比如還剩15000毫秒的生存時間。
此時客戶端2會進入一個while循環,不停的嘗試加鎖。

自動延時機制

只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后台線程,會每隔10秒檢查一
下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間。

可重入鎖機制

第一個if判斷肯定不成立,“exists myLock”會顯示鎖key已經存在了。
第二個if判斷會成立,因為myLock的hash數據結構中包含的那個ID,就是客戶端1的那個ID,也就是
“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此時就會執行可重入加鎖的邏輯,他會用:
incrby myLock
8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通過這個命令,對客戶端1的加鎖次數,累加1。

釋放鎖機制

執行lua腳本如下:

#如果key已經不存在,說明已經被解鎖,直接發布(publish)redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
          "end;" +
# key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。 不是我加的鎖 不能解鎖
          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
          "end; " +
# 將value減1
          "local counter = redis.call('hincrby', KEYS[1], ARGV[3],
-1); " +
# 如果counter>0說明鎖在重入,不能刪除key
          "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
# 刪除key並且publish 解鎖消息
          "else " +
            "redis.call('del', KEYS[1]); " + #刪除鎖
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
          "end; " +
          "return nil;",

– KEYS[1] :需要加鎖的key,這里需要是字符串類型。
– KEYS[2] :redis消息的ChannelName,一個分布式鎖對應唯一的一個channelName:
“redisson_lockchannel{” + getName() + “}”
– ARGV[1] :reids消息體,這里只需要一個字節的標記就可以,主要標記redis的key已經解鎖,再結合
redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。
– ARGV[2] :鎖的超時時間,防止死鎖
– ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + “:” + threadId
如果執行lock.unlock(),就可以釋放分布式鎖,此時的業務邏輯也是非常簡單的。
其實說白了,就是每次都對myLock數據結構中的那個加鎖次數減1。
如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:
“del myLock”命令,從redis里刪除這個key。
然后呢,另外的客戶端2就可以嘗試完成加鎖了。

redission使用

具體代碼如下:

@RestController
public class IndexController {

    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private Redisson redisson;

    @GetMapping("/test1")
    public void test1() {
        synchronized (this) {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock -1;
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println("扣減成功,剩余庫存:" + realStock);
            } else {
                System.out.println("扣減失敗,庫存不足");
            }
        }
    }

    @GetMapping("/test2")
    public void test2() {
        synchronized (this) {
            String stockKey = "product_1001";
            String requestId = UUID.randomUUID().toString();
            try {
                Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, requestId);
                if (!result) {
                    return;
                }
                redisTemplate.expire(stockKey, 10, TimeUnit.SECONDS);
                int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock -1;
                    redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                    System.out.println("扣減成功,剩余庫存:" + realStock);
                } else {
                    System.out.println("扣減失敗,庫存不足");
                }
            } finally {
                redisTemplate.delete(stockKey);
            }
        }
    }

    @GetMapping("/test3")
    public void test3() {
        synchronized (this) {
            String stockKey = "product_1001";
            String requestId = UUID.randomUUID().toString();
            try {
                Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, requestId, 10, TimeUnit.SECONDS);
                if (!result) {
                    return;
                }

                int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock -1;
                    redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                    System.out.println("扣減成功,剩余庫存:" + realStock);
                } else {
                    System.out.println("扣減失敗,庫存不足");
                }
            } finally {
                if (requestId.equals(redisTemplate.opsForValue().get(stockKey))) {
                    redisTemplate.delete(stockKey);
                }
            }
        }
    }

    @GetMapping("/test4")
    public void test4() {
        String stockKey = "product_1001";
        RLock lock = redisson.getLock(stockKey);
        try {
            // 加鎖
            lock.lock();
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock -1;
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println("扣減成功,剩余庫存:" + realStock);
            } else {
                System.out.println("扣減失敗,庫存不足");
            }
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
}

重新啟動兩個應用,再把redis里的stock恢復到50,進行測試。

注意:測試的時候把壓測工具里的接口路徑由test1改為test4

16

17

18

對比兩邊的日志,沒有再出現上面的庫存超扣問題了,好了,這就是redission實現的分布式鎖,主要用來解決

分布式場景下數據並發競爭和庫存超賣等應用場景。對redission分布式鎖實現感興趣的小伙伴,也可以自己看看底層加鎖和釋放鎖的邏輯,以及鎖續命的具體實現。

相關技術:https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers/#81-lock


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM