一、簡介
鎖的概念,在Java日常開發和面試中,都是個很重要的知識點。鎖能很好的控制生產數據的安全性,比如商品的數量超賣問題等。傳統的做法中,可以直接利用數據庫鎖(行鎖或者表鎖)來進行數據訪問控制。隨着請求量逐步變多的情況下,將壓力懟到數據庫上會對其性能產生極大影響。這時候,單體應用中可以利用JVM鎖,在程序層面進行訪問的控制,將壓力前移,對數據庫友好。當請求量再進一步變多,這時候一般會考慮集群分布式去處理,不斷的加機器來抗壓。這時候,JVM鎖就不能很好的控制壓力了,同一時刻還是會有大量請求懟到數據庫上,這時就需要提升為分布式鎖去控制了,將壓力繼續停留在程序層面。
Java的面向接口編程,可以很好很快的去切換實現而不需要動業務代碼部分。下面,基於Lock接口去使用鎖。
zookeeper的集群搭建:https://www.cnblogs.com/eric-fang/p/9283904.html
二、JVM鎖
基於ReentrantLock實現鎖控制,業務控制層service部分代碼如下,用 lock 鎖去控制並發訪問
package com.cfang.service; import java.sql.Time; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; import com.cfang.dao.ProductDao; import lombok.extern.slf4j.Slf4j; @Service @Slf4j @Scope("prototype") public class ProductWithLockService { private Lock lock = new ReentrantLock(); @Autowired private ProductDao productDao; @Transactional public boolean buy(String userName, String productname, int number) { boolean result = false; try { lock.lock(); // TimeUnit.SECONDS.sleep(1); log.info("用戶{}欲購買{}個{}", userName, number, productname); int stock = productDao.getStock(productname); log.info("{} 查詢數量{}...", userName, stock); if(stock < number) { log.warn("庫存不足..."); return false; } result = productDao.buy(userName, productname, number); } catch (Exception e) { } finally { log.info("{} 釋放鎖...", userName); lock.unlock(); } log.info("{}購買結果,{}",userName, result); return result; } }
在單體應用中,這樣子使用是可以的,但是當應用部署多套的時候,那么,就不能很好的保障並發控制了,同一時刻的請求可能會大量打到數據庫上。所以,這就引入下面的分布式鎖去控制了。
三、基於ZooKeeper的分布式鎖
首先,鎖獲取釋放的工具類:
package com.cfang.zkLockUtil; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.commons.lang3.StringUtils; import com.cfang.zkClient.MyZkSerializer; import lombok.extern.slf4j.Slf4j; @Slf4j public class ZkLockUtil implements Lock{ private String znode; private ZkClient zkClient; public ZkLockUtil(String znode) { if(StringUtils.isBlank(znode)) { throw new IllegalArgumentException("鎖節點znode不能為空字符串"); } this.znode = znode; this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183"); this.zkClient.setZkSerializer(new MyZkSerializer()); } @Override public void lock() { if(!tryLock()) { //搶鎖失敗 // 阻塞等待鎖節點的釋放 waitLock(); //遞歸調用,重新嘗試去搶占鎖 lock(); } } private void waitLock() { CountDownLatch latch = new CountDownLatch(1); // 注冊監聽znode鎖節點變化,當刪除的時候,說明鎖被釋放 IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { log.info("znode節點被刪除,鎖釋放..."); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; this.zkClient.subscribeDataChanges(this.znode, listener); try { // 阻塞等待鎖znode節點的刪除釋放 if(this.zkClient.exists(znode)) { latch.await(); } } catch (Exception e) { } //取消znode節點監聽 this.zkClient.unsubscribeDataChanges(this.znode, listener); } @Override public boolean tryLock() { boolean result = false; try { this.zkClient.createEphemeral(znode); //創建臨時節點 result = true; } catch (ZkNodeExistsException e) { log.warn("鎖節點znode已存在,搶占失敗..."); result = false; } catch (Exception e) { log.warn("創建鎖節點znode異常,{}...", e.getMessage()); } return result; } @Override public void unlock() { zkClient.delete(znode); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
業務控制service中,就是將基本的JVM鎖的service中,Lock的實現更換即可:
private Lock lock = new ZkLockUtil("/p1node");
當程序運行中,所有的請求會去爭搶創建zk節點,誰創建成功,則就獲得鎖資源,繼續執行業務代碼。其他所有線程基於遞歸等待,等待zk節點的刪除,然后再去嘗試爭搶創建。達到控制並發的目的。
但是,這種但是有個不好的地方,也就是,當一個鎖釋放后,所有的線程都會一下子全去爭搶,每次都是輪回這樣哄搶的過程,會有一定的壓力,也不必如此。所以,下面基於zk永久節點下臨時順序節點做點改善,每個線程節點,只需要關注前面一個節點變化即可,不需要造成哄搶事件。
四、ZooKeeper的分布式鎖提高版
鎖獲取釋放的工具類:
package com.cfang.zkLockUtil; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.lang3.StringUtils; import com.cfang.zkClient.MyZkSerializer; import lombok.extern.slf4j.Slf4j; @Slf4j public class ZKLockImproveUtil implements Lock{ private String znode; private ZkClient zkClient; private ThreadLocal<String> currentNode = new ThreadLocal<String>(); //當前節點 private ThreadLocal<String> beforeNode = new ThreadLocal<String>(); //前一個節點 public ZKLockImproveUtil(String znode) { if(StringUtils.isBlank(znode)) { throw new IllegalArgumentException("鎖節點znode不能為空字符串"); } this.znode = znode; this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183"); this.zkClient.setZkSerializer(new MyZkSerializer()); try { if(!this.zkClient.exists(znode)) { this.zkClient.createPersistent(znode, true); // true是否創建層級目錄 } } catch (Exception e) { } } @Override public void lock() { if(!tryLock()) { waitLock(); lock(); } } private void waitLock() { CountDownLatch latch = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { log.info("{}節點刪除,鎖釋放...", dataPath); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; this.zkClient.subscribeDataChanges(beforeNode.get(), listener); try { if(this.zkClient.exists(beforeNode.get())) { latch.await(); } } catch (Exception e) { } this.zkClient.unsubscribeDataChanges(beforeNode.get(), listener); } @Override public boolean tryLock() { boolean result = false; // 創建順序臨時節點 if(null == currentNode.get() || !this.zkClient.exists(currentNode.get())) { String enode = this.zkClient.createEphemeralSequential(znode + "/", "zk-locked"); this.currentNode.set(enode); } // 獲取znode節點下的所有子節點 List<String> list = this.zkClient.getChildren(znode); Collections.sort(list); /** * 如果當前節點是第一個的話,則是為獲取鎖,繼續執行 * 不是頭結點的話,則去查詢其前面一個節點,然后准備監聽前一個節點的刪除釋放操作 */ if(currentNode.get().equals(this.znode + "/" + list.get(0))) { log.info("{}節點為頭結點,獲得鎖...", currentNode.get()); result = true; } else { int currentIndex = list.indexOf(currentNode.get().substring(this.znode.length() + 1)); String bnode = this.znode + "/" + list.get(currentIndex - 1); this.beforeNode.set(bnode); } return result; } @Override public void unlock() { if(null != this.currentNode) { this.zkClient.delete(currentNode.get()); this.currentNode.set(null); } } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
service中更換實現:
private Lock lock = new ZKLockImproveUtil("/pnode");
五、小結
主要是學習測試使用,並未考慮到生產實際的問題,比如 如果業務處理中假死狀態,導致zk不釋放鎖,那么就會導致死鎖問題(可以對鎖節點來個有效期處理)。
上述為部分代碼片段,整體工程可以在github上獲取,地址:https://github.com/qiuhan00/zkLock