Storm-源碼分析-Topology Submit-Nimbus-mk-assignments


什么是"mk-assignment”, 主要就是產生executor->node+port關系, 將executor分配到哪個node的哪個slot上(port代表slot, 一個slot可以run一個worker進程, 一個worker包含多個executor線程)

先搞清什么是executor, 參考Storm-源碼分析- Component ,Executor ,Task之間關系 

 

;; get existing assignment (just the executor->node+port map) -> default to {}
;; filter out ones which have a executor timeout
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
;; only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
;; edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
(defnk mk-assignments [nimbus :scratch-topology-id nil]
  (let [conf (:conf nimbus)
        storm-cluster-state (:storm-cluster-state nimbus)
        ^INimbus inimbus (:inimbus nimbus) 
        ;; 1. 讀出所有active topology信息 (read all the topologies)
        topology-ids (.active-storms storm-cluster-state) ;;讀出所有topology的ids
        topologies (into {} (for [tid topology-ids]
                              {tid (read-topology-details nimbus tid)})) ;;{tid, TopologyDetails.}
        topologies (Topologies. topologies)
        ;; 2. 讀出當前的assignemnt情況(read all the assignments)
        assigned-topology-ids (.assignments storm-cluster-state nil) ;;已經被assign的tids
        existing-assignments (into {} (for [tid assigned-topology-ids]
                                        ;; for the topology which wants rebalance (specified by the scratch-topology-id)
                                        ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                        ;; will be treated as free slot in the scheduler code.
                                        (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                          {tid (.assignment-info storm-cluster-state tid nil)})))
        ;;3. 根據取到的Topology和Assignement情況, 對當前topology進行新的assignment (make the new assignments for topologies)
        topology->executor->node+port (compute-new-topology->executor->node+port
                                       nimbus
                                       existing-assignments
                                       topologies
                                       scratch-topology-id)
               
        now-secs (current-time-secs)
        
        basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
        
        ;; construct the final Assignments by adding start-times etc into it
        new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
                                        :let [existing-assignment (get existing-assignments topology-id)
                                              all-nodes (->> executor->node+port vals (map first) set)
                                              node->host (->> all-nodes
                                                              (mapcat (fn [node]
                                                                        (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
                                                                          [[node host]]
                                                                          )))
                                                              (into {}))
                                              all-node->host (merge (:node->host existing-assignment) node->host)
                                              reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
                                              start-times (merge (:executor->start-time-secs existing-assignment)
                                                                (into {}
                                                                      (for [id reassign-executors]
                                                                        [id now-secs]
                                                                        )))]]
                                   {topology-id (Assignment.
                                                 (master-stormdist-root conf topology-id)
                                                 (select-keys all-node->host all-nodes)
                                                 executor->node+port
                                                 start-times)}))]

    ;; tasks figure out what tasks to talk to by looking at topology at runtime
    ;; only log/set when there's been a change to the assignment
    (doseq [[topology-id assignment] new-assignments
            :let [existing-assignment (get existing-assignments topology-id)
                  topology-details (.getById topologies topology-id)]]
      (if (= existing-assignment assignment)
        (log-debug "Assignment for " topology-id " hasn't changed")
        (do
          (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
          (.set-assignment! storm-cluster-state topology-id assignment)
          )))
    (->> new-assignments
          (map (fn [[topology-id assignment]]
            (let [existing-assignment (get existing-assignments topology-id)]
              [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] 
              )))
          (into {})
          (.assignSlots inimbus topologies))
    )) 

1. 讀出所有active topology信息

先使用active-storms去zookeeper上讀到所有active的topology的ids
然后使用read-topology-details讀出topology的更多的詳細信息,
並最終封裝成TopologyDetails, 其中包含關於topology的所有信息, 包含id, conf, topology對象, work數, component和executor關系

(active-storms [this]
  (get-children cluster-state STORMS-SUBTREE false) ;"/storms”
  )
(defn read-topology-details [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) ;從zookeeper讀出storm-base的內容
        topology-conf (read-storm-conf conf storm-id) ;從storm本地目錄中讀出topology的配置
        topology (read-storm-topology conf storm-id) ;從storm本地目錄中讀出topology的對象(反序列化)
       executor->component (->> (compute-executor->component nimbus storm-id) ;讀出executor和component的對應關系
                                      (map-key (fn [[start-task end-task]]
                                              (ExecutorDetails. (int start-task) (int end-task)))))] ;將executor封裝成ExecutorDetials對象
    (TopologyDetails. storm-id
                      topology-conf
                      topology
                      (:num-workers storm-base)
                      executor->component
                      )))

最終將topologies信息, 封裝成Topologies, 提供根據tid或name的對topology的檢索

public class Topologies {
    Map<String, TopologyDetails> topologies;
    Map<String, String> nameToId;
}

 

2. 讀出當前的assignemnt情況

從Assignment的定義可以看出, Assignment主要就是executor和host+port的對應關系

(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
 
image

 

StormClusterState相關的都是去Zookeeper上面讀寫數據

    (reify
     StormClusterState    
     (assignments [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) ;/assignments
      )

讀出所有/assignments下面的topology信息

assigned-topology-ids (.assignments storm-cluster-state nil) ;讀出所有的topology ids
existing-assignments (into {} (for [tid assigned-topology-ids] ;根據topologyid, 讀出具體的信息(scratch topology概念,需要rebalance的topology,當前assignment都已經無效, 所以不需要讀)
                                 ;; for the topology which wants rebalance (specified by the scratch-topology-id)
                                 ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                 ;; will be treated as free slot in the scheduler code.
                                 (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                   {tid (.assignment-info storm-cluster-state tid nil)})))
 

3. 根據取到的Topology和Assignement情況, 對當前topology進行新的assignment

主要就是調用compute-new-topology->executor->node+port, 在真正調用scheduler.schedule 之前, 需要做些准備工作

3.1 ~3.6, topology assignment情況
a. 從zk獲取topology中executors的assignment信息, 但是assignment是靜態信息.
    我們還需要知道, assign完后這些executor是否在工作, 更新executor的hb, 並找出alive-executors, 這部分assignment才是有效的assignment, 所以僅僅將alive-executors封裝生成topology->scheduler-assignment
b. 在check topology assignment中, 發現的dead slot
    對於那些沒有hb的executor, 我們認為是slot產生了問題, 稱為dead slot, 后面需要避免再往dead slot分配executor (dead slot可能有alive-executors存在)

3.7~3.8, supervisor的情況

根據supervisor的hb, 獲取當前alive的supervisor的狀況SupervisorDetails, 主要是hostname, 和allports(配置的ports – dead slots)

3.9, cluster, topology的運行態信息, 包含上面的兩點信息

cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

 

(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
  (let [conf (:conf nimbus)
        storm-cluster-state (:storm-cluster-state nimbus)
        ;;3.1  取出所有已經assignment的topology的executors信息
        ;;所有已經assignment的Topology所包含的executor, {t1 #([1 2] [3 3]), t2 #([1 2] [3 3])}
        topology->executors (compute-topology->executors nimbus (keys existing-assignments));;只包含存在assignment信息的, 所以新的或scratch Topology都不算
        ;;3.2 更新所有executors的heartbeats cache(更新nimbus-data的heartbeats-cache) 
        ;; update the executors heartbeats first. 
        _ (update-all-heartbeats! nimbus existing-assignments topology->executors) ;;只是為了在let中提前調用update-all-heartbeats!, 所以使用'_' 
       ;; 3.3  過濾topology->executors, 保留alive的
        topology->alive-executors (compute-topology->alive-executors nimbus
                                                                     existing-assignments
                                                                     topologies
                                                                     topology->executors
                                                                     scratch-topology-id)
        ;;3.4 找出dead slots 
        supervisor->dead-ports (compute-supervisor->dead-ports nimbus
                                                               existing-assignments
                                                               topology->executors
                                                               topology->alive-executors)
        ;;3.5 生成alive executor的SchedulerAssignment
        topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
                                                                               existing-assignments
                                                                               topology->alive-executors)
        ;;3.6 找出missing-assignment-topologies, 需要從新assign                                          
        missing-assignment-topologies (->> topologies
                                           .getTopologies ;;返回TopologyDetials.
                                           (map (memfn getId)) ;;get topologyid
                                           (filter (fn [t]
                                                      (let [alle (get topology->executors t)
                                                            alivee (get topology->alive-executors t)]
                                                            (or (empty? alle)
                                                                (not= alle alivee)
                                                                (< (-> topology->scheduler-assignment
                                                                       (get t)
                                                                       num-used-workers )
                                                                   (-> topologies (.getById t) .getNumWorkers)
                                                                   ))
                                                            ))))
        ;;3.7 all-scheduling-slots, 找出所有supervisor在conf中已配置的slots        
        all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
                                  (map (fn [[node-id port]] {node-id #{port}}))
                                  (apply merge-with set/union))
        ;;3.8 生成所有supervisors的SupervisorDetails  
        supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
        ;;3.9 生成cluster        
        cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

        ;;3.10 call scheduler.schedule to schedule all the topologies
        ;; the new assignments for all the topologies are in the cluster object.
        _ (.schedule (:scheduler nimbus) topologies cluster)
        new-scheduler-assignments (.getAssignments cluster)
        ;; add more information to convert SchedulerAssignment to Assignment
        new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]
    ;; print some useful information.
    (doseq [[topology-id executor->node+port] new-topology->executor->node+port
            :let [old-executor->node+port (-> topology-id
                                          existing-assignments
                                          :executor->node+port)
                  reassignment (filter (fn [[executor node+port]]
                                         (and (contains? old-executor->node+port executor)
                                              (not (= node+port (old-executor->node+port executor)))))
                                       executor->node+port)]]
      (when-not (empty? reassignment)
        (let [new-slots-cnt (count (set (vals executor->node+port)))
              reassign-executors (keys reassignment)]
          (log-message "Reassigning " topology-id " to " new-slots-cnt " slots")
          (log-message "Reassign executors: " (vec reassign-executors)))))

    new-topology->executor->node+port))

3.1  取出所有已經assignment的topology的executors信息

這里的實現有些問題, compute-topology->executors會調用compute-executors重新計算一般, 其實從topologies里面直接就可以取到

 

3.2 更新所有executors的heartbeats cache(更新nimbus-data的heartbeats-cache)

具體過程是, 從Zookeeper通過get-worker-heartbeat讀出所有executors最新的heartbeats信息(通過executor->node+port可以對應到worker), 並使用swap!將最新的heartbeats信息更新到nimbus的全局變量heartbeats-cache中

 

3.3  過濾topology->executors, 保留alive的

調用compute-topology->alive-executors

(defn- compute-topology->alive-executors [nimbus existing-assignments topologies topology->executors scratch-topology-id]
  "compute a topology-id -> alive executors map"
  (into {} (for [[tid assignment] existing-assignments
                 :let [topology-details (.getById topologies tid)
                       all-executors (topology->executors tid)
                       alive-executors (if (and scratch-topology-id (= scratch-topology-id tid));;這里其實不會出現scratch-topology, 前面都已經過濾過
                                         all-executors
                                         (set (alive-executors nimbus topology-details all-executors assignment)))]]
             {tid alive-executors})))

調用alive-executors, 來通過剛剛更新的heartbeats cache來判斷executor是否alive

    (->> all-executors
        (filter (fn [executor]
          (let [start-time (get executor-start-times executor)
                nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
            (if (and start-time
                   (or
                    (< (time-delta start-time)
                       (conf NIMBUS-TASK-LAUNCH-SECS))
                    (not nimbus-time)
                    (< (time-delta nimbus-time)
                       (conf NIMBUS-TASK-TIMEOUT-SECS))
                    ))
              true
              (do
                (log-message "Executor " storm-id ":" executor " not alive")
                false))
            )))
        doall))) ;doall很重要, 確保真正filter每個executor, 否則只會產生lazy-seq

3.4 找出dead slots

首先slot就是對node+port的抽象封裝, 一個slot可以運行一個worker, 所以在supervisor分配多少slot就可以運行多少worker
而對於executor是線程, 所以往往dead executor意味着, 這個workerslot dead.

;; TODO: need to consider all executors associated with a dead executor (in same slot) dead as well,
;; don't just rely on heartbeat being the same

調用compute-supervisor->dead-ports, 邏輯
找到dead-executors, dead-executors (set/difference all-executors alive-executors)
把dead-executors 對應的node+port都當成dead slots

public class WorkerSlot {
    String nodeId;
    int port;
}
判斷dead-slots的邏輯, 很簡單
dead-slots (->> (:executor->node+port assignment) ; [executor [node port]]
                 (filter #(contains? dead-executors (first %)))
                 vals)]] ;返回所有values組成的seq

最終返回所有dead slots, {nodeid #{port1, port2},…}

 

3.5 生成alive executor的SchedulerAssignment

“convert assignment information in zk to SchedulerAssignment, so it can be used by scheduler api”

把alive executor的assignment(executor->node+port), 轉化並封裝為SchedulerAssignmentImpl, 便於后面scheduler使用

    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
        this.topologyId = topologyId;
        this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
    } 
SchedulerAssignmentImpl, 記錄了topology中所有executor, 以及每個executor對應的workerslot, 可見executor作為assignment的單位

3.6 找出missing-assignment-topologies, 需要從新assign (當前邏輯沒有用到, 在sechduler里面會自己判斷(判斷邏輯相同))

什么叫missing-assignment, 滿足下面任一條件

topology->executors, 其中沒有該topolgy, 說明該topology沒有assignment信息, 新的或scratch
topology->executors != topology->alive-executors, 說明有executor dead
topology->scheduler-assignment中的實際worker數小於topology配置的worker數 (可能上次assign的時候可用slot不夠, 也可能由於dead slot造成)

 

3.7 all-scheduling-slots, 找出所有supervisor在conf中已配置的slots

(defn- all-scheduling-slots
  [nimbus topologies missing-assignment-topologies]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        ^INimbus inimbus (:inimbus nimbus)        
        supervisor-infos (all-supervisor-info storm-cluster-state nil)
        supervisor-details (dofor [[id info] supervisor-infos]
                             (SupervisorDetails. id (:meta info)))

        ret (.allSlotsAvailableForScheduling inimbus
                     supervisor-details
                     topologies
                     (set missing-assignment-topologies)
                     )
        ]
    (dofor [^WorkerSlot slot ret]
      [(.getNodeId slot) (.getPort slot)]
      )))

3.7.1 all-supervisor-info

從zk上讀到每個supervisor的info, supervisor的hb, 返回{supervisorid, info}

SupervisorInfo的定義,

(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])

參考下面設置SupervisorInfo的代碼(mk-supervisor), 可以知道每個字段的意思

(SupervisorInfo. (current-time-secs) ;;hb時間
                 (:my-hostname supervisor) ;;機器名
                 (:assignment-id supervisor) ;;assignment-id = supervisor-id, 每個supervisor生成的uuid
                 (keys @(:curr-assignment supervisor)) ;;supervisor上當前使用的ports (curr-assignment, port->executors)
                 (.getMetadata isupervisor) ;;在conf里面配置的supervisor的ports
                 (conf SUPERVISOR-SCHEDULER-META) ;;用戶在conf里面配置的supervior相關的metadata,比如name,可以任意kv
                 ((:uptime supervisor)))))] ;;closeover了supervisor啟動時間的fn, 調用可以算出uptime, 正常運行時間
 
(defn- all-supervisor-info
  ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
  ([storm-cluster-state callback]
     (let [supervisor-ids (.supervisors storm-cluster-state callback)] ;;從zk的superviors目錄下讀出所有superviors-id
       (into {}
             (mapcat
              (fn [id]
                (if-let [info (.supervisor-info storm-cluster-state id)] ;;從zk讀取某supervisor的info
                  [[id info]]
                  ))
              supervisor-ids))
       )))

3.7.2 SupervisorDetails

將supervisor-info封裝成SupervisorDetails, (SupervisorDetails. id (:meta info)))

public class SupervisorDetails {
    String id; //supervisor-id
    /**
     * hostname of this supervisor
     */
    String host;
    Object meta; 
    /**
     * meta data configured for this supervisor
     */
    Object schedulerMeta;
    /**
     * all the ports of the supervisor
     */
    Set<Integer> allPorts;
}

3.7.3 allSlotsAvailableForScheduling
此處inimbus的實現是standalone-nimbus, 參考nimbus launch-server!的參數

(defn standalone-nimbus []
  (reify INimbus
    (prepare [this conf local-dir]
      )
    (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
      (->> supervisors
           (mapcat (fn [^SupervisorDetails s]
                     (for [p (.getMeta s)] ;;meta里面放的是conf里面配置的ports list, 對每一個封裝成WorkerSlot
                       (WorkerSlot. (.getId s) p)))) ;;可見nodeid就是supervisorid, nnid, 而不是ip
           set ))
    (assignSlots [this topology slots]
      )
    (getForcedScheduler [this]
      nil )
    (getHostName [this supervisors node-id]
      (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
        (.getHost supervisor)))
    ))

這只用到supervisors參數, 把每個supervisor中配置的workerslot取出, 合並為set返回

最終得到的是supervisor中配置的所有slots的nodeid+port的集合, {node1 #{port1 port2 port3}, node2 #{port1 port2}}

當然這只是給出了allSlotsAvailableForScheduling最簡單的實現, 可以通過更改這里的邏輯來change slots的選擇策略, 比如在某些情況下, 某些slots非available

 

3.8 生成SupervisorDetails

關鍵是填上all-ports, all-scheduling-slots – dead-ports

(defn- read-all-supervisor-details [nimbus all-scheduling-slots supervisor->dead-ports]
    (let [storm-cluster-state (:storm-cluster-state nimbus)
        supervisor-infos (all-supervisor-info storm-cluster-state)
        ;;在all-scheduling-slots中有, 但是在supervisor-infos(zk的hb)沒有的supervisor
;;什么情況下會有這種case, 當前實現all-scheduling-slots本身就來自supervisor-infos, 應該不存在這種case
        nonexistent-supervisor-slots (apply dissoc all-scheduling-slots (keys supervisor-infos)) 
        ;;生成supervisor-details, 參考前面supervisor-info和supervisor-details的定義
        all-supervisor-details (into {} (for [[sid supervisor-info] supervisor-infos
                                              :let [hostname (:hostname supervisor-info)
                                                    scheduler-meta (:scheduler-meta supervisor-info)
                                                    dead-ports (supervisor->dead-ports sid)
                                                    ;; hide the dead-ports from the all-ports
                                                    ;; these dead-ports can be reused in next round of assignments
                                                    all-ports (-> (get all-scheduling-slots sid)
                                                                  (set/difference dead-ports) ;;去除dead-ports,
                                                                  ((fn [ports] (map int ports))))
                                                    supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
                                          {sid supervisor-details}))]
    (merge all-supervisor-details 
           (into {}
              (for [[sid ports] nonexistent-supervisor-slots]
                [sid (SupervisorDetails. sid nil ports)]))
           )))

3.9 生成cluster

package backtype.storm.scheduler;
public class Cluster {
    /**
     * key: supervisor id, value: supervisor details
     */
    private Map<String, SupervisorDetails>   supervisors;
    /**
     * key: topologyId, value: topology's current assignments.
     */
    private Map<String, SchedulerAssignmentImpl> assignments;

    /**
     * a map from hostname to supervisor id.
     */
    private Map<String, List<String>>        hostToId;
    
    private Set<String> blackListedHosts = new HashSet<String>();
    private INimbus inimbus;
}

3.10 調用scheduler.schedule

Storm-源碼分析- Scheduler


3.11 轉化new assignment的格式, 打印相應的提示信息


調用compute-topology->executor->node+port, "convert {topology-id -> SchedulerAssignment} to {topology-id -> {executor [node port]}}"

和existing-assignments進行比較, 打印出reassignment的結果

 

4. 將新的assignment結果存儲到Zookeeper

根據Assignment的定義, 除了executor->node+port以外, 還有些輔助信息, 比如start-time

(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])

所以首先補充這些輔助信息, 主要就是更新reassign-executors的start time, 並封裝成Assignment record

如果新的assignment有變化, 更新到Zookeeper上

(.set-assignment! storm-cluster-state topology-id assignment)

最終調用INimbus.assignSlots, 用於在zookeeper上assignment change之后, 做后續處理

而standalone-nimbus中assignSlots沒有做實際的操作


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM