package tech.codestory.zookeeper.aalvcai.ConcurrentHashMapLock;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.io.*;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
* @version 1.0.0
* @@menu <p>
* @date 2021/6/10 14:33
*/
public class SegmentDistributeLock {
/**
* 使用redis分布式鎖扣減庫存,弊端: 請求量大的話,會導致吞吐量降低
* 優化: 分段鎖並發扣減庫存
* 將表中的庫存字段 分為 5個庫存字段, 然后導入redis,庫存預熱, 然后參考ConcurrentHashMap的分段鎖思想
* 來一個請求后,對庫存字段 加 分段鎖, 分段鎖扣減庫存
* 如果當前分段鎖庫存不夠,就扣減掉當前的庫存,然后去鎖下一個分段鎖,扣減庫存
*
* git: https://gitee.com/easybao/segmentDistributeLock.git
* 依賴jar包:
* <dependency>
* <groupId>org.redisson</groupId>
* <artifactId>redisson</artifactId>
* <version>3.13.5</version>
* </dependency>
*/
RedissonClient redissonClient;
RBucket<RedisStock[]> bucket;
private ThreadLocal<StockRequest> threadLocal = new ThreadLocal<>();
static volatile RedisStock[] redisStocks;
private final int beginTotalNum; //初始總庫存,避免並發過程中 調用getCurrentTotalNum()獲取到的總庫存發生變化
public SegmentDistributeLock() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
this.redissonClient = Redisson.create(config);
redisStocks = new RedisStock[5];
redisStocks[0] = new RedisStock("pId_stock_00",20);
redisStocks[1] = new RedisStock("pId_stock_01",20);
redisStocks[2] = new RedisStock("pId_stock_02",20);
redisStocks[3] = new RedisStock("pId_stock_03",20);
redisStocks[4] = new RedisStock("pId_stock_04",20);
// 初始總庫存
this.beginTotalNum = getCurrentTotalNum();
// 庫存預熱,存到redis中 , 這里沒有采用因為將庫存預熱存到redis中,取出來的時候,解析異常, 不想花時間解決,所以將庫存預熱 變成一個類變量
// bucket = redissonClient.getBucket("pId_stock");
// bucket.set(redisStocks);
}
public RedissonClient getRedissonClient(){
return this.redissonClient;
}
public int getCurrentTotalNum(){
// 獲取實時總庫存
return Stream.of(redisStocks).mapToInt(RedisStock::getNum).sum();
}
/**
* 使用redis分布式鎖扣減庫存,弊端: 請求量大的話,會導致吞吐量降低
* 優化: 分段鎖並發扣減庫存
* 將表中的庫存字段 分為 5個庫存字段, 然后導入redis,庫存預熱, 然后參考ConcurrentHashMap的分段鎖思想
* 來一個請求后,對庫存字段 加 分段鎖, 分段鎖扣減庫存
* 如果當前分段鎖庫存不夠,就扣減掉當前的庫存,然后去鎖下一個分段鎖,扣減庫存
* @param request
* @return
*/
public boolean handlerStock_02(StockRequest request) {
// 先做校驗: 判斷扣減庫存 是否比 初始總庫存還大,是的話就直接false, 避免無限循環扣減不了
if(request.getBuyNum() > this.beginTotalNum){
return false;
}
// 使用本地線程變量保存請求,確保參數只在本線程使用
threadLocal.set(request);
// 這里使用 ThreadLocal代碼邏輯和ConcurrentHashMap的分段鎖
RedissonClient redissonClient = getRedissonClient();
RedisStock[] tab = redisStocks;
int len = tab.length;
int i = (request.getMemberId().hashCode() < 0 ? 0 : request.getMemberId().hashCode() ) % len ;
for(RedisStock e = tab[i]; e != null; e = tab[i = nextIndex(i,len)]){
RLock segmentLock = null;
try {
// 2: 對該元素加分布式分段鎖
segmentLock = redissonClient.getLock(e.getStockName());
segmentLock.lock();
int buyNum = threadLocal.get().getBuyNum();
if (buyNum <= e.getNum()) {
//扣減庫存
e.setNum(e.getNum() - buyNum);
// 扣減成功后,跳出循環,返回結果
return true;
}else{
// 如果並發過程中獲取到總庫存<= 0 說明已經沒有庫存了, 如果當前需要扣減的庫存 > 此時總庫存就返回false,扣件失敗
if (getCurrentTotalNum() <= 0 || threadLocal.get().getBuyNum() > getCurrentTotalNum()) {
// 沒有庫存就false
System.out.println(Thread.currentThread().getName() + " 扣減庫存數: " + threadLocal.get().getBuyNum() + "失敗" + " 此時總庫存為: " + getCurrentTotalNum());
return false;
}
// 扣減掉當前的 分段鎖對應的庫存,然后對下一個元素加鎖
threadLocal.get().setBuyNum( buyNum - e.getNum());
e.setNum(0);
}
} finally {
// 3: 解鎖
segmentLock.unlock();
}
}
threadLocal.remove();
return false;
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
public static int FNVHash(String key) {
final int p = 16777619;
Long hash = 2166136261L;
for (int idx = 0, num = key.length(); idx < num; ++idx) {
hash = (hash ^ key.charAt(idx)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
if (hash < 0) {
hash = Math.abs(hash);
}
return hash.intValue();
}
// 顯示redis中的庫存
public void showStocks(){
for (RedisStock redisStock : redisStocks) {
System.out.println(redisStock);
}
}
@AllArgsConstructor
class RedisStock implements Serializable {
// 庫存字段
String stockName;
// 庫存數據, 原子類來保證原子性 num的原子性
AtomicInteger num;
public RedisStock(String stockName, int num) {
this.stockName = stockName;
this.num = new AtomicInteger(num);
}
public void setNum(int num) {
this.num.set(num);
}
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
public int getNum() {
return this.num.get();
}
@Override
public String toString() {
return "RedisStock{" +
"stockName='" + stockName + '\'' +
", num=" + num.get() +
'}';
}
}
}
@Getter
@Setter
@AllArgsConstructor
class StockRequest implements Serializable{
//會員id
String memberId;
//購買數量
int buyNum;
}
class SegmentDistributeLockTest{
public static void main(String[] args) throws IOException, ClassNotFoundException {
// 模擬單線程扣減
SegmentDistributeLock segmentDistributeLock = new SegmentDistributeLock();
if(segmentDistributeLock.handlerStock_02(new StockRequest("memberId_001",54))){
System.out.println("扣減成功");
}else{
System.out.println("扣減失敗");
}
segmentDistributeLock.showStocks();
/**
* 成功; 結果為:
* RedisStock{stockName='pId_stock_00', num=0} 扣減了20個
* RedisStock{stockName='pId_stock_01', num=10} 扣減了10個
* RedisStock{stockName='pId_stock_02', num=20}
* RedisStock{stockName='pId_stock_03', num=20}
* RedisStock{stockName='pId_stock_04', num=20}
*/
}
}
class ConcurrentTest implements Runnable{
// 模擬10個線程並發
private static CountDownLatch countDownLatch = new CountDownLatch(10);
private static SegmentDistributeLock segmentDistributeLock = new SegmentDistributeLock();
int num; //購買數量
//會員id
String memberId;
public ConcurrentTest(String memberId,int num) {
this.num = num;
this.memberId = memberId;
}
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
// 模擬並發扣減庫存(扣減1-50個)
for (int i = 0; i < 10; i++) {
new Thread(new ConcurrentTest("memberId_"+i,random.nextInt(50) + 1),"線程"+i).start();
countDownLatch.countDown();
}
TimeUnit.SECONDS.sleep(5);
// 並發扣減庫存結束,查詢最終庫存
System.out.println("-----並發扣減庫存結束,查看剩余庫存-------");
System.out.println("-----並發扣減庫存結束,查看剩余庫存-------");
System.out.println("-----並發扣減庫存結束,查看剩余庫存-------");
segmentDistributeLock.showStocks();
}
@Override
public void run() {
try {
StockRequest request = new StockRequest(this.memberId, this.num);
// 在此阻塞,等到計數器歸零之后,再同時開始 扣庫存
System.out.println(Thread.currentThread().getName() + "已到達, 即將開始扣減庫存: "+ this.num);
countDownLatch.await();
if(segmentDistributeLock.handlerStock_02(request)){
System.out.println(Thread.currentThread().getName() + " 扣減成功, 扣減庫存為: " + this.num);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
運行結果:
