分布式鎖的實現方式及原理


* 在集群等多服務器中經常使用到同步處理一下業務,這是普通的事務是滿足不了業務需求,需要分布式鎖
*
* 分布式鎖的常用3種實現:
*        0.數據庫樂觀鎖實現
*        1.Redis實現  --- 使用redis的setnx()、get()、getset()方法,用於分布式鎖,解決死鎖問題
 *                2、zookeeper實現
 
Zookeeper實現
*           參考:http://surlymo.iteye.com/blog/2082684
*              http://www.jb51.net/article/103617.htm
1、實現原理:
基於zookeeper瞬時有序節點實現的分布式鎖,其主要邏輯如下(該圖來自於IBM網站)。大致思想即為:每個客戶端對某個功能加鎖時,在zookeeper上的與該功能對應的指定節點的目錄下,生成一個唯一的瞬時有序節點。判斷是否獲取鎖的方式很簡單,只需要判斷有序節點中序號最小的一個。當釋放鎖的時候,只需將這個瞬時節點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題。
2、優點
鎖安全性高,zk可持久化
3、缺點
性能開銷比較高。因為其需要動態產生、銷毀瞬時節點來實現鎖功能。
4、實現
可以直接采用zookeeper第三方庫curator即可方便地實現分布式鎖
 
 Redis實現分布式鎖的原理:
*  1.通過setnx(lock_timeout)實現,如果設置了鎖返回1, 已經有值沒有設置成功返回0
*  2.死鎖問題:通過實踐來判斷是否過期,如果已經過期,獲取到過期時間get(lockKey),然后getset(lock_timeout)判斷是否和get相同,
*   相同則證明已經加鎖成功,因為可能導致多線程同時執行getset(lock_timeout)方法,這可能導致多線程都只需getset后,對於判斷加鎖成功的線程,
*   再加expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS)過期時間,防止多個線程同時疊加時間,導致鎖時效時間翻倍
*  3.針對集群服務器時間不一致問題,可以調用redis的 time ()獲取當前時間
 

2.Redis分分布式鎖的代碼實現

  1.定義鎖接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.jay.service.redis;
  
/**
  * Redis分布式鎖接口
  * Created by hetiewei on 2017/4/7.
  */
public interface RedisDistributionLock {
   /**
    * 加鎖成功,返回加鎖時間
    * @param lockKey
    * @param threadName
    * @return
    */
   public long lock(String lockKey, String threadName);
  
   /**
    * 解鎖, 需要更新加鎖時間,判斷是否有權限
    * @param lockKey
    * @param lockValue
    * @param threadName
    */
   public void unlock(String lockKey, long lockValue, String threadName);
  
   /**
    * 多服務器集群,使用下面的方法,代替System.currentTimeMillis(),獲取redis時間,避免多服務的時間不一致問題!!!
    * @return
    */
   public long currtTimeForRedis();
}

   2.定義鎖實現

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package com.jay.service.redis.impl;
  
import com.jay.service.redis.RedisDistributionLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
  
import java.util.concurrent.TimeUnit;
  
/**
  * Created by hetiewei on 2017/4/7.
  */
public class RedisLockImpl implements RedisDistributionLock {
  
   //加鎖超時時間,單位毫秒, 即:加鎖時間內執行完操作,如果未完成會有並發現象
   private static final long LOCK_TIMEOUT = 5 * 1000 ;
  
   private static final Logger LOG = LoggerFactory.getLogger(RedisLockImpl. class );
  
   private StringRedisTemplate redisTemplate;
  
   public RedisLockImpl(StringRedisTemplate redisTemplate) {
     this .redisTemplate = redisTemplate;
   }
  
   /**
    * 加鎖
    * 取到鎖加鎖,取不到鎖一直等待知道獲得鎖
    * @param lockKey
    * @param threadName
    * @return
    */
   @Override
   public synchronized long lock(String lockKey, String threadName) {
     LOG.info(threadName+ "開始執行加鎖" );
     while ( true ){ //循環獲取鎖
       //鎖時間
       Long lock_timeout = currtTimeForRedis()+ LOCK_TIMEOUT + 1 ;
       if (redisTemplate.execute( new RedisCallback<Boolean>() {
         @Override
         public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
           //定義序列化方式
           RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
           byte [] value = serializer.serialize(lock_timeout.toString());
           boolean flag = redisConnection.setNX(lockKey.getBytes(), value);
           return flag;
         }
       })){
         //如果加鎖成功
         LOG.info(threadName + "加鎖成功 ++++ 111111" );
         //設置超時時間,釋放內存
         redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
         return lock_timeout;
       } else {
         //獲取redis里面的時間
         String result = redisTemplate.opsForValue().get(lockKey);
         Long currt_lock_timeout_str = result== null ? null :Long.parseLong(result);
         //鎖已經失效
         if (currt_lock_timeout_str != null && currt_lock_timeout_str < System.currentTimeMillis()){
           //判斷是否為空,不為空時,說明已經失效,如果被其他線程設置了值,則第二個條件判斷無法執行
           //獲取上一個鎖到期時間,並設置現在的鎖到期時間
           Long old_lock_timeout_Str = Long.valueOf(redisTemplate.opsForValue().getAndSet(lockKey, lock_timeout.toString()));
           if (old_lock_timeout_Str != null && old_lock_timeout_Str.equals(currt_lock_timeout_str)){
             //多線程運行時,多個線程簽好都到了這里,但只有一個線程的設置值和當前值相同,它才有權利獲取鎖
             LOG.info(threadName + "加鎖成功 ++++ 22222" );
             //設置超時間,釋放內存
             redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
  
             //返回加鎖時間
             return lock_timeout;
           }
         }
       }
  
       try {
         LOG.info(threadName + "等待加鎖, 睡眠100毫秒" );
//        TimeUnit.MILLISECONDS.sleep(100);
         TimeUnit.MILLISECONDS.sleep( 200 );
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }
   }
  
   /**
    * 解鎖
    * @param lockKey
    * @param lockValue
    * @param threadName
    */
   @Override
   public synchronized void unlock(String lockKey, long lockValue, String threadName) {
     LOG.info(threadName + "執行解鎖==========" ); //正常直接刪除 如果異常關閉判斷加鎖會判斷過期時間
     //獲取redis中設置的時間
     String result = redisTemplate.opsForValue().get(lockKey);
     Long currt_lock_timeout_str = result == null ? null :Long.valueOf(result);
  
     //如果是加鎖者,則刪除鎖, 如果不是,則等待自動過期,重新競爭加鎖
     if (currt_lock_timeout_str != null && currt_lock_timeout_str == lockValue){
       redisTemplate.delete(lockKey);
       LOG.info(threadName + "解鎖成功------------------" );
     }
   }
  
   /**
    * 多服務器集群,使用下面的方法,代替System.currentTimeMillis(),獲取redis時間,避免多服務的時間不一致問題!!!
    * @return
    */
   @Override
   public long currtTimeForRedis(){
     return redisTemplate.execute( new RedisCallback<Long>() {
       @Override
       public Long doInRedis(RedisConnection redisConnection) throws DataAccessException {
         return redisConnection.time();
       }
     });
   }
  
}

  3.分布式鎖驗證     

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@RestController
@RequestMapping ( "/distribution/redis" )
public class RedisLockController {
  
   private static final String LOCK_NO = "redis_distribution_lock_no_" ;
  
   private static int i = 0 ;
  
   private ExecutorService service;
  
   @Autowired
   private StringRedisTemplate redisTemplate;
  
   /**
    * 模擬1000個線程同時執行業務,修改資源
    *
    * 使用線程池定義了20個線程
    *
    */
   @GetMapping ( "lock1" )
   public void testRedisDistributionLock1(){
  
     service = Executors.newFixedThreadPool( 20 );
  
     for ( int i= 0 ;i< 1000 ;i++){
       service.execute( new Runnable() {
         @Override
         public void run() {
           task(Thread.currentThread().getName());
         }
       });
     }
  
   }
  
   @GetMapping ( "/{key}" )
   public String getValue( @PathVariable ( "key" ) String key){
     Serializable result = redisTemplate.opsForValue().get(key);
     return result.toString();
   }
  
   private void task(String name) {
//    System.out.println(name + "任務執行中"+(i++));
  
     //創建一個redis分布式鎖
     RedisLockImpl redisLock = new RedisLockImpl(redisTemplate);
     //加鎖時間
     Long lockTime;
     if ((lockTime = redisLock.lock((LOCK_NO+ 1 )+ "" , name))!= null ){
       //開始執行任務
       System.out.println(name + "任務執行中" +(i++));
       //任務執行完畢 關閉鎖
       redisLock.unlock((LOCK_NO+ 1 )+ "" , lockTime, name);
     }
  
   }
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM