Storm-源碼分析- Scheduler (backtype.storm.scheduler)


首先看看IScheduler接口的定義, 主要實現兩個接口, prepare和schedule

對於schedule的參數注釋寫的非常清楚,
topologies包含所有topology的靜態信息, 而cluster中包含了topology的運行態信息
根據他們就可以來判斷如何assignment

package backtype.storm.scheduler;
import java.util.Map;
public interface IScheduler {   
    void prepare(Map conf);
    
    /**
     * Set assignments for the topologies which needs scheduling. The new assignments is available 
     * through <code>cluster.getAssignments()</code>
     *
     *@param topologies, all the topologies in the cluster, some of them need schedule. Topologies object here 
     *       only contain static information about topologies. Information like assignments, slots are all in
     *       the <code>cluster</code>object.
     *@param cluster, the cluster these topologies are running in. <code>cluster</code> contains everything user
     *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current 
     *       assignments for all the topologies etc. User can set the new assignment for topologies using
     *       <code>cluster.setAssignmentById</code>
     */
    void schedule(Topologies topologies, Cluster cluster);
}

DefaultScheduler

DefaultScheduler, 實現backtype.storm.scheduler.IScheduler接口

(ns backtype.storm.scheduler.DefaultScheduler  
  (:gen-class
    :implements [backtype.storm.scheduler.IScheduler]))
(defn -prepare [this conf]
  )
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
  (default-schedule topologies cluster))

 

下面看看default-schedule做了些什么?

(defn default-schedule [^Topologies topologies ^Cluster cluster]
  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)] ;;1,取出需要scheduling的topologies
    (doseq [^TopologyDetails topology needs-scheduling-topologies
            :let [topology-id (.getId topology)
                  available-slots (->> (.getAvailableSlots cluster)
                                       (map #(vector (.getNodeId %) (.getPort %))))
                  all-executors (->> topology
                                     .getExecutors
                                     (map #(vector (.getStartTask %) (.getEndTask %)))
                                     set)
                  alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
                  alive-executors (->> alive-assigned vals (apply concat) set)
                  can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
                  total-slots-to-use (min (.getNumWorkers topology)
                                          (+ (count can-reassign-slots) (count available-slots)))
                  bad-slots (if (or (> total-slots-to-use (count alive-assigned)) 
                                    (not= alive-executors all-executors))
                                (bad-slots alive-assigned (count all-executors) total-slots-to-use)
                                [])]]
      (.freeSlots cluster bad-slots)
      (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
 

1,取出需要scheduling的topologies (cluster.needsScheduling)

判斷是否需要scheduling, 滿足(or)
現在已經assigned的worker數小於配置的worker數 (dead slot或上次分配是可用slot不夠)
all executors > assigned executors(新的topology或已分配的executor dead(沒有hb))
public boolean needsScheduling(TopologyDetails topology) {
    int desiredNumWorkers = topology.getNumWorkers();
    int assignedNumWorkers = this.getAssignedNumWorkers(topology);
    if (desiredNumWorkers > assignedNumWorkers) {
        return true;
    }
    return this.getUnassignedExecutors(topology).size() > 0;
}

2, 對於每個需要scheduling的topology

2.1 找出cluster中所有可用的slots, 從每個SupervisorDetails中讀出可用的slots(即assignedports - usedPorts)

available-slots, ([node1 port2] [node2 port2])

2.2 讀出該topology所有的executors
all-executors , ([1 3] [4 4] [5 7])

2.3 從cluster中讀出該topology的assignment關系

因為前面只將alive executors的assignment關系記錄到cluster中, 所以從alive-assigned可用推出alive-executors
alive-assigned, node+port->executor, {[node1 port1] [1 3], [node2 port1] [5 7]}
alive-executors, ([1 3] [5 7])

2.4 找出topology當前運行的slots中哪些是可用的 (slots-can-reassign)
alive executors是有可能跑在dead slot上的, 所以不是所有alive executors的slot都可用
reassign的條件, node不在cluster的blacklist, port是否在supervisor的allPort中(即不是dead port), 即這個slot是可用的
可用的slot, 就可以用於reassign

2.5 total-slots-to-use應該等於(available-slots + can-reassign-slots)
當然最多slots數不能大於topology配置的worker number, 在可用slot數不夠的情況下, 可能小於

2.6找出bad slots
針對不合理或bad的slots assignment關系, 找出相應的slots
並在下一步釋放掉這些不合理的slots assignment
一般兩種情況, 前一次分配時可用slots不夠, 所以沒有達到配置的數目;使用中某slot dead, 導致alive slot減少

if (or (> total-slots-to-use (count alive-assigned));;當前可用slots數大於當前assign的slots數
       (not= alive-executors all-executors));;某些executors死了, 表明肯定有壞的slots, 或新的topology, 還沒有分配
   (bad-slots alive-assigned (count all-executors) total-slots-to-use) [])

 

(defn- bad-slots [existing-slots num-executors num-workers]
  (if (= 0 num-workers)
    '()
    (let [distribution (atom (integer-divided num-executors num-workers))
          keepers (atom {})]
      (doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]]
        (when (pos? (get @distribution executor-count 0)) ;;是否在正常的distribution中可以找到, 找到說明這個slot分配合理, 需要keep
          (swap! keepers assoc node+port executor-list) ;;slot分配合理, 所以加到keeper中
          (swap! distribution update-in [executor-count] dec) ;;並且該分配比例的份數減一
          ))
      (->> @keepers
           keys
           (apply dissoc existing-slots) ;;在exisiting-slots中除去keeper, 剩下的都是bad-slots
           keys
           (map (fn [[node port]]
                  (WorkerSlot. node port)))))))

例子, 7個executor, 3個worker, 那么正常情況下, ((2 2)(3 1)), 1份3個, 2份2個
所以check所有現有的assignment, 把符合正常分配比例的加到keeper上, 比如這個case如果出現1個或2份3個都是不符合比例的
這些slot都被認為是bad slots

 

3 free bad slots

所謂的free, 就是在SchedulerAssignmentImpl中, 把所有bad slot上的executors從executorToSlot中刪除
slot只要沒有executor占用就是free

    Map<ExecutorDetails, WorkerSlot> executorToSlot; 
    /**
     * Release the slot occupied by this assignment.
     * @param slot
     */
    public void unassignBySlot(WorkerSlot slot) {
        List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
        for (ExecutorDetails executor : this.executorToSlot.keySet()) {
            WorkerSlot ws = this.executorToSlot.get(executor);
            if (ws.equals(slot)) {
                executors.add(executor);
            }
        }       
        // remove
        for (ExecutorDetails executor : executors) {
            this.executorToSlot.remove(executor);
        }
    }

4 EvenScheduler/schedule-topologies-evenly

這個function是doseq的經典應用, 兩層doseq的嵌套
第一層doseq的處理函數, 仍然是一個doseq
第二層doseq的處理函數, .assign

(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
    (doseq [^TopologyDetails topology needs-scheduling-topologies
            :let [topology-id (.getId topology)
                  new-assignment (schedule-topology topology cluster)
                  node+port->executors (reverse-map new-assignment)]]
      (doseq [[node+port executors] node+port->executors
              :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
                    executors (for [[start-task end-task] executors]
                                (ExecutorDetails. start-task end-task))]]
        (.assign cluster slot topology-id executors)))))


4.1 調用schedule-topology

(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
  (let [topology-id (.getId topology)
        available-slots (->> (.getAvailableSlots cluster)
                             (map #(vector (.getNodeId %) (.getPort %))))
        all-executors (->> topology
                          .getExecutors
                          (map #(vector (.getStartTask %) (.getEndTask %)))
                          set)
        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id) ;;必須重新計算, 因為剛剛free的slot assignment關系
        total-slots-to-use (min (.getNumWorkers topology)
                                (+ (count available-slots) (count alive-assigned)))
        reassign-slots (take (- total-slots-to-use (count alive-assigned)) ;;上一步已經把bad slot都free, 仍然alive說明改slot不需要被reassign
                             (sort-slots available-slots))
        reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
        reassignment (into {}
                           (map vector
                                reassign-executors
                                ;; for some reason it goes into infinite loop without limiting the repeat-seq
                                (repeat-seq (count reassign-executors) reassign-slots)))]
    (when-not (empty? reassignment)
      (log-message "Available slots: " (pr-str available-slots))
      )
    reassignment))

 

reassign-slots,
a. 計算現在可以用於assignment的slots數, 之所以不直接使用available-slots, 因為有worker number限制, 所以可能小於available-slots
b. sort-slots, 按照port對slots進行排序

(defn sort-slots [all-slots] ;'(["n1" "p1"] ["n1" "p2"] ["n1" "p3"] ["n2" "p1"] ["n3" "p1"] ["n3" "p2"])
  (let [split-up (vals (group-by first all-slots))]
    (apply interleave-all split-up) ;'(["n1" "p1"] ["n2" "p1"] ["n3" "p1"] ["n1" "p2"] ["n3" "p2"] ["n1" "p3"])
    ))
c.從排過序的list取前(a.)個slots, 之所以前面按port排序, 可以使executors盡量分布在不同的node上

reassign-executors, 現在沒有被assign的executors

assignment的過程非常簡單, 就是對reassign-executors 和 reassign-slots做map, 注釋中解釋為什么要加count, 其實應該是不用加的, 因為理論上是當一個coll結束就會停止, 但某種原因似乎這里會停不下來.
之所以需要repeat-seq, 是因為executors往往多於slots

(map vector #{[1,3] [4,4] [5,6]}(repeat-seq 3 '(["n1" "p1"] ["n1" "p2"])))
([[4 4] ["n1" "p1"]] [[5 6] ["n1" "p2"]] [[1 3] ["n1" "p1"]])

4.2 將新的assignment封裝成WorkerSlot和ExecutorDetails

4.3 最終將新的assignment結果放到SchedulerAssignmentImpl的executorToSlot中去

    /**
     * Assign the slot to executors.
     * @param slot
     * @param executors
     */
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
        for (ExecutorDetails executor : executors) {
            this.executorToSlot.put(executor, slot);
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM