一、簡介
JStorm集群包含兩類節點:主控節點(Nimbus)和工作節點(Supervisor)。其分別對應的角色如下:
1. 主控節點(Nimbus)上運行Nimbus Daemon。Nimbus負責接收Client提交的Topology,分發代碼,分配任務給工作節點,監控集群中運行任務的狀態等工作。Nimbus作用類似於Hadoop中JobTracker。
2. 工作節點(Supervisor)上運行Supervisor Daemon。Supervisor通過subscribe Zookeeper相關數據監聽Nimbus分配過來任務,據此啟動或停止Worker工作進程。每個Worker工作進程執行一個Topology任務的子集;單個Topology的任務由分布在多個工作節點上的Worker工作進程協同處理。
Nimbus和Supervisor節點之間的協調工作通過Zookeeper實現。此外,Nimbus和Supervisor本身均為無狀態進程,支持Fail Fast;JStorm集群節點的狀態信息或存儲在Zookeeper,或持久化到本地,這意味着即使Nimbus/Supervisor宕機,重啟后即可繼續工作。這個設計使得JStorm集群具有非常好的穩定性。
前面介紹了JStorm中節點狀態信息保存在Zookeeper里面,Nimbus通過向Zookeeper寫狀態信息分配任務,Supervisor通過從Zookeeper訂閱相關數據領取任務,同時Supervisor也定期發送心跳信息到Zookeeper,使得Nimbus可以掌握整個JStorm集群的狀態,從而可以進行任務調度或負載均衡。ZooKeeper使得整個JStorm集群十分健壯,任何節點宕機都不影響集群任務,只要重啟節點即可。
Zookeeper上存儲的狀態數據及Nimbus/Supervisor本地持久化數據涉及到的地方較多,詳細介紹Nimbus之前就上述數據的存儲結構簡要說明如下(注:引用自[5]http://xumingming.sinaapp.com/)。
圖1 JStorm存儲在Zookeeper中數據說明
圖2 Nimbus本地數據說明
圖3 Supervisor本地數據說明
二、系統架構與原理
Nimbus做三件事情:
1、接收Client提交Topology任務;
2、任務調度;
3、監控集群任務運行狀況。
前面已經提到,Nimbus通過向Zookeeper寫數據完成任務分配,通過讀Zookeeper上相關狀態信息監控集群中任務的運行狀態,所以與Nimbus直接發生交互僅Client和Zookeeper。如下圖示。
三、實現邏輯與代碼剖析
以jstorm-0.7.1為例,Nimbus相關實現在jstorm-server/src/main/java目錄的com.alipay.dw.jstorm.daemon.nimbus包里。Nimbus Daemon的啟動入口在NimbusServer.java。
1.Nimbus啟動
Nimbus Daemon進程啟動流程如下:
1、根據配置文件初始化Context數據;
2、與Zookeeper數據同步;
3、初始化RPC服務處理類ServiceHandler;
4、啟動任務分配策略線程;
5、啟動Task的Heartbeat監控線程;
6、啟動RPC服務;
7、其他初始化工作。
Nimbus的詳細啟動邏輯如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
@SuppressWarnings("rawtypes") private void launchServer(Map conf) throws Exception { LOG.info("Begin to start nimbus with conf " + conf); //1.檢查配置文件中是否配置為分布式模式 StormConfig.validate_distributed_mode(conf); //2.注冊主線程退出Hook現場清理(關閉線程+清理數據) initShutdownHook(); //3.新建NimbusData數據,記錄30s超時上傳下載通道Channel/BufferFileInputStream data = createNimbusData(conf); //4.nimbus本地不存在的stormids數據如果在ZK上存在則刪除,其中刪除操作包括/zk/{assignments,tasks,storms}相關數據 NimbusUtils.cleanupCorruptTopologies(data); //5.啟動Topology分配策略 initTopologyAssign(); //6.初始化所有topology的狀態為startup initTopologyStatus(); //7.監控所有task的heartbeat,一旦發現taskid失去心跳將其置為needreassign 1次/10s initMonitor(conf); //8.啟動cleaner線程,默認600s掃描一次,默認刪除3600s沒有讀寫過的jar文件 initCleaner(conf); //9.初始化ServiceHandler serviceHandler = new ServiceHandler(data); //10.啟動rpc server initThrift(conf); } |
2.Topology提交
JStorm集群啟動完成后,Client可向其提交Topology。jstorm-0.7.1源碼目錄jstorm-client/src/main/java下包backtype.storm為用戶提供向集群提交Topology的StormSubmitter.submitTopology方法。提交Topology在Client/Nimbus兩端都會做相關的處理。
Client端提交Topology分兩步完成:
1)打包Topology計算邏輯代碼jar提交給Nimbus,上傳到Nimbus目錄$jstorm_local_dir/nimbus/inbox/stormjar-{$randomid}.jar;其中randomid是Nimbus生成的隨機UUID;
2)Client通過RPC向Nimbus提交Topology DAG及配置信息;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public static void submitTopology( String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException { if(!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid."); } stormConf = new HashMap(stormConf); stormConf.putAll(Utils.readCommandLineOpts()); Map conf = Utils.readStormConfig(); conf.putAll(stormConf); try { String serConf = JSONValue.toJSONString(stormConf); if(localNimbus!=null) { LOG.info("Submitting topology " + name + " in local mode"); localNimbus.submitTopology(name, null, serConf, topology); } else { //1.向Nimbus提交jar包 submitJar(conf); NimbusClient client = NimbusClient.getConfiguredClient(conf); try { LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); //2.提交topology DAG及序列化后的配置信息serconf(json) client.getClient().submitTopology(name, submittedJar, serConf, topology); } finally { client.close(); } } LOG.info("Finished submitting topology: " + name); } catch(TException e) { throw new RuntimeException(e); } } |
其中RPC和數據序列化通過跨語言服務框架Thrift(http://wiki.apache.org/thrift/)實現。JStorm的服務定義在other/storm.thrift里。
Nimbus端接收到Client提交上來的Topology計算邏輯代碼jar包后如前面所述將jar包暫存在目錄$jstorm_local_dir/nimbus/inbox/stormjar-{$randomid}.jar;
Nimbus端接收到Client提交上來的Topology DAG和配置信息后:
1)簡單合法性檢查;主要檢查是否存在相同TopologyName的Topology,如果存在,拒絕Topology提交。
2)生成topologyid;生成規則:TopologyName-counter-currenttime;
3)序列化配置文件和Topology代碼;
4)Nimbus本地准備運行時所需數據;
5)向Zookeeper注冊Topology和Task;
6)將Tasks壓入分配隊列等待TopologyAssign分配;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
@SuppressWarnings("unchecked") @Override public void submitTopology(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, TException { …… try { //1.檢測topologyName是否已經存在,如果存在相同名稱的topology則拒絕提交 checkTopologyActive(data, topologyname, false); } …… //2.根據topologyname構造topologyid(=topologyname-$counter-$ctime) int counter = data.getSubmittedCount().incrementAndGet(); String topologyId = topologyname + "-" + counter + "-" + TimeUtils.current_time_secs(); //3.根據輸入參數jsonConf重組配置數據 Map serializedConf = (Map) JStormUtils.from_json(jsonConf); if (serializedConf == null) { throw new InvalidTopologyException(""); } serializedConf.put(Config.STORM_ID, topologyId); Map stormConf; try { stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology); } catch (Exception e1) { throw new TException(errMsg); } Map totalStormConf = new HashMap(conf); totalStormConf.putAll(stormConf); StormTopology newtopology = new StormTopology(topology); //4.檢查topology的合法性,包括componentid檢查和spout/bolt不能為空檢查 // this validates the structure of the topology Common.validate_basic(newtopology, totalStormConf, topologyId); try { StormClusterState stormClusterState = data.getStormClusterState(); //5.在nimbus的本地准備所有topology相關數據 //包括$storm-local-dir/nimbus/stormdist/topologyid/{tormjar.jar,stormcode.ser,stormconf.ser} // create $storm-local-dir/nimbus/topologyId/xxxx files setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, newtopology); //6.向zk寫入task信息 //6.1新建目錄$zkroot/taskbeats/topologyid //6.2寫文件$zkroot/tasks/topologyid/taskid 內容為對應task的taskinfo[內容:componentid] // generate TaskInfo for every bolt or spout in ZK // $zkroot /tasks/topoologyId/xxx setupZkTaskInfo(conf, topologyId, stormClusterState); //7.任務分配事件壓入待分配隊列 // make assignments for a topology TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTopologyId(topologyId); assignEvent.setScratch(false); assignEvent.setTopologyName(topologyname); TopologyAssign.push(assignEvent); } …… } |
3.任務調度
Topology被成功提交后會壓入Nimbus中TopologyAssign的FIFO隊列,后台任務調度線程對隊列中的Topology逐個進行任務調度。
從0.9.0開始,JStorm提供非常強大的調度功能,基本上可以滿足大部分的需求,同時支持自定義任務調度策略。JStorm的資源不再僅是Worker的端口,而從CPU/Memory/Disk/Net等四個維度綜合考慮。
jstorm-0.7.1的任務調度策略仍主要以Worker端口/Net單一維度調度。
任務調度需要解決的問題是:如何將Topology DAG中各個計算節點和集群資源匹配,才能發揮高效的邏輯處理。0.7.1的策略是:
1、將集群中的資源排序:按照空閑worker數從小到大的順序重排節點,節點內部按照端口大小順序排列;
2、Topology中需要分配的任務(重新分配的Topology時大多任務不再需要分配)逐個映射到上述排好序的資源里。
任務調度核心邏輯如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
public static List sortSlots( Set allSlots, int needSlotNum) { Map> nodeMap = new HashMap>(); // group by first // 按照節點進行組織Map> : nodeid -> ports for (NodePort np : allSlots) { String node = np.getNode(); List list = nodeMap.get(node); if (list == null) { list = new ArrayList(); nodeMap.put(node, list); } list.add(np); } //每一個nodeid按照端口的大小進行排序 for (Entry> entry : nodeMap.entrySet()) { List ports = entry.getValue(); Collections.sort(ports); } //收集所有的workers List> splitup = new ArrayList>(nodeMap.values()); //按照節點可用worker數從小到大排序 //1.assignTasks-Map supInfos //2.availSlots : splitup/List> Collections.sort(splitup, new Comparator> () { public int compare(List o1, List o2) { return o1.size() - o2.size(); } }); /* * splitup目前的狀態(A-F表示節點,1-h表示端口) * |A| |B| |C| |D| |E| |F| *--|---|---|---|---|---|-- * |1| |2| |3| |4| |5| |6| * |7| |8| |9| |0| |a| * |b| |c| |d| |e| * |f| |g| * |h| * 經過interleave_all收集到的sortedFreeSlots為: * 1-2-3-4-5-6-7-8-9-0-a-b-c-d-e-f-g-h */ List sortedFreeSlots = JStormUtils.interleave_all(splitup); //比較sortedFreeSlots.size和needSlotNum的大小分配workers if (sortedFreeSlots.size() needSlotNum return sortedFreeSlots.subList(0, needSlotNum); } |
4.任務監控
初始化Nimbus時后台會隨之啟動一個稱為MonitorRunnable的線程,該線程的作用是定期檢查所有運行Topology的任務Tasks是否存在Dead的狀態。一旦發現Topology中存在Dead的任務Task,MonitorRunnable將該Topology置為StatusType.monitor,等待任務分配線程對該Topology中的Dead任務進行重新分配。
MonitorRunnable線程默認10s執行一次檢查,主要邏輯如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
@Override public void run() { //1.獲取jstorm對zk的操作接口 StormClusterState clusterState = data.getStormClusterState(); try { // Attetion, here don't check /ZK-dir/taskbeats to // get active topology list //2.通過$zkroot/assignments/獲取所有需要檢查active topology List active_topologys = clusterState.assignments(null); if (active_topologys == null) { LOG.info("Failed to get active topologies"); return; } for (String topologyid : active_topologys) { LOG.debug("Check tasks " + topologyid); // Attention, here don't check /ZK-dir/taskbeats/topologyid to // get task ids //3.通過$zkroot/tasks/topologyid獲取組成topology的所有tasks List taskIds = clusterState.task_ids(topologyid); if (taskIds == null) { LOG.info("Failed to get task ids of " + topologyid); continue; } boolean needReassign = false; for (Integer task : taskIds) { //4.檢查task是否為Dead狀態,主要是task心跳是否超時 boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task); if (isTaskDead == true) { needReassign = true; break; } } if (needReassign == true) { //5.如果Topology里有Dead狀態的Task則topology狀態置為monitor等待任務分配線程重新分配 NimbusUtils.transition(data, topologyid, false, StatusType.monitor); } } } catch (Exception e) { // TODO Auto-generated catch block LOG.error(e.getCause(), e); } } |
四、結語
本文簡單介紹了Nimbus在整個JStorm系統中扮演的角色,及其實現邏輯和關鍵流程的源碼剖析,希望能夠對剛接觸JStorm的同學有所幫助。文中難免存在不足和錯誤,歡迎交流指導。
五、參考文獻
[1]Storm社區. http://Storm.incubator.apache.org/
[2]JStorm源碼. https://github.com/alibaba/jStorm/
[3]Storm源碼. https://github.com/nathanmarz/Storm/
[4]Jonathan Leibiusky, Gabriel Eisbruch, etc. Getting Started with Storm.http://shop.oreilly.com/product/0636920024835.do. O’Reilly Media, Inc.
[5]Xumingming Blog. http://xumingming.sinaapp.com/
[6]量子恆道官方博客. http://blog.linezing.com/