在分布式系統中,分布式鎖是一個很常見的技術。即有很多個進程同時訪問同一個共享資源沒有同步訪問,資源的載體可能是傳統關系型數據庫或者NoSQL。
如果是在單機環境中,可以使用ReentrantLock或者synchronized代碼塊來實現,然而這些在分布式環境下卻不能滿足要求。
例如有這樣的一個場景:
多台服務器同時從MQ消息隊列接消息,接到消息后的處理邏輯是:根據消息中的業務id去查詢數據庫,如果數據庫中不存在,則插入,反之則更新。
這樣就會出現這樣一個問題:服務器A和服務器B中接收到的消息中業務id是一樣的,2台服務器同時到數據庫中查詢,都不存在,則都會進行插入操作,最后的結果是表中有2條一樣的記錄(或者有主鍵約束的話報主建不唯一異常插入失敗),顯然這不是我能所期望的!
分布式環境中多台服務器相當於多個獨立的jvm環境,有多個進程(注意不是線程)對同一資源進行操作,jdk中用作同步的Lock或者synchronized就會失效。
這里就需要用到分布式鎖,目前實現分布式鎖的主要技術大概有:
-
基於Redis實現(或者其他緩存memcached,tair),主要基於redis的setnx(set if not exist)命令;
-
基於Zookeeper實現;
-
基於數據庫表version字段實現,樂觀鎖,兩個線程可以同時讀取到原有的version值,但是最終只有一個可以完成操作;
下面是redis實現分布式鎖的方法,主要用到了Redisson,它是redis官方推薦的分布式java客戶端,和Jedis相比它實現了分布式和可擴展的java數據結構,
這里有一篇對比文章:Jedis與Redisson選型對比
Redisson的介紹可以到:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0 這里去了解!
具體實現代碼:
import org.redisson.Config; import org.redisson.Redisson; import org.redisson.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author created by dehong * @date 2018年3月4日 下午4:12:42 * */ public class RedissonManager { private static final Logger logger = LoggerFactory.getLogger(RedissonManager.class); private static RedissonClient redisson; //private static Config config; /** * 通過配置文件初始化 */ static{ logger.info("init redisson ..."); Config config = null; try { config = Config.fromYAML(RedissonManager.class.getClassLoader().getResourceAsStream("redisson.yaml")); //config = Config.fromJSON(RedissonManager.class.getClassLoader().getResourceAsStream("redisson.json")); } catch (Exception e) { logger.error("RedissonClient init failed !", e); } redisson = Redisson.create(config); } /** * 獲取Redisson的實例對象 * @return */ public static Redisson getRedisson(){ if(redisson == null){ logger.info("RedissonClient init failed !"); return null; } return (Redisson) redisson; } /** * 測試Redisson是否正常 * @param args */ public static void main(String[] args) { Redisson redisson = RedissonManager.getRedisson(); System.out.println("redisson = " + redisson); } }
上面的RedissonClient初始化是用配置文件yaml的形式,也可以用json,可以去參考官方文檔 https://github.com/redisson/redisson
yaml示例(單機redis節點模式)
--- singleServerConfig: idleConnectionTimeout: 10000 pingTimeout: 1000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 reconnectionTimeout: 3000 failedAttempts: 3 password: null subscriptionsPerConnection: 5 clientName: null address: - redis://192.168.43.84:6379 subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 connectionMinimumIdleSize: 32 connectionPoolSize: 64 database: 0 dnsMonitoring: false dnsMonitoringInterval: 5000
工具類:
import java.util.concurrent.TimeUnit; import org.redisson.Redisson; import org.redisson.core.RLock; /** * * @author created by dehong * @date 2018年3月4日 下午4:30:11 * Redisson分布式鎖 工具類 */ public class RedissonLockUtil { private static Redisson redisson = RedissonManager.getRedisson(); private static final String LOCK_FLAG = "redissonlock_"; /** * 根據name對進行上鎖操作,redissonLock 阻塞事的,采用的機制發布/訂閱 * @param key */ public static void lock(String key){ String lockKey = LOCK_FLAG + key; RLock lock = redisson.getLock(lockKey); //lock提供帶timeout參數,timeout結束強制解鎖,防止死鎖 :1分鍾 lock.lock(1, TimeUnit.MINUTES); } /** * 根據name對進行解鎖操作 * @param key */ public static void unlock(String key){ String lockKey = LOCK_FLAG + key; RLock lock = redisson.getLock(lockKey); //如果鎖被當前線程持有,則釋放 if(lock.isHeldByCurrentThread()){ lock.unlock(); } } }
注意:上面的unlock()中不加isHeldByCurrentThread()條件的話,在執行的task的時間超過timeout時,此時如果unlock,其實redisson已經主動unlock了,就會出現IllegalMonitorStateException 異常
調用:
try { //獲取鎖,這里的key可以上面場景中的業務id RedissonLockUtil.lock(key); //具體要執行的代碼.... } catch (Exception e) { e.printStackTrace(); }finally { //釋放鎖 RedissonLockUtil.unlock(key); }