[轉]JStorm之Nimbus簡介


一、簡介

JStorm集群包含兩類節點:主控節點(Nimbus)和工作節點(Supervisor)。其分別對應的角色如下:
1. 主控節點(Nimbus)上運行Nimbus Daemon。Nimbus負責接收Client提交的Topology,分發代碼,分配任務給工作節點,監控集群中運行任務的狀態等工作。Nimbus作用類似於Hadoop中JobTracker。
2. 工作節點(Supervisor)上運行Supervisor Daemon。Supervisor通過subscribe Zookeeper相關數據監聽Nimbus分配過來任務,據此啟動或停止Worker工作進程。每個Worker工作進程執行一個Topology任務的子集;單個Topology的任務由分布在多個工作節點上的Worker工作進程協同處理。

Nimbus和Supervisor節點之間的協調工作通過Zookeeper實現。此外,Nimbus和Supervisor本身均為無狀態進程,支持Fail Fast;JStorm集群節點的狀態信息或存儲在Zookeeper,或持久化到本地,這意味着即使Nimbus/Supervisor宕機,重啟后即可繼續工作。這個設計使得JStorm集群具有非常好的穩定性。

前面介紹了JStorm中節點狀態信息保存在Zookeeper里面,Nimbus通過向Zookeeper寫狀態信息分配任務,Supervisor通過從Zookeeper訂閱相關數據領取任務,同時Supervisor也定期發送心跳信息到Zookeeper,使得Nimbus可以掌握整個JStorm集群的狀態,從而可以進行任務調度或負載均衡。ZooKeeper使得整個JStorm集群十分健壯,任何節點宕機都不影響集群任務,只要重啟節點即可。

Zookeeper上存儲的狀態數據及Nimbus/Supervisor本地持久化數據涉及到的地方較多,詳細介紹Nimbus之前就上述數據的存儲結構簡要說明如下(注:引用自[5]http://xumingming.sinaapp.com/)。

圖1 JStorm存儲在Zookeeper中數據說明

圖2 Nimbus本地數據說明

圖3 Supervisor本地數據說明

二、系統架構與原理

Nimbus做三件事情:
1、接收Client提交Topology任務;
2、任務調度;
3、監控集群任務運行狀況。

前面已經提到,Nimbus通過向Zookeeper寫數據完成任務分配,通過讀Zookeeper上相關狀態信息監控集群中任務的運行狀態,所以與Nimbus直接發生交互僅Client和Zookeeper。如下圖示。

三、實現邏輯與代碼剖析

以jstorm-0.7.1為例,Nimbus相關實現在jstorm-server/src/main/java目錄的com.alipay.dw.jstorm.daemon.nimbus包里。Nimbus Daemon的啟動入口在NimbusServer.java。

1.Nimbus啟動

Nimbus Daemon進程啟動流程如下:
1、根據配置文件初始化Context數據;
2、與Zookeeper數據同步;
3、初始化RPC服務處理類ServiceHandler;
4、啟動任務分配策略線程;
5、啟動Task的Heartbeat監控線程;
6、啟動RPC服務;
7、其他初始化工作。
Nimbus的詳細啟動邏輯如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SuppressWarnings("rawtypes")
private void launchServer(Map conf) throws Exception {
    LOG.info("Begin to start nimbus with conf " + conf);
    //1.檢查配置文件中是否配置為分布式模式
    StormConfig.validate_distributed_mode(conf);
    //2.注冊主線程退出Hook現場清理(關閉線程+清理數據)
    initShutdownHook();
    //3.新建NimbusData數據,記錄30s超時上傳下載通道Channel/BufferFileInputStream
    data = createNimbusData(conf);
    //4.nimbus本地不存在的stormids數據如果在ZK上存在則刪除,其中刪除操作包括/zk/{assignments,tasks,storms}相關數據
    NimbusUtils.cleanupCorruptTopologies(data);
    //5.啟動Topology分配策略
    initTopologyAssign();
    //6.初始化所有topology的狀態為startup
    initTopologyStatus();
    //7.監控所有task的heartbeat,一旦發現taskid失去心跳將其置為needreassign 1次/10s
    initMonitor(conf);
    //8.啟動cleaner線程,默認600s掃描一次,默認刪除3600s沒有讀寫過的jar文件
    initCleaner(conf);
    //9.初始化ServiceHandler
    serviceHandler = new ServiceHandler(data);
    //10.啟動rpc server
    initThrift(conf);
}

2.Topology提交

JStorm集群啟動完成后,Client可向其提交Topology。jstorm-0.7.1源碼目錄jstorm-client/src/main/java下包backtype.storm為用戶提供向集群提交Topology的StormSubmitter.submitTopology方法。提交Topology在Client/Nimbus兩端都會做相關的處理。

Client端提交Topology分兩步完成:
1)打包Topology計算邏輯代碼jar提交給Nimbus,上傳到Nimbus目錄$jstorm_local_dir/nimbus/inbox/stormjar-{$randomid}.jar;其中randomid是Nimbus生成的隨機UUID;
2)Client通過RPC向Nimbus提交Topology DAG及配置信息;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void submitTopology(
String name,
Map stormConf,
StormTopology topology)
throws AlreadyAliveException, InvalidTopologyException {
  if(!Utils.isValidConf(stormConf)) {
      throw new IllegalArgumentException("Storm conf is not valid.");
  }
  stormConf = new HashMap(stormConf);
  stormConf.putAll(Utils.readCommandLineOpts());
  Map conf = Utils.readStormConfig();
  conf.putAll(stormConf);
  try {
      String serConf = JSONValue.toJSONString(stormConf);
      if(localNimbus!=null) {
          LOG.info("Submitting topology " + name + " in local mode");
          localNimbus.submitTopology(name, null, serConf, topology);
      } else {
          //1.向Nimbus提交jar包
          submitJar(conf);
          NimbusClient client = NimbusClient.getConfiguredClient(conf);
          try {
              LOG.info("Submitting topology " +  name + " in distributed mode with conf "  + serConf);
              //2.提交topology DAG及序列化后的配置信息serconf(json)
              client.getClient().submitTopology(name, submittedJar, serConf, topology);
          } finally {
              client.close();
          }
      }
      LOG.info("Finished submitting topology: " +  name);
  } catch(TException e) {
      throw new RuntimeException(e);
  }
}

其中RPC和數據序列化通過跨語言服務框架Thrift(http://wiki.apache.org/thrift/)實現。JStorm的服務定義在other/storm.thrift里。

Nimbus端接收到Client提交上來的Topology計算邏輯代碼jar包后如前面所述將jar包暫存在目錄$jstorm_local_dir/nimbus/inbox/stormjar-{$randomid}.jar;
Nimbus端接收到Client提交上來的Topology DAG和配置信息后:
1)簡單合法性檢查;主要檢查是否存在相同TopologyName的Topology,如果存在,拒絕Topology提交。
2)生成topologyid;生成規則:TopologyName-counter-currenttime;
3)序列化配置文件和Topology代碼;
4)Nimbus本地准備運行時所需數據;
5)向Zookeeper注冊Topology和Task;
6)將Tasks壓入分配隊列等待TopologyAssign分配;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@SuppressWarnings("unchecked")
@Override
public void submitTopology(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology)
        throws AlreadyAliveException, InvalidTopologyException, TException {
    ……
    try {
    	//1.檢測topologyName是否已經存在,如果存在相同名稱的topology則拒絕提交
        checkTopologyActive(data, topologyname, false);
    }
    ……
    //2.根據topologyname構造topologyid(=topologyname-$counter-$ctime)
    int counter = data.getSubmittedCount().incrementAndGet();
    String topologyId = topologyname + "-" + counter + "-"
            + TimeUtils.current_time_secs();
    //3.根據輸入參數jsonConf重組配置數據
    Map serializedConf = (Map) JStormUtils.from_json(jsonConf);
    if (serializedConf == null) {
        throw new InvalidTopologyException("");
    }
    serializedConf.put(Config.STORM_ID, topologyId);
    Map stormConf;
    try {
        stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
    } catch (Exception e1) {
        throw new TException(errMsg);
    }
    Map totalStormConf = new HashMap(conf);
    totalStormConf.putAll(stormConf);
    StormTopology newtopology = new StormTopology(topology);
    //4.檢查topology的合法性,包括componentid檢查和spout/bolt不能為空檢查
    // this validates the structure of the topology
    Common.validate_basic(newtopology, totalStormConf, topologyId);
    try {
        StormClusterState stormClusterState = data.getStormClusterState();
        //5.在nimbus的本地准備所有topology相關數據
        //包括$storm-local-dir/nimbus/stormdist/topologyid/{tormjar.jar,stormcode.ser,stormconf.ser}
        // create $storm-local-dir/nimbus/topologyId/xxxx files
        setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
                newtopology);
        //6.向zk寫入task信息
        //6.1新建目錄$zkroot/taskbeats/topologyid
        //6.2寫文件$zkroot/tasks/topologyid/taskid 內容為對應task的taskinfo[內容:componentid]
        // generate TaskInfo for every bolt or spout in ZK
        // $zkroot /tasks/topoologyId/xxx
        setupZkTaskInfo(conf, topologyId, stormClusterState);
        //7.任務分配事件壓入待分配隊列
        // make assignments for a topology
        TopologyAssignEvent assignEvent = new TopologyAssignEvent();
        assignEvent.setTopologyId(topologyId);
        assignEvent.setScratch(false);
        assignEvent.setTopologyName(topologyname);
        TopologyAssign.push(assignEvent);
    }
    ……
}

3.任務調度

Topology被成功提交后會壓入Nimbus中TopologyAssign的FIFO隊列,后台任務調度線程對隊列中的Topology逐個進行任務調度。
從0.9.0開始,JStorm提供非常強大的調度功能,基本上可以滿足大部分的需求,同時支持自定義任務調度策略。JStorm的資源不再僅是Worker的端口,而從CPU/Memory/Disk/Net等四個維度綜合考慮。
jstorm-0.7.1的任務調度策略仍主要以Worker端口/Net單一維度調度。

任務調度需要解決的問題是:如何將Topology DAG中各個計算節點和集群資源匹配,才能發揮高效的邏輯處理。0.7.1的策略是:
1、將集群中的資源排序:按照空閑worker數從小到大的順序重排節點,節點內部按照端口大小順序排列;
2、Topology中需要分配的任務(重新分配的Topology時大多任務不再需要分配)逐個映射到上述排好序的資源里。
任務調度核心邏輯如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static List sortSlots(
Set allSlots, int needSlotNum) {
    Map> nodeMap = new HashMap>();
    // group by first
    // 按照節點進行組織Map> : nodeid -> ports
    for (NodePort np : allSlots) {
        String node = np.getNode();
        List list = nodeMap.get(node);
        if (list == null) {
            list = new ArrayList();
            nodeMap.put(node, list);
        }
        list.add(np);
    }
 
    //每一個nodeid按照端口的大小進行排序
    for (Entry> entry : nodeMap.entrySet()) {
        List ports = entry.getValue();
        Collections.sort(ports);
    }
 
    //收集所有的workers
    List> splitup = new ArrayList>(nodeMap.values());
 
    //按照節點可用worker數從小到大排序
    //1.assignTasks-Map supInfos
    //2.availSlots : splitup/List>
    Collections.sort(splitup, new Comparator> () {
        public int compare(List o1, List o2) {
            return o1.size() - o2.size();
        }
    });
 
    /*
     * splitup目前的狀態(A-F表示節點,1-h表示端口)
     * |A| |B| |C| |D| |E| |F|
     *--|---|---|---|---|---|--
     * |1| |2| |3| |4| |5| |6|
     *     |7| |8| |9| |0| |a|
     *         |b| |c| |d| |e|
     *                 |f| |g|
     *                     |h|
     * 經過interleave_all收集到的sortedFreeSlots為:
     * 1-2-3-4-5-6-7-8-9-0-a-b-c-d-e-f-g-h
     */
    List sortedFreeSlots = JStormUtils.interleave_all(splitup);
 
    //比較sortedFreeSlots.size和needSlotNum的大小分配workers
    if (sortedFreeSlots.size()  needSlotNum
    return sortedFreeSlots.subList(0, needSlotNum);
}

4.任務監控

初始化Nimbus時后台會隨之啟動一個稱為MonitorRunnable的線程,該線程的作用是定期檢查所有運行Topology的任務Tasks是否存在Dead的狀態。一旦發現Topology中存在Dead的任務Task,MonitorRunnable將該Topology置為StatusType.monitor,等待任務分配線程對該Topology中的Dead任務進行重新分配。
MonitorRunnable線程默認10s執行一次檢查,主要邏輯如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public void run() {
    //1.獲取jstorm對zk的操作接口
    StormClusterState clusterState = data.getStormClusterState();
    try {
        // Attetion, here don't check /ZK-dir/taskbeats to
        // get active topology list
        //2.通過$zkroot/assignments/獲取所有需要檢查active topology
        List active_topologys = clusterState.assignments(null);
 
        if (active_topologys == null) {
            LOG.info("Failed to get active topologies");
            return;
        }
 
        for (String topologyid : active_topologys) {
            LOG.debug("Check tasks " + topologyid);
            // Attention, here don't check /ZK-dir/taskbeats/topologyid to
            // get task ids
            //3.通過$zkroot/tasks/topologyid獲取組成topology的所有tasks
            List taskIds = clusterState.task_ids(topologyid);
            if (taskIds == null) {
                LOG.info("Failed to get task ids of " + topologyid);
                continue;
            }
 
            boolean needReassign = false;
            for (Integer task : taskIds) {
                //4.檢查task是否為Dead狀態,主要是task心跳是否超時
                boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task);
                if (isTaskDead == true) {
                    needReassign = true;
                    break;
                }
            }
            if (needReassign == true) {
                //5.如果Topology里有Dead狀態的Task則topology狀態置為monitor等待任務分配線程重新分配
                NimbusUtils.transition(data, topologyid, false, StatusType.monitor);
            }
        }
    } catch (Exception e) {
        // TODO Auto-generated catch block
        LOG.error(e.getCause(), e);
    }
}

四、結語

本文簡單介紹了Nimbus在整個JStorm系統中扮演的角色,及其實現邏輯和關鍵流程的源碼剖析,希望能夠對剛接觸JStorm的同學有所幫助。文中難免存在不足和錯誤,歡迎交流指導。

五、參考文獻

[1]Storm社區. http://Storm.incubator.apache.org/
[2]JStorm源碼. https://github.com/alibaba/jStorm/
[3]Storm源碼. https://github.com/nathanmarz/Storm/
[4]Jonathan Leibiusky, Gabriel Eisbruch, etc. Getting Started with Storm.http://shop.oreilly.com/product/0636920024835.do. O’Reilly Media, Inc.
[5]Xumingming Blog. http://xumingming.sinaapp.com/
[6]量子恆道官方博客. http://blog.linezing.com/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM