前兩個版本的代碼 都或多或少存在一定的問題,雖然可能微乎其微,但是程序需要嚴謹再嚴謹,
第一個版本問題: 局限於單機版,依賴於 Jvm的鎖
第二個版本問題: 極端情況下,解鎖邏輯的問題,線程B的鎖,可能會被線程A解掉,這種情況實際上是不合理的。
1. 由於是客戶端自己生成過期時間,所以需要強制要求分布式下每個客戶端的時間必須同步。
2. 當鎖過期的時候,如果多個客戶端同時執行jedis.getSet()方法,那么雖然最終只有一個客戶端可以加鎖,
但是這個客戶端的鎖的過期時間可能被其他客戶端覆蓋。
3. 鎖不具備擁有者標識,即任何客戶端都可以解鎖。
版本一: http://www.cnblogs.com/xifenglou/p/8807323.html
版本二: http://www.cnblogs.com/xifenglou/p/8883717.html
所以基於以上問題,第三個版本出來了,
Talk is cheap, show me the code!
import org.springframework.util.StopWatch;
import redis.clients.jedis.Jedis;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用RedisTool.tryGetDistributedLock
* 實現 分布式鎖
* 終極版本
*/
public class TicketRunnable3 implements Runnable {
private CountDownLatch count;
private CyclicBarrier barrier;
private static final Integer Lock_Timeout = 10000;
private static final String lockKey = "LockKey";
private volatile static boolean working = true;
public TicketRunnable3(CountDownLatch count, CyclicBarrier barrier) {
this.count = count;
this.barrier = barrier;
}
private int num = 20; // 總票數 此處可隨意 寫一個數,保證線程能運行起來,真正的共享變量不應該寫死在程序中, 應該從redis中獲取,這樣模擬多進程多線程的並發訪問
public void sellTicket(Jedis jedis) {
String name = Thread.currentThread().getName();
try{
boolean getLock = RedisTool.tryGetDistributedLock(jedis,lockKey, name,Lock_Timeout);
if( getLock){
if(!working)
return;
// Do your job
num = Integer.parseInt(jedis.get("ticket"));
if (num > 0) {
num--;
jedis.set("ticket",num+"");
if(num!=0)
System.out.println("================"+Thread.currentThread().getName()+"================= 售出票號" + (num+1)+",還剩" + num + "張票--" );
else {
System.out.println("================"+Thread.currentThread().getName()+"================= 售出票號" + (num+1)+",票已經票完!--");
working = false;
}
}
}else{
//System.out.println();
if(!working)
return;
System.out.println(Thread.currentThread().getName()+" Try to get the Lock,and wait 20 millisecond....");
Thread.sleep(10);
}
}catch(Exception e){
System.out.println(e);
}finally {
try {
if(RedisTool.releaseDistributedLock(jedis,lockKey,name)){
Thread.sleep(30);
}
}catch (Exception e ) {
e.printStackTrace();
}
}
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"到達,等待中...");
Jedis jedis = new Jedis("localhost", 6379);
try{
barrier.await(); // 此處阻塞 等所有線程都到位后 一起進行搶票
if(Thread.currentThread().getName().equals("pool-1-thread-1")){
System.out.println("-----------------全部線程准備就緒,開始搶票------------------");
}else {
Thread.sleep(5);
}
while (working) {
sellTicket(jedis);
}
count.countDown(); //當前線程結束后,計數器-1
}catch (Exception e){e.printStackTrace();}
}
/**
*
* @param args
*/
public static void main(String[] args) {
int threadNum = 5; //模擬多個窗口 進行售票
final CyclicBarrier barrier = new CyclicBarrier(threadNum);
final CountDownLatch count = new CountDownLatch(threadNum); // 用於統計 執行時長
StopWatch watch = new StopWatch();
watch.start();
TicketRunnable3 tickets = new TicketRunnable3(count,barrier);
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
//ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) { //此處 設置數值 受限於 線程池中的數量
executorService.submit(tickets);
}
try {
count.await();
executorService.shutdown();
watch.stop();
System.out.println("耗 時:" + watch.getTotalTimeSeconds() + "秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import redis.clients.jedis.Jedis;
import java.util.Collections;
public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 嘗試獲取分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請求標識
* @param expireTime 超期時間
* @return 是否獲取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
System.out.println("=============="+Thread.currentThread().getName()+"=============== 獲取到鎖,開始工作!");
return true;
}
return false;
}
/**
* 釋放分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請求標識
* @return 是否釋放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
System.out.println("=============="+Thread.currentThread().getName()+"=============== 解鎖成功!");
return true;
}
return false;
}
}
解鎖部分,我們將Lua代碼傳到jedis.eval()方法里,並使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。
那么這段Lua代碼的功能是什么呢?其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。那么為什么要使用Lua語言來實現呢?因為要確保上述操作是原子性的。源於Redis的特性,下面是官網對eval命令的部分解釋:
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命令。
運行結果如下:
歡迎留言,期待更深層次的探討!
針對 上述代碼,使用兩個類 運行,
TicketRunnable3 TicketRunnable4 模擬多進程 多線程場景 ,
場景1: 運行時長 > 過期時長
此時: 鎖自動失效, 線程均不用解鎖,即使解鎖也是失敗!
代碼及運行結果如下:
import org.springframework.util.StopWatch;
import redis.clients.jedis.Jedis;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用RedisTool.tryGetDistributedLock
* 實現 分布式鎖
* 終極版本
*/
public class TicketRunnable4 implements Runnable {
private CountDownLatch count;
private CyclicBarrier barrier;
private static final Integer Lock_Timeout = 3000; // 過期時間 代表 3秒后過期
private static final Integer ExecuteTime = 5000;
private static final Integer RetryInterval = 20;
private static final String lockKey = "LockKey";
private volatile static boolean working = true;
public TicketRunnable4(CountDownLatch count, CyclicBarrier barrier) {
this.count = count;
this.barrier = barrier;
}
private int num = 20; // 總票數
public void sellTicket(Jedis jedis) {
String name = Thread.currentThread().getName();
boolean gotLock = false;
try{
gotLock = RedisTool.tryGetDistributedLock(jedis,lockKey, name,Lock_Timeout);
if( gotLock && working){
// Do your job
num = Integer.parseInt(jedis.get("ticket"));
if (num > 0) {
num--;
jedis.set("ticket",num+"");
if(num!=0)
System.out.println("=============="+name+"=============== 售出票號" + (num+1)+",還剩" + num + "張票--" );
else {
System.out.println("=============="+name+"=============== 售出票號" + (num+1)+",票已經票完!--");
return;
}
}
if(num == 0){
System.out.println("=============="+name+"============票已經被搶空啦");
working = false;
}
Thread.sleep(ExecuteTime);
}else{
//System.out.println();
//System.out.println(name+" Try to get the Lock,and wait "+RetryInterval+" millisecond....");
Thread.sleep(RetryInterval);
}
}catch(Exception e){
System.out.println(e);
}finally {
try {
if(!gotLock||!working) //未獲取到鎖的線程不用解鎖
return;
/**
* 解鎖成功后 sleep, 嘗試讓出cpu給其他線程機會
* 解鎖失敗 說明鎖已經失效 被其他線程獲取到
*/
if(RedisTool.releaseDistributedLock(jedis,lockKey,name)){
Thread.sleep(100);
}
}catch (Exception e ) {
e.printStackTrace();
}
}
}
@Override
public void run() {
String prefix = "#";
String threadName = Thread.currentThread().getName();
Thread.currentThread().setName(prefix+threadName);
System.out.println(Thread.currentThread().getName()+"到達,等待中...");
Jedis jedis = new Jedis("localhost", 6379);
try{
barrier.await(); // 此處阻塞 等所有線程都到位后 一起進行搶票
if(Thread.currentThread().getName().equals(prefix+"pool-1-thread-2")){
System.out.println("-----------------全部線程准備就緒,開始搶票------------------");
}else {
Thread.sleep(5);
}
while (working) {
sellTicket(jedis);
}
count.countDown(); //當前線程結束后,計數器-1
}catch (Exception e){e.printStackTrace();}
}
/**
*
* @param args
*/
public static void main(String[] args) {
int threadNum = 3; //模擬多個窗口 進行售票
final CyclicBarrier barrier = new CyclicBarrier(threadNum);
final CountDownLatch count = new CountDownLatch(threadNum); // 用於統計 執行時長
StopWatch watch = new StopWatch();
watch.start();
TicketRunnable4 tickets = new TicketRunnable4(count,barrier);
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
//ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) { //此處 設置數值 受限於 線程池中的數量
executorService.submit(tickets);
}
try {
count.await();
executorService.shutdown();
watch.stop();
System.out.println("耗 時:" + watch.getTotalTimeSeconds() + "秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
TicketRunnable3 售出 10 9 8 6 5 3 2 1 票號。
TicketRunnable4售出 7 4 兩個票號
合計10張票,模擬結束!
場景2: 運行時長 < 過期時長
此時: 此時需要有鎖線程去釋放鎖,這樣多線程再去競爭獲取鎖。
修改代碼:
private static final Integer Lock_Timeout = 5000; // 將時間從3秒改為5秒
private static final Integer ExecuteTime = 3000; // 將執行時間5秒改為3秒
運行結果如下:
一個進程售出 10 9 8 6 4 3 1 票號
另一進程售出 7 5 2 票號
此時 每個線程完成任務后,均需要釋放鎖,這樣本地線程或是異地線程 才能獲取到鎖,這樣才能有機會進行任務的執行!