海豚調度Dolphinscheduler源碼分析(三)


今天繼續分析海豚調度的源碼

上回分析的是dolphinscheduler-service模塊zookeeper相關的代碼

這回分析是dolphinscheduler-server模塊zookeeper相關的代碼

 

ZkMasterClient master服務zk客戶端類

類繼承的關系如下:

 

 

 

這個類的方法如下:

 

 

 

方法介紹:

  • start()  根據路徑dolphinscheduler/lock/failover/master 創建一個分布式鎖,並進行初始化,檢查是否有master節點競爭鎖,確保只有一個主master,如果只有一個master節點,那么無法進行master服務的故障轉移
  • dataChange() 變更zk節點
  • removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zookeeper 節點,並在/dead路徑添加節點,並會判斷是否需要容錯
  • handleDeadServer()  父類方法,就是處理宕機服務的zookeeper路徑,將獲取節點刪除,添加/dead路徑數據
  • failoverServerWhenDown() 當服務宕機后,轉移服務,分為master服務和server服務
  • checkTaskInstanceNeedFailover()
  • failoverWorker()  將worker上的task任務進行故障轉移
    • 如果是yarn任務,干掉yarn任務
    • 將任務狀態變更為需要故障轉移
    •   當工作節點全部為null時,將所有任務進行故障轉移

  

zk分布式鎖獲取代碼如下:

public void start() {

//Curator是zk的一個客戶端框架,其中分裝了分布式公平可重入互斥鎖,最為常見是InterProcessMutex
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
///根據這個路徑dolphinscheduler/lock/failover/master 創建一個分布式鎖
String znodeLock = getMasterStartUpLockPath();
//InterProcessMutex的構造方法,需要一個客戶端和路徑
mutex = new InterProcessMutex(getZkClient(), znodeLock);
//獲取鎖,鎖的獲取,最后必須釋放
mutex.acquire();

// init system znode
this.initSystemZNode();

    //檢查是否有master節點
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}


// self tolerant
//如果活動的master節點只有1個,無法進行master服務的容錯,故failoverMaster(null)
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
}

}catch (Exception e){
logger.error("master start up exception",e);
}finally {
//釋放鎖,這個方法是父類AbstractZKClient的,在finally中釋放,保證鎖最后能夠釋放
releaseMutex(mutex);
}
}

 

對於InterProcessMutex,CuratorZooKeeper的一個客戶端框架,其中封裝了分布式互斥鎖的實現,最為常用的是InterProcessMutex

InterProcessMutex基於Zookeeper實現了分布式的公平可重入互斥鎖,類似於單個JVM進程內的ReentrantLock(fair=true)

全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞

相關鏈接:https://blog.csdn.net/hosaos/article/details/89521537

相關鏈接:https://www.cnblogs.com/a-du/p/9876314.html

相關鏈接:https://blog.csdn.net/qq_34021712/article/details/82878396

主要方法:

//獲取鎖,若失敗則阻塞等待直到成功,支持重入
public void acquire() throws Exception
//超時獲取鎖,超時失敗
public boolean acquire(long time, TimeUnit unit) throws Exception
//釋放鎖,一般在finally中釋放
public void release() throws Exception

注意點,調用acquire()方法后需相應調用release()來釋放鎖

 

private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zk節點
 
         

      /**     

     * remove zookeeper node path

   * * @param path zookeeper node path * @param zkNodeType zookeeper node type * @param failover is failover */ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { logger.info("{} node deleted : {}", zkNodeType.toString(), path); InterProcessMutex mutex = null; try { String failoverPath = getFailoverLockPath(zkNodeType); // create a distributed lock mutex = new InterProcessMutex(getZkClient(), failoverPath); mutex.acquire(); String serverHost = getHostByEventDataPath(path); // handle dead server
       //處理 宕機服務,刪除原來節點,在dead路徑增加節點,
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); //failover server
       //是否故障轉移服務
if(failover){ failoverServerWhenDown(serverHost, zkNodeType); } }catch (Exception e){ logger.error("{} server failover failed.", zkNodeType.toString()); logger.error("failover exception ",e); } finally { releaseMutex(mutex); } }

 


 

zookeeper分布式鎖詳解

在分布式環境中 ,為了保證數據的一致性,經常在程序的某個運行點(例如,減庫存操作或者流水號生成等)需要進行同步控制。以一個"流水號生成"的場景為例,普通的后台應用通常都是使用時間戳來生成流水號,但是在用戶訪問量很大的情況下,可能會出現並發問題。下面通過示例程序就演示一個典型的並發問題:

public static void main(String[] args) throws Exception {

    CountDownLatch down = new CountDownLatch(1);
    for (int i=0;i<10;i++){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    down.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                String orderNo = sdf.format(new Date());
                System.out.println("生成的訂單號是:"+orderNo);
            }
        }).start();
    }
    down.countDown();
}

 

輸出結果如下:

Thread[Thread-8,5,main]生成的訂單號是:14:41:26|098
Thread[Thread-4,5,main]生成的訂單號是:14:41:26|107
Thread[Thread-9,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-3,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-0,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-6,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-7,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-2,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-5,5,main]生成的訂單號是:14:41:26|108
Thread[Thread-1,5,main]生成的訂單號是:14:41:26|108

不難發現,生成的10個訂單不少都是重復的,如果是實際的生產環境中,這顯然沒有滿足我們的也無需求。究其原因,就是因為在沒有進行同步的情況下,出現了並發問題。下面我們來看看如何使用Curator實現分布式鎖功能。

 

Shared Reentrant Lock(分布式可重入鎖)

全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。

相關的類

InterProcessMutex

使用

創建InterProcessMutex實例
InterProcessMutex提供了兩個構造方法,傳入一個CuratorFramework實例和一個要使用的節點路徑,InterProcessMutex還允許傳入一個自定義的驅動類,默認是使用StandardLockInternalsDriver。

public InterProcessMutex(CuratorFramework client, String path);
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);
獲取鎖

使用acquire方法獲取鎖,acquire方法有兩種:

public void acquire() throws Exception;

獲取鎖,一直阻塞到獲取到鎖為止。獲取鎖的線程在獲取鎖后仍然可以調用acquire() 獲取鎖(可重入)。 鎖獲取使用完后,調用了幾次acquire(),就得調用幾次release()釋放。

public boolean acquire(long time, TimeUnit unit) throws Exception;

與acquire()類似,等待time * unit時間獲取鎖,如果仍然沒有獲取鎖,則直接返回false。

釋放鎖

使用release()方法釋放鎖
線程通過acquire()獲取鎖時,可通過release()進行釋放,如果該線程多次調用 了acquire()獲取鎖,則如果只調用 一次release()該鎖仍然會被該線程持有。

注意:同一個線程中InterProcessMutex實例是可重用的,也就是不需要在每次獲取鎖的時候都new一個InterProcessMutex實例,用同一個實例就好。

鎖撤銷

InterProcessMutex 支持鎖撤銷機制,可通過調用makeRevocable()將鎖設為可撤銷的,當另一線程希望你釋放該鎖時,實例里的listener會被調用。 撤銷機制是協作的。

 

示例代碼(官網)

共享資源

public class FakeLimitedResource {

    //總共250張火車票
    private Integer ticket = 250;

    public void use() throws InterruptedException {
        try {
            System.out.println("火車票還剩"+(--ticket)+"張!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

使用鎖操作資源

public class ExampleClientThatLocks {

    /***/
    private final InterProcessMutex lock;
    /** 共享資源 */
    private final FakeLimitedResource resource;
    /** 客戶端名稱 */
    private final String clientName;

    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if ( !lock.acquire(time, unit) ) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " has the lock");
            //操作資源
            resource.use();
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); //總是在Final塊中釋放鎖。
        }
    }
}

客戶端

public class LockingExample {
    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String CONNECTION_STRING = "172.20.10.9:2181";
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {

        //FakeLimitedResource模擬某些外部資源,這些外部資源一次只能由一個進程訪問
        final FakeLimitedResource resource = new FakeLimitedResource();

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try {
            for ( int i = 0; i < QTY; ++i ){
                final int index = i;
                Callable<Void>  task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
                        try {
                            client.start();
                            ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
                            for ( int j = 0; j < REPETITIONS; ++j ) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        }catch ( InterruptedException e ){
                            Thread.currentThread().interrupt();
                        }catch ( Exception e ){
                            e.printStackTrace();
                        }finally{
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

 

起五個線程,即五個窗口賣票,五個客戶端分別有50張票可以賣,先是嘗試獲取鎖,操作資源后,釋放鎖。

轉自:https://blog.csdn.net/qq_34021712/article/details/82878396


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM