海豚調度Dolphinscheduler源碼分析(二)


項目結構

模塊

  1. dolphinscheduler-ui 前端頁面模塊
  2. dolphinscheduler-server 核心模塊。包括master/worker等功能
  3. dolphinscheduler-common 公共模塊。公共方法或類
  4. dolphinscheduler-api Restful接口。前后端交互層,與master/worker交互等功能
  5. dolphinscheduler-dao 數據操作層。實體定義、數據存儲
  6. dolphinscheduler-alert 預警模塊。與預警相關的方法、功能
  7. dolphinscheduler-dist 與編譯、分發相關的模塊。沒有具體邏輯功能
  8. dolphinscheduler-microbench 基准測試
  9. dolphinscheduler-remote
  10. dolphinscheduler-service 核心模塊。zk客戶端,包括日志等功能1.3版本中不再使用 GRPC 進行通信了

 

先來看dolphinscheduler-service 模塊

 

 

 

zookeeper相關類源碼分析:

1.zookeeperConfig類 -- zk的相關配置

 

 

 

相關代碼入下:只粘貼了一些重要的

/**
 * zookeeper conf
 */
@Component
@PropertySource("classpath:zookeeper.properties")
public class ZookeeperConfig {

    //zk connect config
    @Value("${zookeeper.quorum:39.100.43.16:2181}")
    private String serverList;

    @Value("${zookeeper.retry.base.sleep:100}")
    private int baseSleepTimeMs;

    @Value("${zookeeper.retry.max.sleep:30000}")
    private int maxSleepMs;

    @Value("${zookeeper.retry.maxtime:10}")
    private int maxRetries;

    @Value("${zookeeper.session.timeout:60000}")
    private int sessionTimeoutMs;

    @Value("${zookeeper.connection.timeout:30000}")
    private int connectionTimeoutMs;

    @Value("${zookeeper.connection.digest: }")
    private String digest;

    @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
    private String dsRoot;

    @Value("${zookeeper.max.wait.time:10000}")
    private int maxWaitTime;
}

 

其中大部分都是常見配置,如serverList,sessionTimeout 等,其中有一個配置沒有見過故單獨貼出來。

@Value("${zookeeper.connection.digest: }") private String digest;

 

這個是zookeeper的ACL權限認證方式,讀者可以自行查看相關資料,這里先不展開過多介紹:

相關連接:https://www.sohu.com/a/336744170_120173656

相關鏈接:https://www.cnblogs.com/dalianpai/p/12748144.html

相關鏈接:https://www.cnblogs.com/zwcry/p/10414001.html


 

 

2.CuratorZookeeperClient 類 初始化連接zookeeper客戶端類

 

首先介紹一個框架Curator,Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。

Curator有“館長”,“管理者”之意。

Curator包含了幾個包:

  • curator-framework:對zookeeper的底層api的一些封裝。
  • curator-client:提供一些客戶端的操作,例如重試策略等。
  • curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等。

相關鏈接:http://www.throwable.club/2018/12/16/zookeeper-curator-usage/

 

 

 

 

//聲明curator zk客戶端對象
    private CuratorFramework zkClient;

    //通過springboot 創建bean對象之前,對bean進行初始化
    @Override
    public void afterPropertiesSet() throws Exception {
        this.zkClient = buildClient();
        initStateLister();
    }
    //通過工廠創建Curator
    private CuratorFramework buildClient() {
        logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());

        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
                .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));

        //these has default value
        if (0 != zookeeperConfig.getSessionTimeoutMs()) {
            builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs());
        }
        if (0 != zookeeperConfig.getConnectionTimeoutMs()) {
            builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs());
        }
        //當配置文件中有digest認證方式,需要通過digest認證來獲取權限
        if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) {
            builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() {

                @Override
                public List<ACL> getDefaultAcl() {
                    return ZooDefs.Ids.CREATOR_ALL_ACL;
                }

                @Override
                public List<ACL> getAclForPath(final String path) {
                    return ZooDefs.Ids.CREATOR_ALL_ACL;
                }
            });
        }
        zkClient = builder.build();
        zkClient.start();
        try {
            zkClient.blockUntilConnected();
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }
        return zkClient;
    }

 


 

 

3.ZookeeperOperator類 ——zookeeper數據節點操作類(創建節點,刪除節點,獲取節點等)

這個類主要是對CuratorZookeeperClient 類創建的zkClient 客戶端進行一些zk 數據階段操作。

Zookeeper的節點創建模式

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化並且帶序列號
  • EPHEMERAL:臨時
  • EPHEMERAL_SEQUENTIAL:臨時並且帶序列號

 

 

 

    //遞歸創建持久節點
    public void persist(final String key, final String value) {
        try {
            if (!isExisted(key)) {
                zookeeperClient.getZkClient().create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
            } else {
                update(key, value);
            }
        } catch (Exception ex) {
            logger.error("persist key : {} , value : {}", key, value, ex);
        }
    }
//創建遞歸順序臨時節點
    public void persistEphemeralSequential(final String key, String value) {
        try {
            zookeeperClient.getZkClient().create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
        } catch (final Exception ex) {
            logger.error("persistEphemeralSequential key : {}", key, ex);
        }
    }

 

其中這個creatingParentContainersIfNeeded()接口非常有用,因為一般情況開發人員在創建一個子節點必須判斷它的父節點是否存在,如果不存在直接創建會拋出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能夠自動遞歸創建所有所需的父節點。


 

 

4.ZookeeperCachedOperator類 緩存類

這個類繼承了 上面的ZookeeperOperator類,並加入監聽節點的cache方法

Zookeeper原生支持通過注冊Watcher來進行事件監聽,但是開發者需要反復注冊(Watcher只能單次注冊單次使用)。Cache是Curator中對事件監聽的包裝,可以看作是對事件監聽的本地緩存視圖,能夠自動為開發者處理反復注冊監聽。Curator提供了三種Watcher(Cache)來監聽結點的變化。

1.Path Cache

2.Node Cache

3.Tree Cache

Tree Cache可以監控整個樹上的所有節點,類似於PathCache和NodeCache的組合,主要涉及到下面四個類:

  • TreeCache - Tree Cache實現類
  • TreeCacheListener - 監聽器類
  • TreeCacheEvent - 觸發的事件類
  • ChildData - 節點數據

**注意:**TreeCache在初始化(調用start()方法)的時候會回調TreeCacheListener實例一個事TreeCacheEvent,而回調的TreeCacheEvent對象的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導致空指針異常,這里應該主動處理並避免這種情況。

 

海豚調度 主要是使用Tree Cache緩存 主要代碼入下:

//zk緩存類
//Curator提供了三種Watcher(Cache)來監聽結點的變化。
//        1.Path Cache
//        2.Node Cache
//        3.Tree Cache 海豚調度是使用了Tree Cache緩存
public class ZookeeperCachedOperator extends ZookeeperOperator {

    private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);

    private TreeCache treeCache;
    /**
     * register a unified listener of /${dsRoot},
     * 在/dolphinscheduler下注冊監聽節點
     */
    @Override
    protected void registerListener() {
        treeCache = new TreeCache(getZkClient(), getZookeeperConfig().getDsRoot() + "/nodes");
        logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
        try {
            treeCache.start();
        } catch (Exception e) {
            logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
            throw new RuntimeException(e);
        }

        treeCache.getListenable().addListener((client, event) -> {
            String path = null == event.getData() ? "" : event.getData().getPath();
            if (path.isEmpty()) {
                return;
            }
            dataChanged(client, event, path);
        });

    }

 


 

 

5.AbstractZKClient類—— 抽象zk客戶端 (抽象類)

這個是對master,worker的抽象 

這個類繼承了上面第4個類,並且是個抽象類,具體的實現有ZKMasterClient和ZookeeperMonitor類,具體的UML圖入下

 

 

 

這個類有個分布式鎖

提醒:

1.推薦使用ConnectionStateListener監控連接的狀態,因為當連接LOST時你不再擁有鎖

2.分布式的鎖全局同步, 這意味着任何一個時間點不會有兩個客戶端都擁有相同的鎖。

可重入共享鎖—Shared Reentrant Lock

Shared意味着鎖是全局可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似,即可重入, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex來實現。 它的構造函數為:

public InterProcessMutex(CuratorFramework client, String path)

這個類用到鎖的代碼入下:

/**

* 釋放鎖 * release mutex * @param mutex mutex */ public void releaseMutex(InterProcessMutex mutex) { if (mutex != null){ try { mutex.release(); } catch (Exception e) { if("instance must be started before calling this method".equals(e.getMessage())){ logger.warn("lock release"); }else{ logger.error("lock release failed",e); } } } }

 

另外這個獲取節點父路徑的比較有意思:

/**
     *
     * @param zkNodeType zookeeper node type
     * @return get zookeeper node parent path
     */
    public String getZNodeParentPath(ZKNodeType zkNodeType) {
        String path = "";
        switch (zkNodeType){
            case MASTER:
                return getMasterZNodeParentPath();
            case WORKER:
                return getWorkerZNodeParentPath();
            case DEAD_SERVER:
                return getDeadZNodeParentPath();
            default:
                break;
        }
        return path;
    }

 

另一個特殊的地方是,官方文檔說是創建master和worker節點都為臨時節點,但是下面創建父路徑的代碼確是持久節點,而且zookeeper中,節點也為持久節點,可能是后期代碼迭代,將znode節點改為了永久節點,但不知道其用意,等待后面的學習。

 

/**
     *  init system znode
     */
    protected void initSystemZNode(){
        try {
            persist(getMasterZNodeParentPath(), "");
            persist(getWorkerZNodeParentPath(), "");
            persist(getDeadZNodeParentPath(), "");

            logger.info("initialize server nodes success.");
        } catch (Exception e) {
            logger.error("init system znode failed",e);
        }
    }

persist這個方法之前介紹過,為創建持久節點,所以創建的Master,worker,deadznode節點都為持久節點。
查看下zookeeper的節點狀態,發現都為持久節點:

 

 
        

 

 

 


好了,今天基本介紹了service模塊下的zk相關代碼,今天的源碼閱讀就到這里了!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM