TopologyMaster:
Handles basic bookkeeping for a topology, such as updating heartbeat information and topology metrics.
NimbusServer:
/**
 * NimbusServer work flow:
 * 1. cleanup interrupted topologies:
 *    delete /storm-local-dir/nimbus/topologyid/stormdis
 *    delete /storm-zk-root/storms/topologyid
 *
 * 2. set /storm-zk-root/storms/topology status as run
 *
 * 3. start one thread: every nimbus.monitor.freq.secs, check all topologies under
 *    /storm-zk-root/storms/; when a topology's status is monitor, nimbus will
 *    reassign its workers
 *
 * 4. start one thread: every nimbus.cleanup.inbox.freq.secs, clean up useless jars
 *
 * @author version 1: Nathan Marz version 2: Lixin/Chenjun version 3: Longda
 */
When Nimbus starts, it first reads the configuration file and then initializes the follower thread. Each follower registers itself in ZooKeeper by creating an ephemeral znode that records how long it has been up. During initialization the thread calls two methods in turn (FollowerRunnable.java):
data.getStormClusterState().update_nimbus_slave(hostPort, data.uptime());
data.getStormClusterState().update_nimbus_detail(hostPort, null);
Nimbus then checks whether the cluster already has a leader. If there is none, an election takes place; if a leader exists, Nimbus checks and synchronizes with the leader's configuration (see the stormNimbus cluster notes for details).
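A minimal sketch of this follower registration step, assuming a simplified StormClusterState interface; the class, field, and method shapes here are illustrative stand-ins, not JStorm's full API surface.

// Minimal sketch of the follower registration step (FollowerRunnable-style);
// the StormClusterState interface below is a simplified stand-in, not JStorm's real one.
public class FollowerRegistrationSketch {

    /** Hypothetical subset of JStorm's StormClusterState used only for this sketch. */
    interface StormClusterState {
        // creates/updates the ephemeral znode for this nimbus follower
        void update_nimbus_slave(String hostPort, int uptimeSecs) throws Exception;
        // records extra detail for this nimbus instance (null means "no detail yet")
        void update_nimbus_detail(String hostPort, java.util.Map<String, Object> detail) throws Exception;
    }

    private final StormClusterState clusterState;
    private final String hostPort;
    private final long startTimeMs = System.currentTimeMillis();

    FollowerRegistrationSketch(StormClusterState clusterState, String hostPort) {
        this.clusterState = clusterState;
        this.hostPort = hostPort;
    }

    int uptime() {
        return (int) ((System.currentTimeMillis() - startTimeMs) / 1000);
    }

    // register this nimbus instance as a follower, mirroring the two calls quoted above
    void register() throws Exception {
        clusterState.update_nimbus_slave(hostPort, uptime());
        clusterState.update_nimbus_detail(hostPort, null);
    }
}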
Before Nimbus starts serving, it runs its initialization (NimbusServer.init()):
1. Clean up interrupted topologies (those that exist under storm.local.dir/nimbus/stormdist but have no storms/topologyid node in ZK): delete the related information in ZK (the three paths tasks/topologyid, storms/topologyid, and assignments/topologyid).
2. Initializing task assignment starts a background thread, TopologyAssign. Let's look at how this thread assigns tasks (doTopologyAssignment):
   1. Create a task scheduler (DefaultTopologyScheduler).
   2. Build a TopologyAssignEvent from the configuration and topology information, and have the default scheduler assign the tasks (DefaultTopologyScheduler.assignTasks).
   3. Build the assignment context from the initialized TopologyAssignEvent (TopologyAssignContext context = prepareTopologyAssign(event)).
   4. In prepareTopologyAssign, collect all supervisors that are currently running (a very important method that deserves deeper study).
   5. When assigning tasks, first decide which tasks need to be handled according to the assign type (rebalance/new/monitor) (TopologyAssign); see the sketch after this list:
      - new: add every task cached in the context to the to-assign list;
      - rebalance: add every task that is not currently running;
      - monitor: add every dead task.
   6. Once the task list is built, read the topology's configuration and create nodes in ZK that store the supervisors with their host information, worker information, and so on.
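As referenced above, a hedged sketch of how the to-assign task set could be derived for the three assign types; the enum, method, and parameter names here are illustrative only, not JStorm's actual TopologyAssignContext API.

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of choosing which tasks need (re)assignment for the three
// assign types described above; names are hypothetical, not JStorm's actual fields.
public class NeedAssignTasksSketch {

    enum AssignType { NEW, REBALANCE, MONITOR }

    static Set<Integer> chooseNeedAssignTasks(AssignType type,
                                              Set<Integer> allTaskIds,
                                              Set<Integer> aliveTaskIds,
                                              Set<Integer> deadTaskIds) {
        Set<Integer> needAssign = new HashSet<>();
        switch (type) {
            case NEW:
                // new topology: every task must be assigned
                needAssign.addAll(allTaskIds);
                break;
            case REBALANCE:
                // rebalance: re-assign every task that is not currently running
                needAssign.addAll(allTaskIds);
                needAssign.removeAll(aliveTaskIds);
                break;
            case MONITOR:
                // monitor: only the tasks detected as dead are re-assigned
                needAssign.addAll(deadTaskIds);
                break;
        }
        return needAssign;
    }
}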
All thrift callbacks go through ServiceHandler, so when submitTopology is invoked it ultimately calls submitTopologyWithOpts; the code is built on top of the thrift framework.
// the TopologyAssign instance is started as a daemon background thread:
thread.setName("TopologyAssign");
thread.setDaemon(true);
thread.start();
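The fragment above only shows the thread being started. Below is a hedged sketch of the general pattern it implements: a daemon thread draining assignment events from a queue and handing each one to doTopologyAssignment-style logic. The class and method bodies are illustrative, not JStorm's actual implementation.

import java.util.concurrent.LinkedBlockingQueue;

// Hedged sketch of a TopologyAssign-style background loop: a daemon thread
// drains assignment events from a queue and processes them one by one.
public class AssignLoopSketch implements Runnable {

    // simplified stand-in for JStorm's TopologyAssignEvent
    static class TopologyAssignEvent { String topologyId; }

    private final LinkedBlockingQueue<TopologyAssignEvent> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    public void pushEvent(TopologyAssignEvent event) {
        queue.offer(event);
    }

    @Override
    public void run() {
        while (running) {
            try {
                TopologyAssignEvent event = queue.take();
                doTopologyAssignment(event); // schedule tasks and write the assignment to ZK
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    private void doTopologyAssignment(TopologyAssignEvent event) {
        // placeholder for: prepareTopologyAssign(event) -> scheduler.assignTasks(...) -> write to ZK
    }

    public static void main(String[] args) {
        AssignLoopSketch assign = new AssignLoopSketch();
        Thread thread = new Thread(assign);
        thread.setName("TopologyAssign");
        thread.setDaemon(true);
        thread.start();
    }
}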
Let's walk through the execution flow and logic of submitTopologyWithOpts:
If a topology with the same name already sits in the pendingSubmitTopologys queue, an exception is thrown.
// create /local-dir/nimbus/topologyId/xxxx files
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology);
// generate TaskInfo for every bolt or spout in ZK
// /ZK/tasks/topologyId/xxx
setupZkTaskInfo(conf, topologyId, stormClusterState);
// make assignments for a topology
LOG.info("Submit for " + topologyName + " with conf " + serializedConf);
makeAssignment(topologyName, topologyId, options.get_initial_status());
// once the assignment for the topology has been made, remove the topologyId from
// pendingSubmitTopologys
data.getPendingSubmitTopologys().remove(topologyId);
// push start event after startup
StartTopologyEvent startEvent = new StartTopologyEvent();
this.data.getMetricRunnable().pushEvent(startEvent);
notifyTopologyActionListener(topologyName, "submitTopology");
After the startEvent has been pushed onto the queue, the submitTopology action is triggered (notifyTopologyActionListener).
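For context, the thrift path above is what an ordinary client submission exercises. Below is a minimal client-side example using the public TopologyBuilder/StormSubmitter API; the spout and bolt are trivial placeholders, and the topology name and worker count are arbitrary choices for the sketch.

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Minimal client-side submission sketch (not part of the nimbus code analyzed here).
public class SubmitExample {

    // placeholder spout that keeps emitting a single word
    public static class RandomSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            collector.emit(new Values("hello"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // placeholder bolt that just prints what it receives
    public static class PrintBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            System.out.println(input.getString(0));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no downstream output
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSpout(), 1);
        builder.setBolt("bolt", new PrintBolt(), 2).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setNumWorkers(2);

        // this call goes through the thrift client and ends up in ServiceHandler.submitTopologyWithOpts on nimbus
        StormSubmitter.submitTopology("demo-topology", conf, builder.createTopology());
    }
}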
private void init(Map conf) throws Exception {
    // 1. remove topologies whose local dir and ZK state are inconsistent
    NimbusUtils.cleanupCorruptTopologies(data);
    // 2. start the TopologyAssign background thread
    initTopologyAssign();
    // 3. initialize the status of the topologies recorded in ZK
    initTopologyStatus();
    // 4. start the cleaner that purges useless jars from the inbox
    initCleaner(conf);
    // 5. create the thrift service handler
    serviceHandler = new ServiceHandler(data);
    if (!data.isLocalMode()) {
        // in distributed mode, also start the monitor thread and the thrift server
        //data.startMetricThreads();
        initMonitor(conf);
        initThrift(conf);
    }
}
Supervisor:
/**
 * Supervisor workflow:
 * 1. write SupervisorInfo to ZK
 *
 * 2. Every 10 seconds run SynchronizeSupervisor
 *    2.1 download new topology
 *    2.2 release useless worker
 *    2.3 assign new task to /local-dir/supervisor/localstate
 *    2.4 add one syncProcesses event
 *
 * 3. Every supervisor.monitor.frequency.secs run SyncProcesses
 *    3.1 kill useless worker
 *    3.2 start new worker
 *
 * 4. create heartbeat thread: every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZK
 */
1. The supervisor first clears its temporary directories and files, then creates its heartbeat object, fills in the supervisor_id, IP, port, and so on, and calls update to create an ephemeral node in ZooKeeper that stores the supervisor's information. After registering in ZK it creates the supervisor synchronization thread, and workers are created from within that sync thread (a sketch of the heartbeat part follows below).
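A hedged sketch of the heartbeat part just described, assuming a simplified cluster-state interface rather than JStorm's actual classes: the supervisor periodically rewrites its info node in ZK at the configured frequency.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hedged sketch of the supervisor heartbeat: periodically rewrite the SupervisorInfo
// node in ZK. The ClusterState type here is a simplified stand-in, not JStorm's API.
public class SupervisorHeartbeatSketch {

    interface ClusterState {
        // writes/overwrites the supervisor znode payload
        void supervisorHeartbeat(String supervisorId, byte[] supervisorInfo) throws Exception;
    }

    static void startHeartbeatThread(ClusterState zk, String supervisorId,
                                     byte[] supervisorInfo, int frequencySecs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                zk.supervisorHeartbeat(supervisorId, supervisorInfo);
            } catch (Exception e) {
                // in the real supervisor a failed heartbeat is logged and retried on the next tick
                e.printStackTrace();
            }
        }, 0, frequencySecs, TimeUnit.SECONDS);
    }
}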
How the supervisor starts workers:
Main class: SyncProcessEvent
public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
    LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
    lastTime = TimeUtils.current_time_secs();
    try {
        /**
         * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
         */
        if (localAssignments == null) {
            localAssignments = new HashMap<>();
        }
        LOG.debug("Assigned tasks: " + localAssignments);

        /**
         * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
         */
        Map<String, StateHeartbeat> localWorkerStats;
        try {
            localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
        } catch (Exception e) {
            LOG.error("Failed to get Local worker stats");
            throw e;
        }
        LOG.debug("Allocated: " + localWorkerStats);

        /**
         * Step 3: kill invalid workers and remove killed workers from localWorkerStats
         */
        Map<String, Integer> taskCleaupTimeoutMap;
        Set<Integer> keepPorts = null;
        try {
            taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
            keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
            localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
        } catch (IOException e) {
            LOG.error("Failed to kill workers", e);
        }

        // check new workers
        checkNewWorkers(conf);
        // check which topologies need updating
        checkNeedUpdateTopologys(localWorkerStats, localAssignments);
        // start new workers
        startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
    } catch (Exception e) {
        LOG.error("Failed Sync Process", e);
        // throw e
    }
}
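To make Step 3 concrete, here is a tiny illustrative check of the kind killUselessWorkers relies on: a worker whose last heartbeat is older than the configured timeout is treated as dead and becomes a candidate for killing. The names below are hypothetical, not JStorm's actual code.

// Illustrative heartbeat-timeout check (hypothetical names, not JStorm's actual code).
public class WorkerTimeoutSketch {

    static boolean isHeartbeatTimedOut(int nowSecs, int lastHeartbeatSecs, int timeoutSecs) {
        return (nowSecs - lastHeartbeatSecs) > timeoutSecs;
    }

    public static void main(String[] args) {
        int now = (int) (System.currentTimeMillis() / 1000);
        // a worker that last reported 120 seconds ago with a 60-second timeout is considered dead
        System.out.println(isHeartbeatTimedOut(now, now - 120, 60)); // prints true
    }
}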
TopologySubmitter: