只做記錄,直接上代碼
父類:
package com.ylcloud.common.lock;

import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * ZooKeeper distributed lock built on ephemeral sequential nodes.
 *
 * <p>Every caller of {@link #lock()} creates an ephemeral sequential child under
 * {@link #lockPath}. The caller whose child sorts first owns the lock; every other caller
 * watches the child immediately in front of its own and re-checks the queue once that child
 * disappears.
 *
 * <p>The lock is <b>not reentrant</b>: a thread that already holds (or is waiting for) this
 * lock instance gets an {@link IllegalStateException} from a nested {@link #lock()} call.
 *
 * @author cjh
 * @date 2018/9/27 11:36
 */
public class ZkLock {

    private static final Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static Integer sessionTimeout = 30000;

    public static Integer connectTimeout = 30000;

    /**
     * Parent znode under which the queue nodes are created.
     */
    public String lockPath;

    /**
     * Most recently watched predecessor node. Written without synchronization by whichever
     * thread waited last; informational only.
     */
    public String beforeNode;

    /**
     * Most recently created queue node. Written without synchronization; informational only.
     */
    public String currentNode;

    /**
     * Most recently created queue node. Written without synchronization; informational only.
     */
    public String threadTag = null;

    public static final ZkClient client =
            new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer());

    /**
     * Queue node owned by the calling thread for THIS lock instance. Kept per instance so that
     * one thread can hold locks on different paths without one overwriting the other's node.
     */
    private final ThreadLocal<String> heldNode = new ThreadLocal<String>();

    public ZkLock() {
    }

    /**
     * Binds this lock to a parent znode path.
     *
     * <p>Existing znodes are deliberately left alone: deleting the path here would remove the
     * queue nodes of every other client that currently holds or waits for the lock.
     *
     * @param code absolute znode path used as the lock root, e.g. {@code "/USER_CODE"}
     */
    public void init(String code) {
        this.lockPath = code;
    }

    /**
     * Blocks until the calling thread owns the lock.
     *
     * <p>Waiting is re-evaluated at least every {@link #sessionTimeout} milliseconds, so a
     * missed watch event only delays acquisition. The method never returns without the lock.
     *
     * @throws IllegalStateException if this thread already holds or awaits this lock instance,
     *                               or if the thread is interrupted while waiting (the
     *                               interrupt flag is restored and the queue node removed)
     */
    public void lock() {
        if (heldNode.get() != null) {
            throw new IllegalStateException(
                    "Lock " + lockPath + " is not reentrant and is already held by this thread");
        }
        if (!client.exists(lockPath)) {
            // createParents=true also tolerates another client creating the path concurrently.
            client.createPersistent(lockPath, true);
        }

        String ownNode = enqueue();
        boolean acquired = false;
        try {
            while (!acquired) {
                List<String> queue = client.getChildren(lockPath);
                Collections.sort(queue);
                int position = queue.indexOf(ownNode.substring(lockPath.length() + 1));
                if (position < 0) {
                    // Our ephemeral node vanished (e.g. the session expired): queue up again.
                    ownNode = enqueue();
                } else if (position == 0) {
                    acquired = true;
                } else {
                    String predecessor = lockPath + '/' + queue.get(position - 1);
                    beforeNode = predecessor;
                    awaitDeletion(predecessor);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for lock " + lockPath, e);
        } finally {
            if (!acquired) {
                // Do not leave a queue node behind that would block every later caller.
                discardOwnNodeQuietly();
            }
        }
    }

    /**
     * Releases the lock held by the calling thread by deleting its queue node.
     *
     * <p>Calling this from a thread that does not hold the lock is a no-op, so it is safe to
     * invoke from a {@code finally} block even when {@link #lock()} failed.
     */
    public void unlock() {
        String node = heldNode.get();
        if (node == null) {
            logger.debug("unlock() ignored: current thread holds no node under {}", lockPath);
            return;
        }
        heldNode.remove();
        boolean deleted = client.delete(node);
        logger.info("released {} (deleted={})", node, deleted);
    }

    /**
     * Creates this thread's ephemeral sequential queue node and records it.
     */
    private String enqueue() {
        String node = client.createEphemeralSequential(lockPath + '/', "");
        heldNode.set(node);
        threadTag = node;
        currentNode = node;
        return node;
    }

    /**
     * Waits until {@code path} is deleted or {@link #sessionTimeout} elapses, whichever is first.
     */
    private void awaitDeletion(String path) throws InterruptedException {
        final CountDownLatch deleted = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                deleted.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // Data changes are irrelevant; only deletion releases the waiter.
            }
        };
        client.subscribeDataChanges(path, listener);
        try {
            // The node may have been removed before the listener was registered.
            if (client.exists(path)) {
                deleted.await(sessionTimeout, TimeUnit.MILLISECONDS);
            }
        } finally {
            client.unsubscribeDataChanges(path, listener);
        }
    }

    /**
     * Removes this thread's queue node after a failed acquisition without masking the failure.
     */
    private void discardOwnNodeQuietly() {
        String node = heldNode.get();
        heldNode.remove();
        if (node == null) {
            return;
        }
        try {
            client.delete(node);
        } catch (RuntimeException e) {
            logger.warn("could not remove queue node " + node, e);
        }
    }
}
子類
package com.ylcloud.common.lock.ext;

import com.ylcloud.common.lock.ZkLock;

/**
 * User-code lock: a {@link ZkLock} initialised with the {@code /USER_CODE} path.
 *
 * @author cjh
 * @date 2018/10/10 14:47
 */
public class ZkLockUserCode extends ZkLock {

    /** Path handed to {@code init} for this lock. */
    private static final String USER_CODE_PATH = "/USER_CODE";

    public ZkLockUserCode() {
        super.init(USER_CODE_PATH);
    }
}
使用示例:
private ZkLock zkLock = new ZkLockUserCode();

/**
 * Example of guarding a business operation with the lock.
 */
public void addUser() {
    // Acquire before entering try: if lock() itself fails, unlock() must not run.
    zkLock.lock();
    try {
        // business logic goes here
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is kept.
        logger.error("addUser failed", e);
    } finally {
        zkLock.unlock();
    }
}
注意:unlock 必須寫在 finally 里面,否則一旦業務代碼拋出異常而沒有解鎖,后續請求者就會被阻塞,至少要等待一個 sessionTimeout。
題外話:ZooKeeper 在 Linux 上的啟動命令為 ./zkServer.sh start
自己實現的版本有很多細節沒考慮到,導致在高並發的項目中出現了問題,於是我改用 Curator 框架來操作 ZooKeeper 實現分布式鎖。Maven 依賴如下:
<!-- Maven snippet for the Curator recipes dependency. The curator.version element is the property referenced below as ${curator.version}; presumably it goes under <properties> and the <dependency> under <dependencies> in pom.xml (confirm against the project's pom). -->
<curator.version>4.0.0</curator.version> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency>
鎖實現代碼:
package com.ylcloud.common.lock;

import com.ylcloud.common.lock.ext.ZkLockRoleCode;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * ZooKeeper distributed lock backed by Curator's {@link InterProcessMutex}.
 *
 * <p>All instances share one {@link CuratorFramework} client, started in the static
 * initializer with the {@link #NAME_SPACE} namespace.
 *
 * @author cjh
 * @date 2018/9/27 11:36
 */
public class ZkLock {

    private static final Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static String NAME_SPACE = "YL_CLOUD";

    public static Integer sessionTimeout = 15000;

    public static Integer connectTimeout = 30000;

    public static CuratorFramework client = null;

    /**
     * Cache of per-tag locks handed out by {@link #getTagLock(String, String)}.
     * Entries are never removed, so the map grows with the number of distinct tags.
     */
    private static final ConcurrentMap<String, ZkLock> tagLocks = new ConcurrentHashMap<String, ZkLock>();

    public ZkLock() {
    }

    public InterProcessMutex mutex = null;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ZOOKEEPER_IP_PORT)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectTimeout)
                .retryPolicy(retryPolicy)
                .namespace(NAME_SPACE)
                .build();
        client.start();
    }

    /**
     * Creates the mutex for the given lock path on the shared client.
     *
     * @param code lock path passed to {@link InterProcessMutex}
     */
    public void init(String code) {
        this.mutex = new InterProcessMutex(client, code);
    }

    /**
     * Returns the lock for {@code prefix + "/" + code}, creating and caching it on first use.
     * The same instance is returned to every caller asking for the same tag.
     *
     * <p>NOTE(review): the resulting path is handed to {@link InterProcessMutex} unchanged, so
     * {@code prefix} presumably has to start with {@code "/"}; confirm against the callers.
     *
     * @param prefix category marker
     * @param code   primary key within that category
     * @return the shared lock for the tag
     */
    public static ZkLock getTagLock(String prefix, String code) {
        String tag = prefix + "/" + code;
        ZkLock cached = tagLocks.get(tag);
        if (cached != null) {
            return cached;
        }
        ZkLock created = new ZkLock();
        created.init(tag);
        // If another thread registered the tag first, its instance wins and ours is dropped.
        ZkLock raced = tagLocks.putIfAbsent(tag, created);
        return raced != null ? raced : created;
    }

    /**
     * Blocks until the mutex is acquired.
     *
     * @throws IllegalStateException if acquisition fails or the thread is interrupted; the
     *                               caller must not enter the critical section in that case
     */
    public void lock() {
        try {
            this.mutex.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("加鎖失敗", e);
            throw new IllegalStateException("Interrupted while acquiring ZooKeeper lock", e);
        } catch (Exception e) {
            logger.error("加鎖失敗", e);
            throw new IllegalStateException("Failed to acquire ZooKeeper lock", e);
        }
    }

    /**
     * Releases the mutex. Failures are logged, not thrown, because this method is meant to be
     * called from {@code finally} blocks where a throw would mask the original exception.
     */
    public void unlock() {
        try {
            this.mutex.release();
        } catch (Exception e) {
            logger.error("鎖釋放失敗", e);
        }
    }
}
調用代碼(不變)
轉載請注明博客出處:http://www.cnblogs.com/cjh-notes/