【niubi-job——一個分布式的任務調度框架】----框架設計原理以及實現


引言

  

  niubi-job的框架設計是非常簡單實用的一套設計,去掉了很多其它調度框架中,錦上添花但並非必須的組件,例如MQ消息通訊組件(kafka等)。它的框架設計核心思想是,讓每一個jar包可以相對之間獨立的運行,並且由zk輔助進行集群中任務的調度。

  接下來,咱們就一步一步的來看下niubi-job整個的框架設計與實現。

  

框架設計概述

  

  講解之前,讓我們先來看一張niubi-job的框架設計圖。如下。

  可以看到,該圖的結構非常簡單,只有四個部分組成。

  1、web控制台:負責發布任務,監控任務的狀態信息,上傳jar包到jar包倉庫,將部分任務運行信息持久化到數據庫。

  2、zk集群:協調整個節點的運行,並且充當了幫web控制台和節點之間傳遞消息的角色。

  3、cluster節點(node):負責任務的運行,日志的打印和搜集。

  4、jar倉庫:jar倉庫默認是在web控制台的job目錄下,cluster節點會先檢查本地的job目錄下有沒有相應的jar包,沒有的話會從jar倉庫下載到本地。

  整個框架的設計還是比較簡單的,而且這里面也沒有什么技術難點,准確的說,niubi-job是zk和quartz組裝起來的一個更加方便使用的框架。

  

模塊依賴關系

  

  看完了上面的框架設計圖,接下來咱們看一下niubi-job的各個模塊之間的依賴關系,如下。

  圖還是比較簡單的,可以看出,niubi-job整個依賴關系主要分成兩條線,一條的頂端是console,另一條的頂端是cluster。下面LZ就分別介紹一下各個模塊的作用。

  core:主要包含一些工具類,異常類等。

  scanner:非常核心的一個模塊,包含了任務注解,jar包掃描,任務的抽象定義以及類加載器。

  persistent:負責持久化的模塊。

  api:負責與zk交互的模塊。

  scheduler:封裝了quartz。

  spring:包含了任務在spring容器里運行所需要的類。

  cluster:核心模塊,是集群節點的啟動程序,包含了集群調度策略。

  service:console項目的service層,集成了持久化,zk監控以及jar包掃描任務入庫。

  console:后台web項目,提供簡易的Web-UI給用戶使用。用戶可以在上面查看節點和任務等相關信息,也可以對任務進行啟動、暫停、重啟等操作。

  

Container與Node

  

  以上的內容都是niubi-job框架的全貌俯瞰,接下來LZ帶大家一起進到代碼里看看細節。在這個過程中,大家可以體會到LZ設計框架時的一些想法。

  看到本段的小標題,大家不難猜到,這一小段要講的主要就是兩個,一個是container,一個就是node。

  container是LZ抽象出來用來容納jar包運行環境的接口,而node接口則是代表一個獨立運行的集群節點,二者的關系是一對多。也就是說,一個node里面可能會包含多個container,這樣,一個節點就可以運行多個jar包中的任務。

  我們來看看container接口和Node接口的源碼。

package com.zuoxiaolong.niubi.job.scheduler.container;

import com.zuoxiaolong.niubi.job.scheduler.SchedulerManager;

/**
 * @author Xiaolong Zuo
 * @since 0.9.3
 */
public interface Container {

    SchedulerManager schedulerManager();

}
package com.zuoxiaolong.niubi.job.scheduler.node;

/**
 * @author Xiaolong Zuo
 * @since 0.9.3
 */
public interface Node {

    void join();

    void exit();

}

  Container接口非常簡單,每一個Container都會包含一個SchedulerManager接口,SchedulerManager接口其實就是quartz中Scheduler的升級版。每一個Container利用SchedulerManager就可以進行任務的調度。

  當然,Container當中還有兩個比較重要的角色,一個是JobScanner,一個是JobBeanFactory。這兩個,其中JobScanner是用來掃描jar包的,而JobBeanFactory是用來實例化Job的,有了JobBeanFactory的機制,可以很容易與spring集成。

  頂級的Node接口只有兩個方法,join方法代表着加入集群和啟動的含義,exit代表着退出集群或者關閉的含義。在Node的頂級接口中並沒有體現出與container的關系,這是因為當模式為單機模式時,一個Node只有一個container,但是當為集群模式時(standby和master-slave模式),一個Node卻對應了多個container。

  在解釋Node與Container的關系之前,咱們先來看一個類圖。

  由於畫圖工具受限,LZ沒有在類圖中標注出Node接口的兩個重要子接口,接下來咱們來看看這兩個子接口的代碼。

package com.zuoxiaolong.niubi.job.cluster.node;

import com.zuoxiaolong.niubi.job.scheduler.container.Container;
import com.zuoxiaolong.niubi.job.scheduler.node.Node;

/**
 * @author Xiaolong Zuo
 * @since 0.9.3
 */
public interface RemoteJobNode extends Node {

    Container getContainer(String jarFileName, String packagesToScan, boolean isSpring);

}
package com.zuoxiaolong.niubi.job.scheduler.node;

import com.zuoxiaolong.niubi.job.scheduler.container.Container;

/**
 * @author Xiaolong Zuo
 * @since 0.9.3
 */
public interface LocalJobNode extends Node {

    Container getContainer();

}

  從這兩個Node接口的方法定義可以看出,RemoteJobNode接口代表着Node與Container的關系為一對多,而LocalJobNode與Node則是一對一的關系。

  這里LocalJobNode主要代表的就是單機模式的Node,在單機模式下,一個Node只對應一個運行環境,因此這種Node只有一個Container。

  而RemoteJobNode則不同,它可以根據jarFileName取出不同的Container,這意味着RemoteJobNode的實現類應該包含一個Map<String,Container>,其中key是jar文件的名稱,而value則是對應的Container。如此一來,不同的jar文件,將使用不同的Container。

  我們來看下Node這個接口的繼承樹,如下圖。

  可以看到,Node的實現一共有四種。SimpleLocalJobNode是對應的單機版的非spring環境的實現,SimpleSpringJobNode是對應的單機版的spring環境的實現。

  另外兩種實現則都是集群模式的實現,其中StandbyNode代表的是主備模式,而MasterSlaveNode代表的是主從實現。StandbyNode和MasterSlaveNode這兩個類的代碼,是整個niubi-job集群實現的核心。

  所以接下來,LZ就帶大家來看看這兩個類的代碼,至於SimpleLocalJobNode和SimpleSpringJobNode,它們都是單機版的實現,實現相對簡單,LZ這里就不詳細介紹了,有興趣的同學可以去看看這兩個類的源碼。

  

 standby模式實現

  

  standby模式(主備模式),顧名思義,在standby模式中,集群當中只有一個節點在運行任務。只有當這個節點掛掉的時候,其余節點才會接替它去運行需要啟動的任務。

  要保證這一點,借助zk可以很容易的做到,只需要讓Master節點來運行任務,其它Backup節點都待命就可以了。接下來LZ就帶大家來看下niubi-job的源碼,看它是如何借助zk達到standby的效果的。

  拋開其它代碼,我們主要來看StandbyNode的代碼中的兩個listener,這兩個listener是集群調度的核心。首先來看第一個listener。

 1 private class StandbyLeadershipSelectorListener extends AbstractLeadershipSelectorListener {
 2 
 3         @Override
 4         public void acquireLeadership() throws Exception {
 5             StandbyNodeData.Data nodeData = new StandbyNodeData.Data(getIp());
 6             int runningJobCount = startupJobs();
 7             nodeData.setRunningJobCount(runningJobCount);
 8             nodeData.setState("Master");
 9             standbyApiFactory.nodeApi().updateNode(nodePath, nodeData);
10             LoggerHelper.info(getIp() + " has been updated. [" + nodeData + "]");
11             jobCache.start();
12         }
13 
14         private Integer startupJobs() {
15             List<StandbyJobData> standbyJobDataList = standbyApiFactory.jobApi().getAllJobs();
16             int runningJobCount = 0;
17             if (ListHelper.isEmpty(standbyJobDataList)) {
18                 return runningJobCount;
19             }
20             for (StandbyJobData standbyJobData : standbyJobDataList) {
21                 try {
22                     StandbyJobData.Data data = standbyJobData.getData();
23                     if ("Startup".equals(data.getState())) {
24                         Container container = getContainer(standbyJobData.getData().getJarFileName(), standbyJobData.getData().getPackagesToScan(), standbyJobData.getData().isSpring());
25                         container.schedulerManager().startupManual(data.getGroupName(), data.getJobName(), data.getCron(), data.getMisfirePolicy());
26                         runningJobCount++;
27                     }
28                 } catch (Exception e) {
29                     LoggerHelper.error("start jar failed [" + standbyJobData.getPath() + "]", e);
30                 }
31             }
32             return runningJobCount;
33         }
34 
35         @Override
36         public void relinquishLeadership() {
37             try {
38                 if (jobCache != null) {
39                     jobCache.close();
40                 }
41                 LoggerHelper.info("job cache has been closed.");
42             } catch (Throwable e) {
43                 LoggerHelper.warn("job cache close failed.", e);
44             }
45             LoggerHelper.info("begin stop scheduler manager.");
46             shutdownAllScheduler();
47             if (client.getState() == CuratorFrameworkState.STARTED) {
48                 StandbyNodeData.Data data = new StandbyNodeData.Data(getIp());
49                 standbyApiFactory.nodeApi().updateNode(nodePath, data);
50                 LoggerHelper.info(getIp() + " has been shutdown. [" + data + "]");
51             }
52             LoggerHelper.info("clear node successfully.");
53         }
54 
55     }

  這個listener是當節點取得Master權限時需要做的事情。我們可以看到,acquireLeadership這個方法中,主要調用了startupJobs這個方法,然后更新了自己的節點信息。而startupJobs這個方法,它做的事情就是檢查zk節點中現在正在運行的任務,如果發現的話,就把這些任務啟動。

  這就意味着,當其它節點掛掉的時候,任何一個節點得到Master的權限,都會檢查當前正在運行的任務,並且把它們在自己的節點上啟動,這就是standby模式的備份策略。而relinquishLeadership這個方法則是在節點失去Master權限時要做的事情,而我們做的,就是把它當下所有的任務都關閉。

  接下來,我們再來看另外一個listener。代碼如下。

 1 private class JobCacheListener implements PathChildrenCacheListener {
 2 
 3         @Override
 4         public synchronized void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
 5             AssertHelper.isTrue(isJoined(), "illegal state .");
 6             boolean hasLeadership = leaderSelector != null && leaderSelector.hasLeadership();
 7             if (!hasLeadership) {
 8                 return;
 9             }
10             if (!EventHelper.isChildModifyEvent(event)) {
11                 return;
12             }
13             StandbyJobData standbyJobData = new StandbyJobData(event.getData());
14             if (StringHelper.isEmpty(standbyJobData.getData().getOperation())) {
15                 return;
16             }
17             StandbyJobData.Data data = standbyJobData.getData();
18             if (data.isUnknownOperation()) {
19                 return;
20             }
21             StandbyNodeData.Data nodeData = standbyApiFactory.nodeApi().getNode(nodePath).getData();
22             executeOperation(nodeData, data);
23         }
24 
25         private void executeOperation(StandbyNodeData.Data nodeData, StandbyJobData.Data data) {
26             try {
27                 if (data.isStart() || data.isRestart()) {
28                     Container container = getContainer(data.getJarFileName(), data.getPackagesToScan(), data.isSpring());
29                     container.schedulerManager().startupManual(data.getGroupName(), data.getJobName(), data.getCron(), data.getMisfirePolicy());
30                     if (data.isStart()) {
31                         nodeData.increment();
32                     }
33                     data.setState("Startup");
34                 } else {
35                     Container container = getContainer(data.getOriginalJarFileName(), data.getPackagesToScan(), data.isSpring());
36                     container.schedulerManager().shutdown(data.getGroupName(), data.getJobName());
37                     nodeData.decrement();
38                     data.setState("Pause");
39                 }
40                 data.operateSuccess();
41                 standbyApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
42                 standbyApiFactory.nodeApi().updateNode(nodePath, nodeData);
43             } catch (Throwable e) {
44                 LoggerHelper.error("handle operation failed. " + data, e);
45                 data.operateFailed(ExceptionHelper.getStackTrace(e, true));
46                 standbyApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
47             }
48         }
49 
50     }

  首先,這個listener是當Job的zk節點數據有更新時,才會觸發。而zk節點數據有更新,一般都是控制台發來的啟動、暫停或者重啟的命令。這個時候,我們就需要進行一系列的檢查,比如該節點是否是Master,事件是否是控制台發來的等等。

  如果滿足條件,就會執行executeOperation方法,這個時候會根據控制台發過來的動作進行任務的調度。由於我們這個listener只有Master節點才啟動(在StandbyLeadershipSelectorListener的11行啟動),因此這就意味着,只有Master節點才會響應控制台的任務調度。

  如此一來,我們就能保證只有Master節點才會運行任務,並且集群的節點之間可以容災。當Master掛掉時,其它節點會獲取到Master權限,這個時候,獲得Master權限的節點就會接替任務運行的責任。

  好了,standby節點的實現就是這么簡單,大家明白了嗎?

  這就是站在巨人的肩膀上,我們只用這么簡單的代碼就實現了一個簡單的主備任務調度集群,是不是很酷呢?

  

master-slave模式實現

  

  master-slave模式(主從模式)與standby模式(主備模式)有着本質上的不同,但它們相同的是,這兩種任何一個模式下,一個Job都有且只有一個節點運行它。而不同的是,集群的調度策略會有不同。

  standby模式只有一個節點運行任務,只有當Master節點掛掉的時候,其它節點才代替Master節點繼續運行任務。

  而在master-slave模式下,所有Job將會被均衡的分配到各個節點,如果集群中的一個節點掛掉,那么在這個節點上運行的任務將會再次均衡的分配給剩下的活着的節點。當有節點加入到集群時,master-slave集群並不會主動暫停正在運行的任務進行任務的重新分布,需要用戶在控制台手動的暫停某任務,然后再啟動它,這時候這個任務將會被自動路由到新增加的節點上。因為新增加的節點任務數是0,它將會被優先分配。

  好了,master-slave模式的集群調度策略介紹的差不多了,接下來我們就來看看它的代碼吧。它比standby模式下多了一個listener,並且在master-slave模式下,Master與Slave節點的責任也與standby模式有所不同。

  我們先來看第一個listener。代碼如下。

 1 private class MasterSlaveLeadershipSelectorListener extends AbstractLeadershipSelectorListener {
 2 
 3         @Override
 4         public void acquireLeadership() throws Exception {
 5             checkUnavailableNode();
 6             MasterSlaveNodeData masterSlaveNodeData = masterSlaveApiFactory.nodeApi().getNode(nodePath);
 7             masterSlaveNodeData.getData().setState("Master");
 8             masterSlaveApiFactory.nodeApi().updateNode(nodePath, masterSlaveNodeData.getData());
 9             LoggerHelper.info(getIp() + " has been updated. [" + masterSlaveNodeData.getData() + "]");
10             nodeCache.start();
11         }
12 
13         /**
14          * Check unavailable nodes , release jobs that is assigned on these nodes.
15          */
16         private void checkUnavailableNode() {
17             List<MasterSlaveNodeData> masterSlaveNodeDataList = masterSlaveApiFactory.nodeApi().getAllNodes();
18             List<String> availableNodes = new ArrayList<>();
19             if (!ListHelper.isEmpty(masterSlaveNodeDataList)) {
20                 availableNodes.addAll(masterSlaveNodeDataList.stream().map(MasterSlaveNodeData::getPath).collect(Collectors.toList()));
21             }
22             List<MasterSlaveJobData> masterSlaveJobDataList = masterSlaveApiFactory.jobApi().getAllJobs();
23             if (!ListHelper.isEmpty(masterSlaveJobDataList)) {
24                 for (MasterSlaveJobData masterSlaveJobData : masterSlaveJobDataList) {
25                     MasterSlaveJobData.Data data = masterSlaveJobData.getData();
26                     if (!availableNodes.contains(data.getNodePath())) {
27                         data.release();
28                         masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
29                     }
30                 }
31             }
32         }
33 
34         @Override
35         public void relinquishLeadership() {
36             try {
37                 if (nodeCache != null) {
38                     nodeCache.close();
39                 }
40                 LoggerHelper.info("node cache has been closed.");
41             } catch (Throwable e) {
42                 LoggerHelper.warn("node cache close failed.", e);
43             }
44             if (client.getState() == CuratorFrameworkState.STARTED) {
45                 MasterSlaveNodeData.Data nodeData = new MasterSlaveNodeData.Data(getIp());
46                 releaseJobs(nodePath, nodeData);
47                 nodeData.setState("Slave");
48                 masterSlaveApiFactory.nodeApi().updateNode(nodePath, nodeData);
49             }
50             LoggerHelper.info("clear node successfully.");
51         }
52 
53     }

  與standby模式相同,這個listener也是節點在獲取到Master權限時要做的事情。我們可以看到,當取到Master權限時,節點不會做任何任務的調度,只是簡單的更新了一下自己節點的信息,並且啟動了一下nodeCache(代碼第10行)。

  那么nodeCache是做什么的呢?

  接下來,我們來看第二個listener就知道了,代碼如下。

 1 private class NodeCacheListener implements PathChildrenCacheListener {
 2 
 3         @Override
 4         public synchronized void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
 5             AssertHelper.isTrue(isJoined(), "illegal state .");
 6             //double check
 7             if (!leaderSelector.hasLeadership()) {
 8                 return;
 9             }
10             if (EventHelper.isChildRemoveEvent(event)) {
11                 MasterSlaveNodeData masterSlaveNodeData = new MasterSlaveNodeData(event.getData().getPath(), event.getData().getData());
12                 releaseJobs(masterSlaveNodeData.getPath(), masterSlaveNodeData.getData());
13             }
14         }
15 
16     }

  這個listener是在節點有變化時被觸發,而且只有Master權限的節點才會去觸發。可以看到,我們先是判斷了當前節點是不是Master權限,然后判斷了當前事件是不是有節點離開了集群,如果是的話,就把這個掛掉的節點下面的所有任務都釋放掉,也就是releaseJobs方法所做的事情。

  這就意味着,作為Master節點,它必須感知集群當中是否有節點掛掉,如果有的話,就必須把它下面的Job都釋放掉,以便於其它節點繼續執行這些任務,這就是master-slave模式下容災的一種體現。

  接下來,我們來看重頭戲,也就是最后一個listener。它做的事情相對來說就要復雜的多了。代碼如下。

  1     private class JobCacheListener implements PathChildrenCacheListener {
  2 
  3         @Override
  4         public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception {
  5             AssertHelper.isTrue(isJoined(), "illegal state .");
  6             if (!EventHelper.isChildModifyEvent(event)) {
  7                 return;
  8             }
  9             MasterSlaveJobData jobData = new MasterSlaveJobData(event.getData());
 10             if (StringHelper.isEmpty(jobData.getData().getOperation())) {
 11                 return;
 12             }
 13             MasterSlaveJobData.Data data = jobData.getData();
 14             if (data.isUnknownOperation()) {
 15                 return;
 16             }
 17             boolean hasLeadership = leaderSelector != null && leaderSelector.hasLeadership();
 18             if (hasLeadership && StringHelper.isEmpty(data.getNodePath())) {
 19                 //if has operation, wait a moment.
 20                 if (checkNotExecuteOperation()) {
 21                     try {
 22                         Thread.sleep(3000);
 23                     } catch (Throwable e) {
 24                         //ignored
 25                     }
 26                     masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
 27                     return;
 28                 }
 29                 List<MasterSlaveNodeData> masterSlaveNodeDataList = masterSlaveApiFactory.nodeApi().getAllNodes();
 30                 if (ListHelper.isEmpty(masterSlaveNodeDataList)) {
 31                     data.operateFailed("there is not any one node live.");
 32                     masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
 33                     return;
 34                 }
 35                 Collections.sort(masterSlaveNodeDataList);
 36                 data.setNodePath(masterSlaveNodeDataList.get(0).getPath());
 37                 masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
 38                 return;
 39             }
 40             if (hasLeadership) {
 41                 //check whether the node is available or not.
 42                 List<MasterSlaveNodeData> masterSlaveNodeDataList = masterSlaveApiFactory.nodeApi().getAllNodes();
 43                 boolean nodeIsLive = false;
 44                 for (MasterSlaveNodeData masterSlaveNodeData : masterSlaveNodeDataList) {
 45                     if (masterSlaveNodeData.getPath().equals(data.getNodePath())) {
 46                         nodeIsLive = true;
 47                         break;
 48                     }
 49                 }
 50                 if (!nodeIsLive) {
 51                     data.clearNodePath();
 52                     masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
 53                 }
 54             }
 55             //if the job has been assigned to this node, then execute.
 56             if (EventHelper.isChildUpdateEvent(event) && nodePath.equals(data.getNodePath())) {
 57                 MasterSlaveNodeData.Data nodeData;
 58                 try {
 59                     nodeData = masterSlaveApiFactory.nodeApi().getNode(nodePath).getData();
 60                 } catch (Throwable e) {
 61                     LoggerHelper.error("node [" + nodePath + "] not exists.");
 62                     data.clearNodePath();
 63                     masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
 64                     return;
 65                 }
 66                 executeOperation(nodeData, jobData);
 67                 return;
 68             }
 69         }
 70 
 71         private boolean checkNotExecuteOperation() {
 72             List<MasterSlaveJobData> masterSlaveJobDataList = masterSlaveApiFactory.jobApi().getAllJobs();
 73             if (ListHelper.isEmpty(masterSlaveJobDataList)) {
 74                 return false;
 75             }
 76             for (MasterSlaveJobData masterSlaveJobData : masterSlaveJobDataList) {
 77                 boolean hasOperation = !StringHelper.isEmpty(masterSlaveJobData.getData().getOperation());
 78                 boolean assigned = !StringHelper.isEmpty(masterSlaveJobData.getData().getNodePath());
 79                 if (hasOperation && assigned) {
 80                     return true;
 81                 }
 82             }
 83             return false;
 84         }
 85 
 86         private void executeOperation(MasterSlaveNodeData.Data nodeData, MasterSlaveJobData jobData) {
 87             MasterSlaveJobData.Data data = jobData.getData();
 88             try {
 89                 if (data.isStart() || data.isRestart()) {
 90                     Container container = getContainer(data.getJarFileName(), data.getPackagesToScan(), data.isSpring());
 91                     container.schedulerManager().startupManual(data.getGroupName(), data.getJobName(), data.getCron(), data.getMisfirePolicy());
 92                     if (data.isStart()) {
 93                         nodeData.addJobPath(jobData.getPath());
 94                     }
 95                     data.setState("Startup");
 96                 } else {
 97                     Container container = getContainer(data.getOriginalJarFileName(), data.getPackagesToScan(), data.isSpring());
 98                     container.schedulerManager().shutdown(data.getGroupName(), data.getJobName());
 99                     nodeData.removeJobPath(jobData.getPath());
100                     data.clearNodePath();
101                     data.setState("Pause");
102                 }
103                 data.operateSuccess();
104                 masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
105                 masterSlaveApiFactory.nodeApi().updateNode(nodePath, nodeData);
106             } catch (Throwable e) {
107                 LoggerHelper.error("handle operation failed. " + data, e);
108                 data.operateFailed(ExceptionHelper.getStackTrace(e, true));
109                 masterSlaveApiFactory.jobApi().updateJob(data.getGroupName(), data.getJobName(), data);
110             }
111         }
112 
113     }

  這個listener是當任務節點信息有變化時,才會被觸發。在代碼的18行,我們判斷了該任務是否被分配了節點,如果沒被分配的話,並且當前節點是Master權限的話,就會進入If塊。這個時候我們會采用輪詢的方式給任務選擇節點,具體的做法就是先將節點按照當前任務數排序,我們選擇當前任務數最少的那個節點分配任務。

  當任務被分配完節點時,我們需要把事件重新激活,然后讓被分配了的節點去啟動、暫停或者重啟該任務。因此在第37行,我們直接將任務打回,繼續發送一個Job更新的事件。

  接下來,我們來到了40行,在這一行,LZ寫了一個簡單的注釋(由於LZ英文水平有限,因此注釋目前還不多,但是這個地方非常重要,所以就加了行注釋),注釋翻譯過來意思就是說要檢查一下當前被分配的節點是否可用。

  一種可能的情況是,在Master節點給任務分配完節點后,被分配的節點掛掉了。這個時候如果不檢查節點是否有效,當前任務的操作就會被忽略掉,因此我們讓擁有Master權限的節點檢查一下,如果發現節點已經掛掉,則把任務上分配的節點信息清除掉(代碼51行),然后重新激活(代碼52行)。

  接下來就來到了56行,這一行判斷的邏輯就比較簡單了,判斷一下如果任務有更新,並且被分配的節點就是自己的話,就把該任務進行相應的操作,比如啟動、暫停或者重啟。

  好了,master-slave模式的實現到這里也就講解的差不多了,還是那句話,站在巨人的肩膀上,事情將會變的非常簡單。

  這個類整個代碼不超400行,我們就實現了一個可以自動容災,負載均衡的任務調度集群,是不是很暴力呢?

  

niubi-job的類加載機制

  

  好了,我們已經介紹完了集群策略的實現,那么重點來了,我們如何保證每個任務的jar包運行的時候互相不影響呢?

  這就依靠於niubi-job內部的類加載機制,它基本上是套用的tomcat的類加載機制,這又是站在了巨人的肩膀上,0-0。

  好吧,接下來,LZ就大言不慚的介紹一下niubi-job的類加載機制(其實基本上就是tomcat的,0-0)。首先,我們先來看一個圖。如下。

  如果看到過tomcat類加載機制的圖的朋友,可能會覺得這個圖似曾相識。你沒有看錯,這個圖和tomcat的那個圖是非常相似的,也就是換了下名字罷了。

  在niubi-job的節點中,任何一個類的加載都會遵循以下的原則(具體的代碼這里就不貼了,大家可以去看ApplicationClassLoader這個類的代碼)。

  1、從bootstrap類加載器加載,如果沒有,進行下一步查找。

  2、從ext類加載器加載,如果沒有,進行下一步查找。

  3、從job的jar文件中加載,如果沒有,進行下一步查找。

  4、從app類加載器加載,如果沒有,進行下一步查找。

  5、從node類加載器加載,如果沒有,則拋出ClassNotFoundException。

  大家可以發現,niubi-job的類加載查找順序是違背了父委托機制,這樣做的原因是為了保證jar包之間的絕對隔離,也就是說,除了ext和bootstrap當中的類,所有的類都優先加載jar包中的,這樣可以保證jar包之間類的絕對隔離。但這樣做的壞處就是,如果jar包中包含了cluster本身包含的類,那么就會產生類轉換的異常。

  因此大家切記,在打任務的jar包時,cluster的lib目錄下的所有jar包都不能包含,如果非要使用自己的jar包,可以在部署cluster節點時,把lib包中的jar包替換成自己的(比如把log4j的jar包替換成自己想要使用的),但是不能直接把與lib目錄下重復的jar包打到任務jar包當中。

  

結語

  

  本文介紹了niubi-job中非常重要的集群策略實現和類加載機制,這基本上包含了niubi-job的代碼中80%以上的精華,剩下的那20%,大家如果有時間的話,可以自己去研究,LZ非常歡迎。

  最后,歡迎大家給niubi-job提交Issue和PR,LZ一定格盡職守的進行解答和Review。

  下一篇文章,LZ將會介紹一下0.9.3版中,niubi-job中web控制台的更新內容,比0.9.2版更加炫酷,功能也更加齊全。

  下次見!各位!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM