今天繼續分析海豚調度的源碼
上回分析的是dolphinscheduler-service模塊zookeeper相關的代碼
這回分析是dolphinscheduler-server模塊zookeeper相關的代碼
ZkMasterClient master服務zk客戶端類
類繼承的關系如下:
這個類的方法如下:
方法介紹:
- start() 根據路徑dolphinscheduler/lock/failover/master 創建一個分布式鎖,並進行初始化,檢查是否有master節點競爭鎖,確保只有一個主master,如果只有一個master節點,那么無法進行master服務的故障轉移
- dataChange() 變更zk節點
- removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zookeeper 節點,並在/dead路徑添加節點,並會判斷是否需要容錯
- handleDeadServer() 父類方法,就是處理宕機服務的zookeeper路徑,將獲取節點刪除,添加/dead路徑數據
- failoverServerWhenDown() 當服務宕機后,轉移服務,分為master服務和server服務
- checkTaskInstanceNeedFailover()
- failoverWorker() 將worker上的task任務進行故障轉移
- 如果是yarn任務,干掉yarn任務
- 將任務狀態變更為需要故障轉移
- 當工作節點全部為null時,將所有任務進行故障轉移
zk分布式鎖獲取代碼如下:
public void start() {
//Curator是zk的一個客戶端框架,其中分裝了分布式公平可重入互斥鎖,最為常見是InterProcessMutex
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
///根據這個路徑dolphinscheduler/lock/failover/master 創建一個分布式鎖
String znodeLock = getMasterStartUpLockPath();
//InterProcessMutex的構造方法,需要一個客戶端和路徑
mutex = new InterProcessMutex(getZkClient(), znodeLock);
//獲取鎖,鎖的獲取,最后必須釋放
mutex.acquire();
// init system znode
this.initSystemZNode();
//檢查是否有master節點
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// self tolerant
//如果活動的master節點只有1個,無法進行master服務的容錯,故failoverMaster(null)
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
}
}catch (Exception e){
logger.error("master start up exception",e);
}finally {
//釋放鎖,這個方法是父類AbstractZKClient的,在finally中釋放,保證鎖最后能夠釋放
releaseMutex(mutex);
}
}
對於InterProcessMutex,Curator
是ZooKeeper
的一個客戶端框架,其中封裝了分布式互斥鎖
的實現,最為常用的是InterProcessMutex
InterProcessMutex
基於Zookeeper
實現了分布式的公平可重入互斥鎖
,類似於單個JVM進程內的ReentrantLock(fair=true)
全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞
相關鏈接:https://blog.csdn.net/hosaos/article/details/89521537
相關鏈接:https://www.cnblogs.com/a-du/p/9876314.html
相關鏈接:https://blog.csdn.net/qq_34021712/article/details/82878396
主要方法:
//獲取鎖,若失敗則阻塞等待直到成功,支持重入 public void acquire() throws Exception //超時獲取鎖,超時失敗 public boolean acquire(long time, TimeUnit unit) throws Exception //釋放鎖,一般在finally中釋放 public void release() throws Exception
注意點,調用acquire()方法后需相應調用release()來釋放鎖
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zk節點
/**
* remove zookeeper node path
* * @param path zookeeper node path * @param zkNodeType zookeeper node type * @param failover is failover */ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { logger.info("{} node deleted : {}", zkNodeType.toString(), path); InterProcessMutex mutex = null; try { String failoverPath = getFailoverLockPath(zkNodeType); // create a distributed lock mutex = new InterProcessMutex(getZkClient(), failoverPath); mutex.acquire(); String serverHost = getHostByEventDataPath(path); // handle dead server
//處理 宕機服務,刪除原來節點,在dead路徑增加節點,
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); //failover server
//是否故障轉移服務
if(failover){ failoverServerWhenDown(serverHost, zkNodeType); } }catch (Exception e){ logger.error("{} server failover failed.", zkNodeType.toString()); logger.error("failover exception ",e); } finally { releaseMutex(mutex); } }
zookeeper分布式鎖詳解
在分布式環境中 ,為了保證數據的一致性,經常在程序的某個運行點(例如,減庫存操作或者流水號生成等)需要進行同步控制。以一個"流水號生成"的場景為例,普通的后台應用通常都是使用時間戳來生成流水號,但是在用戶訪問量很大的情況下,可能會出現並發問題。下面通過示例程序就演示一個典型的並發問題:
public static void main(String[] args) throws Exception { CountDownLatch down = new CountDownLatch(1); for (int i=0;i<10;i++){ new Thread(new Runnable() { @Override public void run() { try { down.await(); } catch (InterruptedException e) { e.printStackTrace(); } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("生成的訂單號是:"+orderNo); } }).start(); } down.countDown(); }
輸出結果如下:
Thread[Thread-8,5,main]生成的訂單號是:14:41:26|098 Thread[Thread-4,5,main]生成的訂單號是:14:41:26|107 Thread[Thread-9,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-3,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-0,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-6,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-7,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-2,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-5,5,main]生成的訂單號是:14:41:26|108 Thread[Thread-1,5,main]生成的訂單號是:14:41:26|108
不難發現,生成的10個訂單不少都是重復的,如果是實際的生產環境中,這顯然沒有滿足我們的也無需求。究其原因,就是因為在沒有進行同步的情況下,出現了並發問題。下面我們來看看如何使用Curator實現分布式鎖功能。
Shared Reentrant Lock(分布式可重入鎖)
全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。
相關的類
InterProcessMutex
使用
創建InterProcessMutex實例
InterProcessMutex提供了兩個構造方法,傳入一個CuratorFramework實例和一個要使用的節點路徑,InterProcessMutex還允許傳入一個自定義的驅動類,默認是使用StandardLockInternalsDriver。
public InterProcessMutex(CuratorFramework client, String path); public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);
獲取鎖
使用acquire方法獲取鎖,acquire方法有兩種:
public void acquire() throws Exception;
獲取鎖,一直阻塞到獲取到鎖為止。獲取鎖的線程在獲取鎖后仍然可以調用acquire() 獲取鎖(可重入)。 鎖獲取使用完后,調用了幾次acquire(),就得調用幾次release()釋放。
public boolean acquire(long time, TimeUnit unit) throws Exception;
與acquire()類似,等待time * unit時間獲取鎖,如果仍然沒有獲取鎖,則直接返回false。
釋放鎖
使用release()方法釋放鎖
線程通過acquire()獲取鎖時,可通過release()進行釋放,如果該線程多次調用 了acquire()獲取鎖,則如果只調用 一次release()該鎖仍然會被該線程持有。
注意:同一個線程中InterProcessMutex實例是可重用的,也就是不需要在每次獲取鎖的時候都new一個InterProcessMutex實例,用同一個實例就好。
鎖撤銷
InterProcessMutex 支持鎖撤銷機制,可通過調用makeRevocable()將鎖設為可撤銷的,當另一線程希望你釋放該鎖時,實例里的listener會被調用。 撤銷機制是協作的。
示例代碼(官網)
共享資源
public class FakeLimitedResource { //總共250張火車票 private Integer ticket = 250; public void use() throws InterruptedException { try { System.out.println("火車票還剩"+(--ticket)+"張!"); }catch (Exception e){ e.printStackTrace(); } } }
使用鎖操作資源
public class ExampleClientThatLocks { /** 鎖 */ private final InterProcessMutex lock; /** 共享資源 */ private final FakeLimitedResource resource; /** 客戶端名稱 */ private final String clientName; public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessMutex(client, lockPath); } public void doWork(long time, TimeUnit unit) throws Exception { if ( !lock.acquire(time, unit) ) { throw new IllegalStateException(clientName + " could not acquire the lock"); } try { System.out.println(clientName + " has the lock"); //操作資源 resource.use(); } finally { System.out.println(clientName + " releasing the lock"); lock.release(); //總是在Final塊中釋放鎖。 } } }
客戶端
public class LockingExample { private static final int QTY = 5; private static final int REPETITIONS = QTY * 10; private static final String CONNECTION_STRING = "172.20.10.9:2181"; private static final String PATH = "/examples/locks"; public static void main(String[] args) throws Exception { //FakeLimitedResource模擬某些外部資源,這些外部資源一次只能由一個進程訪問 final FakeLimitedResource resource = new FakeLimitedResource(); ExecutorService service = Executors.newFixedThreadPool(QTY); try { for ( int i = 0; i < QTY; ++i ){ final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE)); try { client.start(); ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index); for ( int j = 0; j < REPETITIONS; ++j ) { example.doWork(10, TimeUnit.SECONDS); } }catch ( InterruptedException e ){ Thread.currentThread().interrupt(); }catch ( Exception e ){ e.printStackTrace(); }finally{ CloseableUtils.closeQuietly(client); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); }catch (Exception e){ e.printStackTrace(); } } }
起五個線程,即五個窗口賣票,五個客戶端分別有50張票可以賣,先是嘗試獲取鎖,操作資源后,釋放鎖。
轉自:https://blog.csdn.net/qq_34021712/article/details/82878396