Redisson實現分布式鎖(一)


為什么要使用分布式鎖?

單機情況下,多線程同時訪問並改變臨界資源(可變共享變量),將會使得這個變量不可預知,所以我們引入了同步(lock—synchronized)。但在分布式場景下(多機部署),業務上我們需保證某個共享變量數據最終一致性,但實際每個機器的變量是獨立的,同步(lock—synchronized)的機制僅僅限於單機,這種情況下,就需要有一個多機情況下的共享數據庫(通常為redis),通過某種手段達到與同步一樣效果機制。

demo

 <!-- redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.2.3</version>
        </dependency>
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public Redisson redissonSingle() {
        //支持Single單機,Master/Slave 主從,Sentinel哨兵,Cluster集群等模式
        //此為單機模式
        Config config = new Config();
        config.useSingleServer()
                //redis://127.0.0.1:6379  報錯:原因未知??
                .setAddress("127.0.0.1:6379")
                .setTimeout(3000);
        return (Redisson)Redisson.create(config);
    }
}
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@Service
public class ProductService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private Redisson redisson;

    /**
     * 分布式鎖的key
     */
    private String lockKey = "key_product_id";

    /**
     * 從redis或取
     */
    public int getProductId() {
        String productid = stringRedisTemplate.opsForValue().get("product");
        return Integer.parseInt(productid);
    }

    /**
     * 修改product
     *
     * @return
     */
    public void setProductId() {
        RLock lock = redisson.getLock(lockKey);

        //60s 后自動釋放鎖
        lock.lock(60, TimeUnit.SECONDS);

        String productId = stringRedisTemplate.opsForValue().get("product");

        /*獲取redis中的key-value對象,key不存在沒關系
        RBucket<Integer> keyObject = redisson.getBucket("product");
        System.out.println(keyObject.get());
        keyObject.set(100);
        */

        int sprodId = Integer.parseInt(productId);

        if (sprodId > 0) {
            stringRedisTemplate.opsForValue().set("product", --sprodId + "");
            System.out.println(Thread.currentThread().getName() + "  lockkey:" + lockKey + ",product:" + sprodId + "");
        }
        lock.unlock(); //釋放鎖
    }
}
    @Test
    public void testSetProductId(){
        //開啟100線程(有興趣可以另起一個工程,看看兩個工程執行細節)
        ExecutorService executorService= Executors.newFixedThreadPool(5);
        for(int i = 0;i < 1000;i++){
            executorService.submit(()->{productService.setProductId();});
        }

        while(true){
            try {
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            int prodId = productService.getProductId();
            if(prodId <= 0){
                executorService.shutdown();
                break;
            }
        }
    }

結果:

...

pool-4-thread-4 lockkey:key_product_id,product:3
pool-4-thread-3 lockkey:key_product_id,product:2
pool-4-thread-5 lockkey:key_product_id,product:1
pool-4-thread-1 lockkey:key_product_id,product:0

其實上面代碼不是太嚴謹:

RedissonLock源碼:

當鎖再Thread1持有時,試想當線程Thread2在lock.lock()后發生中斷時,異常被捕捉,lock不會因為等待Thread1線程釋放鎖而阻塞,而是直接處理業務邏輯。這就導致需要同步的部分沒有同步,並且試圖釋放不是自己的鎖,發生異常。

eg:

 

public void setProductId() {
        RLock lock = redisson.getLock(lockKey);

        //60s 后自動釋放鎖
        lock.lock(60, TimeUnit.SECONDS);

        Thread t = Thread.currentThread();

        //if(!t.isInterrupted()){
            try{
                String productId = stringRedisTemplate.opsForValue().get("product");
                int sprodId = Integer.parseInt(productId);
                if (sprodId > 0) {
                    stringRedisTemplate.opsForValue().set("product", --sprodId + "");
                    System.out.println(">>>>>>"+t.getName() + ",product:" + sprodId + "");
                }

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }finally {
                lock.unlock(); //釋放鎖
            }
        //}else{
        //    System.out.println(t.getName()+"發生中斷.....");
        //}
    }

 

 @Test
    public void testSetProductId2(){
        //開啟2個線程
        Thread thread1 = new Thread(()-> productService.setProductId());
        Thread thread2 = new Thread(()-> productService.setProductId());

        thread1.start();
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread2.start();

        thread2.interrupt();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

 運行結果:

可知:thread-10取值是9,而不是Thread-9執行完的8。

 嚴謹的寫法:

public void setProductId() {
        RLock lock = redisson.getLock(lockKey);

        //60s 后自動釋放鎖
        lock.lock(60, TimeUnit.SECONDS);

        Thread t = Thread.currentThread();

        if(!t.isInterrupted()){
            try{
                String productId = stringRedisTemplate.opsForValue().get("product");
                int sprodId = Integer.parseInt(productId);
                if (sprodId > 0) {
                    stringRedisTemplate.opsForValue().set("product", --sprodId + "");
                    System.out.println(">>>>>>"+t.getName() + ",product:" + sprodId + "");
                }          
            }finally {
                lock.unlock(); //釋放鎖
            }
        }else{
            System.out.println(t.getName()+"發生中斷.....");
        }

    }

當然,上面的方法用lock是阻塞方法,可以用tryLock()方法。

 public void setProductId2() {
        RLock lock = redisson.getLock(lockKey);

        //60s 后自動釋放鎖
        try {
            boolean locked = lock.tryLock(60,TimeUnit.SECONDS);
            if(locked){
                String productId = stringRedisTemplate.opsForValue().get("product");
                int sprodId = Integer.parseInt(productId);
                if (sprodId > 0) {
                    stringRedisTemplate.opsForValue().set("product", --sprodId + "");
                    System.out.println(">>>>>>"+Thread.currentThread().getName() + ",product:" + sprodId + "");
                }

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock(); //釋放鎖
        }
    }
  @Test
    public void testSetProductId2(){
        //開啟2個線程
        Thread thread1 = new Thread(()-> productService.setProductId2());
        Thread thread2 = new Thread(()-> productService.setProductId2());

        thread1.start();
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread2.start();

        //thread2.interrupt();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

執行結果:

 

 RLock接口的特點

繼承標准接口Lock

擁有標准鎖接口的所有特性,比如lock,unlock,trylock等等。

擴展標准接口Lock

擴展了很多方法,常用的主要有:強制鎖釋放,帶有效期的鎖,還有一組異步的方法。其中前面兩個方法主要是解決標准lock可能造成的死鎖問題。比如某個線程獲取到鎖之后,線程所在機器死機,此時獲取了鎖的線程無法正常釋放鎖導致其余的等待鎖的線程一直等待下去。

可重入機制

各版本實現有差異,可重入主要考慮的是性能,同一線程在未釋放鎖時如果再次申請鎖資源不需要走申請流程,只需要將已經獲取的鎖繼續返回並且記錄上已經重入的次數即可,與jdk里面的ReentrantLock功能類似。重入次數靠hincrby命令來配合使用,詳細的參數下面的代碼。

判斷是否是同一個線程:

public class RedissonLock extends RedissonExpirable implements RLock {
   
    final UUID id;
    protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
        super(commandExecutor, name);
        this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);
        this.commandExecutor = commandExecutor;
        this.id = id;
    }

    String getLockName(long threadId) {
        return this.id + ":" + threadId;
    }

RLock獲取鎖的兩種場景

這里拿tryLock的源碼來看:tryAcquire方法是申請鎖並返回鎖有效期還剩余的時間,如果為空說明鎖未被其它線程申請直接獲取並返回,如果獲取到時間,則進入等待競爭邏輯。

 

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(leaseTime, unit);
        if(ttl == null) {
            return true;
        } else {
            //有競爭獲取鎖
            time -= System.currentTimeMillis() - current;
            if(time <= 0L) {
                return false;
            } else {
                current = System.currentTimeMillis();
                
                final RFuture subscribeFuture = this.subscribe(threadId);
                if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                    //超過客戶端設置的最大等待時間,取消訂閱,返回false
                    if(!subscribeFuture.cancel(false)) {
                        subscribeFuture.addListener(new FutureListener() {
                            public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                                if(subscribeFuture.isSuccess()) {
                                    RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                                }

                            }
                        });
                    }

                    return false;
                } else {
                    boolean var16;
                    try {
                        time -= System.currentTimeMillis() - current;
                        if(time <= 0L) {
                            //不等待申請鎖,返回false
                            boolean currentTime1 = false;
                            return currentTime1;
                        }

                        do {
                            long currentTime = System.currentTimeMillis();
                            //tryAcquire方法是申請鎖並返回鎖有效期還剩余的時間
                            //如果為空說明鎖未被其它線程申請直接獲取並返回
                            //如果獲取到時間,則進入等待競爭邏輯
                            ttl = this.tryAcquire(leaseTime, unit);
                            if(ttl == null) {                                
                                var16 = true;
                                return var16;
                            }

                            time -= System.currentTimeMillis() - currentTime;
                            if(time <= 0L) {
                                //不等待申請鎖,返回false
                                var16 = false;
                                return var16;
                            }

                            currentTime = System.currentTimeMillis();
                            
                            //通過信號量(共享鎖)阻塞,等待解鎖消息
                            if(ttl.longValue() >= 0L && ttl.longValue() < time) {
                                //ttl(剩余時間) 小於time(等待時間),就在ttl時間范圍內
                                this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);
                            } else {
                                //在time(等待時間)范圍內
                                this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }
                            
                            //更新等待時間(最大等待時間-已經消耗的阻塞時間)
                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        var16 = false;
                    } finally {
                        // 無論是否獲得鎖,都要取消訂閱解鎖消息
                        this.unsubscribe(subscribeFuture, threadId);
                    }

                    return var16;
                }
            }
        }
    }
View Code

 首先看this.tryAcquire()

 private Long tryAcquire(long leaseTime, TimeUnit unit) {
        return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
    }

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                public void operationComplete(Future<Long> future) throws Exception {
                    if (future.isSuccess()) {
                        Long ttlRemaining = (Long)future.getNow();
                        if (ttlRemaining == null) {
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }

                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }
View Code

tryAcquire()方法是申請鎖並返回鎖有效期還剩余的時間。

發現源碼其實是一個lua腳本進行加鎖操作(由於高版本的redis支持lua腳本,所以redisson也對其進行了支持,采用了腳本模式)

參數
KEYS[1](getName()) :需要加鎖的key,這里需要是字符串類型。
ARGV[1](internalLockLeaseTime) :鎖的超時時間,防止死鎖
ARGV[2](getLockName(threadId)) :鎖的唯一標識, id(UUID.randomUUID()) + “:” + threadId

--檢查key是否被占用了,如果沒有則設置超時時間和唯一標識,初始化value=1
if
(redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); //key,field,1 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;

--如果鎖重入,需要判斷鎖的key field 都一致情況下 value 加一
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); //key,field,+1
   --鎖重入重新設置超時時間 redis.call(
'pexpire', KEYS[1], ARGV[1]); return nil;
end;
--返回lockKey剩余的過期時間
return redis.call('pttl', KEYS[1]);

加鎖的流程:

  1. 判斷lock鍵是否存在,不存在直接調用hset存儲當前線程信息並且設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
  2. 判斷lock鍵是否存在,存在則將重入次數加1,並重新設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
  3. 被其它線程已經鎖定,返回鎖有效期的剩余時間,告訴客戶端需要等待。
key[1:]lock.com.paic.phssp.springtest.redisson.ProductService.anotationProd#product.1
agv[1]:e9ca7a5b-e7d5-4ebe-968c-1759f690984d75
agv[2]:1000

 

 同理unlockInnerAsync()解鎖:

參數:
KEYS[1](getName()):需要加鎖的key,這里需要是字符串類型。
KEYS[2](getChannelName()):redis消息的ChannelName,一個分布式鎖對應唯一的一個 channelName:“redisson_lock__channel__{” + getName() + “}”
ARGV[1](LockPubSub.unlockMessage):redis消息體,這里只需要一個字節的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。
ARGV[2](internalLockLeaseTime):鎖的超時時間,防止死鎖
ARGV[3](getLockName(threadId)) :鎖的唯一標識, id(UUID.randomUUID()) + “:” + threadId--

--如果keys[1]不存在,則發布消息,說明已經被解鎖了
if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
--key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖  
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end;
--將value減1,這里主要用在重入鎖
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else
--刪除key並消息 redis.call(
'del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;

解鎖

  1. 如果lock鍵不存在,發消息說鎖已經可用
  2. 如果鎖不是被當前線程鎖定,則返回nil
  3. 由於支持可重入,在解鎖時將重入次數需要減1
  4. 如果計算后的重入次數>0,則重新設置過期時間
  5. 如果計算后的重入次數<=0,則發消息說鎖已經可用

 鎖續期:

就是當線程運行時間超過lock過時時間,如何保證鎖不釋放,而是等到線程結束后釋放。

tryAcquireAsync()-->scheduleExpirationRenewal()

private void scheduleExpirationRenewal(final long threadId) {
        if (!expirationRenewalMap.containsKey(this.getEntryName())) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                    future.addListener(new FutureListener<Boolean>() {
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                            if (!future.isSuccess()) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                            } else {
                                if ((Boolean)future.getNow()) {
                                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                                }

                            }
                        }
                    });
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
                task.cancel();
            }

        }
    }
View Code

獲取鎖成功就會開啟一個定時任務,頻率:internalLockLeaseTime / 3L,當線程沒結束會續期,當宕機時,定時任務跑不了,就不會續期。鎖到期就釋放。

參考:

https://www.cnblogs.com/zhongkaiuu/p/redisson.html

https://www.cnblogs.com/ASPNET2008/p/6385249.html

https://www.jianshu.com/p/b12e1c0b3917


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM