一、基本概念
1、引入
傳統的鎖都是有JDK官方提供的鎖的解決方案,也就是說這些鎖只能在一個JVM進程內有效,我們把這種鎖叫做單體應用鎖。但是,在互聯網高速發展的今天,單體應用鎖能夠滿足我們的需求嗎?
新的閱讀體驗:http://www.zhouhong.icu/post/143
本篇文章所有代碼:https://github.com/Tom-shushu/Distributed-system-learning-notes/
2、互聯網系統架構的演進
在互聯網系統發展之初,系統比較簡單,消耗資源小,用戶訪問量也比較少,我們只部署一個Tomcat應用就可以滿足需求。系統架構圖如下:

一個Tomcat可以看作是一個JVM進程,當大量的請求並發到達系統時,所有的請求都落在這唯一的一個Tomcat上,如果某些請求方法是需要加鎖的,比如:秒殺扣減庫存,是可以滿足需求的,這和我們前面章節所講的內容是一樣的。但是隨着訪問量的增加,導致一個Tomcat難以支撐,這時我們就要集群部署Tomcat,使用多個Tomcat共同支撐整個系統

上圖中,我們部署了兩個Tomcat,共同支撐系統。當一個請求到達系統時,首先會經過Nginx,Nginx主要是做負載轉發的,它會根據自己配置的負載均衡策略將請求轉發到其中的一個Tomcat中。當大量的請求並發訪問時,兩個Tomcat共同承擔所有的訪問量,這時,我們同樣在秒殺扣減庫存的場景中,使用單體應用鎖,還能夠滿足要求嗎?
3、單體應用鎖的局限性
如上圖所示,在整個系統架構中,存在兩個Tomcat,每個Tomcat是一個JVM。在進行秒殺業務的時候,由於大家都在搶購秒殺商品,大量的請求同時到達系統,通過Nginx分發到兩個Tomcat上。我們通過一個極端的案例場景,可以更好地理解單體應用鎖的局限性。假如,秒殺商品的數量只有1個,這時,這些大量的請求當中,只有一個請求可以成功的搶到這個商品,這就需要在扣減庫存的方法上加鎖,扣減庫存的動作只能一個一個去執行,而不能同時去執行,如果同時執行,這1個商品可能同時被多個人搶到,從而產生超賣現象。加鎖之后,扣減庫存的動作一個一個去執行,凡是將庫存扣減為負數的,都拋出異常,提示該用戶沒有搶到商品。通過加鎖看似解決了秒殺的問題,但是事實上真的是這樣嗎?
我們看到系統中存在兩個Tomcat,我們加的鎖是JDK提供的鎖,這種鎖只能在一個JVM下起作用,也就是 在一個Tomcat內是沒有問題的。當存在兩個或兩個以上的Tomcat時,大量的並發請求分散到不同的Tomcat上,在每一個Tomcat中都可以防止並發的產生,但是在多個Tomcat之間,每個Tomcat中獲得鎖的這個請求,又產生了並發,從而產生超賣現象。這也就是單體應用鎖的局限性,它只能在一個JVM內加鎖,而不能從這個應用層面去加鎖。
那么這個問題如何解決呢?這就需要使用分布式鎖了,在整個應用層面去加鎖。什么是分布式鎖呢?我們怎么去使用分布式鎖呢?
4、什么是分布式鎖
在說分布式鎖之前,我們看一看單體應用鎖的特點,單體應用鎖是在一個JVM進程內有效,無法跨JVM、跨進程。那么分布式鎖的定義就出來了,分布式鎖就是可以跨越多個JVM、跨越多個進程的鎖,這種鎖就叫做分布式鎖。
5、分布式鎖的設計思路
在上圖中,由於Tomcat是由Java啟動的,所以每個Tomcat可以看成一個JVM,JVM內部的鎖是無法跨越多個進程的。所以,我們要實現分布式鎖,我們只能在這些JVM之外去尋找,通過其他的組件來實現分布式鎖。系統的架構如圖所示:

兩個Tomcat通過第三方的組件實現跨JVM、跨進程的分布式鎖。這就是分布式鎖的解決思路,找到所有JVM可以共同訪問的第三方組件,通過第三方組件實現分布式鎖。
6、目前存在的分布式的方案
分布式鎖都是通過第三方組件來實現的,目前比較流行的分布式鎖的解決方案有:
- 數據庫,通過數據庫可以實現分布式鎖,但是在高並發的情況下對數據庫壓力較大,所以很少使用。
- Redis,借助Redis也可以實現分布式鎖,而且Redis的Java客戶端種類很多,使用的方法也不盡相同。
- Zookeeper,Zookeeper也可以實現分布式鎖,同樣Zookeeper也存在多個Java客戶端,使用方法也不相同。
二、電商平台中針對超賣的解決思路
① 單體架構下針對超賣的解決方案
1、超賣現象一
什么是超賣:某件商品庫存數量10件,結果賣出了15件。
A和B同時下單,同時讀到庫存,同時減庫存,同時更新數據庫,這是庫存減1,結果卻下單了兩件。
解決辦法:
扣減庫存不在程序中進行,同時通過數據庫解決;向數據庫傳遞庫存增量,扣減1個庫存,增量為-1;在數據庫update語句計算庫存,通過update行鎖解決並發問題
2、超賣現象二
使用上述通過數據庫行鎖解決,會出現下面的第二種現象:數據庫的庫存會減為負數。
解決方案一
更新庫存成功后,再次檢索商品庫存,如果商品為負數,拋出異常。
解決方案二
添加鎖、將數據庫庫存校驗和數據庫庫存更新綁定到一起加上鎖,每次只能有一個得到這個鎖,從而避免庫存被減為負數(-1)的情況。
3、具體代碼實現
- 創建數據庫表
CREATE DATABASE /*!32312 IF NOT EXISTS*/`distribute` /*!40100 DEFAULT CHARACTER SET utf8mb4 */; USE `distribute`; DROP TABLE IF EXISTS `order`; CREATE TABLE `order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_status` int(1) NOT NULL, `receiver_name` varchar(255) NOT NULL, `receiver_mobile` varchar(11) NOT NULL, `order_amount` decimal(11,0) NOT NULL, `create_time` time NOT NULL, `create_user` varchar(255) NOT NULL, `update_time` time NOT NULL, `update_user` varchar(255) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `order_item`; CREATE TABLE `order_item` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_id` int(11) NOT NULL, `product_id` int(11) NOT NULL, `purchase_price` decimal(11,0) NOT NULL, `purchase_num` int(3) NOT NULL, `create_time` time NOT NULL, `create_user` varchar(255) NOT NULL, `update_time` time NOT NULL, `update_user` varchar(255) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `product`; CREATE TABLE `product` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `product_name` varchar(255) NOT NULL COMMENT '商品名稱', `price` decimal(11,0) NOT NULL COMMENT '價格', `count` int(5) NOT NULL COMMENT '庫存', `product_desc` varchar(255) NOT NULL COMMENT '描述', `create_time` time NOT NULL COMMENT '創建時間', `create_user` varchar(255) NOT NULL COMMENT '創建人', `update_time` time NOT NULL COMMENT '更新時間', `update_user` varchar(255) NOT NULL COMMENT '更新人', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=100101 DEFAULT CHARSET=utf8mb4; insert into `product`(`id`,`product_name`,`price`,`count`,`product_desc`,`create_time`,`create_user`,`update_time`,`update_user`) values (100100,'測試商品','1',1,'測試商品','18:06:00','周紅','19:19:21','xxx'); /**后續分布式鎖需要用到**/ DROP TABLE IF EXISTS `distribute_lock`; CREATE TABLE `distribute_lock` ( `id` int(11) NOT NULL AUTO_INCREMENT, `business_code` varchar(255) NOT NULL COMMENT '根據業務代碼區分,不同業務使用不同鎖', `business_name` varchar(255) NOT NULL COMMENT '注釋,標記編碼用途', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4; insert into `distribute_lock`(`id`,`business_code`,`business_name`) values (1,'demo','test');
- 訂單創建,庫存減1等主要邏輯代碼(這里使用 ReentrantLock 當然,也可以使用其他鎖 )
// 注意:這邊不能使用注解的方式回滾,不然會在事務提交前下一個線程會進來 // @Transactional(rollbackFor = Exception.class) public Integer createOrder() throws Exception{ Product product = null; lock.lock(); try { // 開啟事務 TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); // 查詢到所要購買的商品 product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null){ platformTransactionManager.rollback(transaction1); throw new Exception("購買商品:"+purchaseProductId+"不存在"); } // 獲取商品當前庫存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"庫存數:"+currentCount); // 校驗庫存 (購買商品數量大於庫存數量,拋出異常) if (purchaseProductNum > currentCount){ platformTransactionManager.rollback(transaction1); throw new Exception("商品"+purchaseProductId+"僅剩"+currentCount+"件,無法購買"); } productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId()); platformTransactionManager.commit(transaction1); }finally { lock.unlock(); } // 創建訂單 TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum))); order.setOrderStatus(1);//待處理 order.setReceiverName("xxx"); order.setReceiverMobile("15287653421"); order.setCreateTime(new Date()); order.setCreateUser("不不不不"); order.setUpdateTime(new Date()); order.setUpdateUser("哈哈哈哈"); orderMapper.insertSelective(order); // 創建訂單明細 OrderItem orderItem = new OrderItem(); orderItem.setOrderId(order.getId()); orderItem.setProductId(product.getId()); orderItem.setPurchasePrice(product.getPrice()); orderItem.setPurchaseNum(purchaseProductNum); orderItem.setCreateUser("不不不"); orderItem.setCreateTime(new Date()); orderItem.setUpdateTime(new Date()); orderItem.setUpdateUser("哈哈哈哈"); orderItemMapper.insertSelective(orderItem); // 事務提交 platformTransactionManager.commit(transaction); return order.getId(); }
- 測試(使用五個線程同時並發的下單)
@Test public void concurrentOrder() throws InterruptedException { Thread.sleep(60000); CountDownLatch cdl = new CountDownLatch(5); CyclicBarrier cyclicBarrier = new CyclicBarrier(5); // 創建5個線程執行下訂單操作 ExecutorService es = Executors.newFixedThreadPool(5); for (int i =0;i<5;i++){ es.execute(()->{ try { // 等5個線程同時達到 await()時再執行創建訂單服務,這時候5個線程會堆積到同一時間執行 cyclicBarrier.await(); Integer orderId = orderService.createOrder(); System.out.println("訂單id:"+orderId); } catch (Exception e) { e.printStackTrace(); }finally { // 每個線程執行完成之后會減一 cdl.countDown(); } }); } cdl.await(); es.shutdown(); }
② 分布式架構下分布式鎖的實現
一、基於數據庫實現分布式鎖
多個進程、多個線程訪問共同組件---數據庫
通過select...for update 訪問同一條數據、for update 鎖定數據
- 在mapper.xml 里面加入如下自定義的SQL
<select id="selectDistributeLock" resultType="com.example.distributelock.model.DistributeLock"> select * from distribute_lock where business_code = #{businessCode,jdbcType=VARCHAR} for update </select>
- 主要的邏輯實現
@RequestMapping("singleLock") /** * 沒有添加 Transactional 注解前,查詢分布式鎖和sleep二者不是原子操作,在獲取到分布式鎖后自動提交事務, * 故不會阻止第二個請求獲取鎖。添加了注解后,在sleep結束前,事務一直未提交,故會等待sleep結束后再行提交事務, * 此時第二個請求才能從數據庫中獲取分布式鎖 */ @Transactional(rollbackFor = Exception.class) public String singleLock() throws Exception { log.info("我進入了方法!"); DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo"); if (distributeLock==null) throw new Exception("分布式鎖找不到"); log.info("我進入了鎖!"); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } return "我已經執行完成!"; }
- 另一個項目和這個相同,只需要更改端口號即可
優點:
- 簡單方便,易於理解,易於操作
缺點:
- 並發量大,對數據庫壓力較大
建議:
- 作為鎖的數據庫與業務數據庫分開
二、基於Redis的SetNX實現分布式鎖
① 獲取鎖的Redis命令
- Set resource_name my_random_value NX PX 30000
- resource_name:資源名稱,可根據不同的業務區分不同的鎖
- my_random_value:隨機值,每個線程的隨機值都不相同,用於釋放鎖時的校驗
- NX: key不存在是設置成功,key存在則設置不成功
- PX:自動失效時間,出現異常情況,鎖可以過期失效
② 實現原理
- 利用NX的原子性,多個線程並發時,只有一個線程可以設置成功
- 設置成功即獲得鎖,可以執行后續的業務處理
- 如果出現異常,過了鎖的有效期,鎖自動釋放。
- 釋放鎖采用了Redis的delete命令
- 釋放鎖時校驗值錢設置的隨機數,相同才能釋放
- 釋放鎖的LUA腳本
if redis.call("get",KEYS[1])==argv[1] then return redis.call("del",KEYS[1]) else return 0 end
③ 為什么要添加LUA腳本校驗:

沒有校驗可能導致鎖的混亂,如上圖所示:A可能釋放掉了B的鎖,會出現問題。
④ Redis分布式鎖關鍵代碼封裝
package com.example.distributelock.lock; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.data.redis.core.types.Expiration; import java.util.Arrays; import java.util.List; import java.util.UUID; @Slf4j public class RedisLock implements AutoCloseable { private RedisTemplate redisTemplate; private String key; private String value; // 過期時間 單位:秒 private int expireTime; public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){ this.redisTemplate = redisTemplate; this.key = key; this.expireTime=expireTime; this.value = UUID.randomUUID().toString(); } /** * 獲取分布式鎖 * @return */ public boolean getLock(){ RedisCallback<Boolean> redisCallback = connection -> { //設置NX RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //設置過期時間 Expiration expiration = Expiration.seconds(expireTime); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); //序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //執行setnx操作 Boolean result = connection.set(redisKey, redisValue, expiration, setOption); return result; }; //獲取分布式鎖 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } // 釋放鎖 public boolean unLock() { String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class); List<String> keys = Arrays.asList(key); Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("釋放鎖的結果:"+result); return result; } @Override public void close() throws Exception { unLock(); } }
⑤ 測試
@RequestMapping("redisLock") public String redisLock(){ log.info("我進入了方法!"); try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){ if (redisLock.getLock()) { log.info("我進入了鎖!!"); Thread.sleep(15000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法執行完成"); return "方法執行完成"; }
⑥ 在項目中使用ESJOB定時任務沒問題,但是如果項目中使用了SpringTask來定時,那么在集群中可能會出現任務重復執行的情況。
解決辦法:使用分布式鎖在定時到達后,在執行任務之前,哪個節點獲取到了鎖,哪個節點就來執行這個任務。

⑦ 代碼實現
public class SchedulerService { @Autowired private RedisTemplate redisTemplate; @Scheduled(cron = "0/5 * * * * ?") public void sendSms(){ try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) { if (redisLock.getLock()){ log.info("每五秒執行這個程序!"); } } catch (Exception e) { e.printStackTrace(); } } }
三、基於Zookeeper實現分布式鎖
zookeeper的觀察器
- 可設置觀察器的3個方法:getData();getChildren();exists();
- 節點數據發生變化,發送給客戶端;
- 觀察器只能監控一次,再監控需重新設置;
實現原理
- 利用zookeeper的瞬時有序節點的特性;
- 多線程並發創建瞬時節點時,得到有序的序列;
- 序列號最小的線程獲得鎖;
- 其他線程監聽自己序號的前一個序號;
- 前一個線程執行完成,刪除自己序號節點;
- 下一個序號的線程得到通知,繼續執行;
- 以此類推,創建節點時,已經確定了線程的執行順序;
代碼實現:
package com.example.distributelock.lock; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; @Slf4j public class ZkLock implements Watcher,AutoCloseable { private ZooKeeper zooKeeper; private String businessName; private String znode; public ZkLock(String connectString,String businessName) throws IOException { this.zooKeeper = new ZooKeeper(connectString,30000,this); this.businessName = businessName; } public boolean getLock() throws KeeperException, InterruptedException { Stat existsNode = zooKeeper.exists("/" + businessName, false); if (existsNode == null){ zooKeeper.create("/" + businessName,businessName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } znode = zooKeeper.create("/" + businessName + "/" + businessName + "_", businessName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); znode = znode.substring(znode.lastIndexOf("/")+1); List<String> childrenNodes = zooKeeper.getChildren("/" + businessName, false); Collections.sort(childrenNodes); String firstNode = childrenNodes.get(0); if (!firstNode.equals(znode)){ String lastNode = firstNode; for (String node:childrenNodes){ if (!znode.equals(node)){ lastNode = node; }else { zooKeeper.exists("/"+businessName+"/"+lastNode,true); break; } } synchronized (this){ wait(); } } return true; } @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted){ synchronized (this){ notify(); } } } @Override public void close() throws Exception { zooKeeper.delete("/"+businessName+"/"+znode,-1); zooKeeper.close(); log.info("我釋放了鎖"); } }
測試:
@RequestMapping("zkLock") public String zkLock(){ log.info("我進入了方法!"); try (ZkLock zkLock = new ZkLock("localhost:2181","order")){ if (zkLock.getLock()) { log.info("我進入了鎖!!"); Thread.sleep(15000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法執行完成"); return "方法執行完成"; }