問題
(1)什么是分布式鎖?
(2)為什么需要分布式鎖?
(3)mysql如何實現分布式鎖?
(4)mysql分布式鎖的優點和缺點?
簡介
隨着並發量的不斷增加,單機的服務遲早要向多節點或者微服務進化,這時候原來單機模式下使用的synchronized或者ReentrantLock將不再適用,我們迫切地需要一種分布式環境下保證線程安全的解決方案,今天我們一起來學習一下mysql分布式鎖如何實現分布式線程安全。
基礎知識
mysql中提供了兩個函數——get_lock('key', timeout)
和release_lock('key')
——來實現分布式鎖,可以根據key
來加鎖,這是一個字符串,可以設置超時時間(單位:秒),當調用release_lock('key')
或者客戶端斷線
的時候釋放鎖。
它們的使用方法如下:
mysql> select get_lock('user_1', 10);
-> 1
mysql> select release_lock('user_1');
-> 1
get_lock('user_1', 10)
如果10秒之內獲取到鎖則返回1,否則返回0;
release_lock('user_1')
如果該鎖是當前客戶端持有的則返回1,如果該鎖被其它客戶端持有着則返回0,如果該鎖沒有被任何客戶端持有則返回null;
多客戶端案例
為了便於舉例【本篇文章由“彤哥讀源碼”原創,請支持原創,謝謝!】,這里的超時時間全部設置為0,也就是立即返回。
時刻 | 客戶端A | 客戶端B |
---|---|---|
1 | get_lock('user_1', 0) -> 1 | - |
2 | - | get_lock('user_1', 0) -> 0 |
3 | - | release_lock('user_1', 0) -> 0 |
4 | release_lock('user_1', 0) -> 1 | - |
5 | release_lock('user_2', 0) -> null | - |
6 | - | get_lock('user_1', 0) -> 1 |
7 | - | release_lock('user_1', 0) -> 1 |
Java實現
為了方便快速實現,這里使用 springboot2.1 + mybatis 實現,並且省略spring的配置,只列舉主要的幾個類。
定義Locker接口
接口中只有一個方法,入參1為加鎖的key,入參2為執行的命令。
public interface Locker {
void lock(String key, Runnable command);
}
mysql分布式鎖實現
mysql的實現中要注意以下兩點:
(1)加鎖、釋放鎖必須在同一個session(同一個客戶端)中,所以這里不能使用Mapper接口的方式調用,因為Mapper接口有可能會導致不在同一個session。
(2)可重入性是通過ThreadLocal保證的;
@Slf4j
@Component
public class MysqlLocker implements Locker {
private static final ThreadLocal<SqlSessionWrapper> localSession = new ThreadLocal<>();
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Override
public void lock(String key, Runnable command) {
// 加鎖、釋放鎖必須使用同一個session
SqlSessionWrapper sqlSessionWrapper = localSession.get();
if (sqlSessionWrapper == null) {
// 第一次獲取鎖
localSession.set(new SqlSessionWrapper(sqlSessionFactory.openSession()));
}
try {
// 【本篇文章由“彤哥讀源碼”原創,請支持原創,謝謝!】
// -1表示沒獲取到鎖一直等待
if (getLock(key, -1)) {
command.run();
}
} catch (Exception e) {
log.error("lock error", e);
} finally {
releaseLock(key);
}
}
private boolean getLock(String key, long timeout) {
Map<String, Object> param = new HashMap<>();
param.put("key", key);
param.put("timeout", timeout);
SqlSessionWrapper sqlSessionWrapper = localSession.get();
Integer result = sqlSessionWrapper.sqlSession.selectOne("LockerMapper.getLock", param);
if (result != null && result.intValue() == 1) {
// 獲取到了鎖,state加1
sqlSessionWrapper.state++;
return true;
}
return false;
}
private boolean releaseLock(String key) {
SqlSessionWrapper sqlSessionWrapper = localSession.get();
Integer result = sqlSessionWrapper.sqlSession.selectOne("LockerMapper.releaseLock", key);
if (result != null && result.intValue() == 1) {
// 釋放鎖成功,state減1
sqlSessionWrapper.state--;
// 當state減為0的時候說明當前線程獲取的鎖全部釋放了,則關閉session並從ThreadLocal中移除
if (sqlSessionWrapper.state == 0) {
sqlSessionWrapper.sqlSession.close();
localSession.remove();
}
return true;
}
return false;
}
private static class SqlSessionWrapper {
int state;
SqlSession sqlSession;
public SqlSessionWrapper(SqlSession sqlSession) {
this.state = 0;
this.sqlSession = sqlSession;
}
}
}
LockerMapper.xml
定義get_lock()、release_lock()的語句。
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="LockerMapper">
<select id="getLock" resultType="integer">
select get_lock(#{key}, #{timeout});
</select>
<select id="releaseLock" resultType="integer">
select release_lock(#{key})
</select>
</mapper>
測試類
這里啟動1000個線程,每個線程打印一句話並睡眠2秒鍾。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MysqlLockerTest {
@Autowired
private Locker locker;
@Test
public void testMysqlLocker() throws IOException {
for (int i = 0; i < 1000; i++) {
// 多節點測試
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
locker.lock("lock", ()-> {
// 可重入性測試
locker.lock("lock", ()-> {
System.out.println(String.format("time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
});
}, "Thread-"+i).start();
}
System.in.read();
}
}
運行結果
查看運行結果發現每隔2秒打印一個線程的信息,說明這個鎖是有效的,至於分布式環境下面的驗證也很簡單,起多個MysqlLockerTest實例即可。
time: 1568715905952, threadName: Thread-3
time: 1568715907955, threadName: Thread-4
time: 1568715909966, threadName: Thread-8
time: 1568715911967, threadName: Thread-0
time: 1568715913969, threadName: Thread-1
time: 1568715915972, threadName: Thread-9
time: 1568715917975, threadName: Thread-6
time: 1568715919997, threadName: Thread-5
time: 1568715921999, threadName: Thread-7
time: 1568715924001, threadName: Thread-2
總結
(1)分布式環境下需要使用分布式鎖,單機的鎖將無法保證線程安全;
(2)mysql分布式鎖是基於get_lock('key', timeout)
和release_lock('key')
兩個函數實現的;
(3)mysql分布式鎖是可重入鎖;
彩蛋
使用mysql分布式鎖需要注意些什么呢?
答:必須保證多個服務節點使用的是同一個mysql庫【本篇文章由“彤哥讀源碼”原創,請支持原創,謝謝!】。
mysql分布式鎖具有哪些優點?
答:1)方便快捷,因為基本每個服務都會連接數據庫,但是不是每個服務都會使用redis或者zookeeper;
2)如果客戶端斷線了會自動釋放鎖,不會造成鎖一直被占用;
3)mysql分布式鎖是可重入鎖,對於舊代碼的改造成本低;
mysql分布式鎖具有哪些缺點?
答:1)加鎖直接打到數據庫,增加了數據庫的壓力;
2)加鎖的線程會占用一個session,也就是一個連接數,如果並發量大可能會導致正常執行的sql語句獲取不到連接;
3)服務拆分后如果每個服務使用自己的數據庫,則不合適;
4)相對於redis或者zookeeper分布式鎖,效率相對要低一些;
歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。