The number of Executors (threads) a Component contains
This is num-executors in StormBase; it corresponds to the parallelism you specify for each component when writing the topology code (via setBolt and setSpout).
The mapping between Components and Tasks, (storm-task-info)
By default you need not specify a task count, in which case tasks and executors are in a 1:1 relationship.
You can also set TOPOLOGY_TASKS explicitly via ComponentConfigurationDeclarer#setNumTasks().
This function first reads out all components;
for each component it reads the json_conf from its ComponentCommon and extracts the TOPOLOGY_TASKS value set above.
Finally it generates task ids from an increasing sequence, producing the component-to-task mapping.
If TOPOLOGY_TASKS is not set, the task count equals the executor count and the later assignment is trivial; otherwise the problem of distributing tasks over executors arises.
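As a quick illustration of the default just described, here is a Python sketch (not Storm source; the config key "topology.tasks" is assumed to be the string behind TOPOLOGY_TASKS):

```python
def resolve_task_count(component_conf, num_executors):
    # Hypothetical sketch: if TOPOLOGY_TASKS ("topology.tasks") is absent
    # from the component's conf, the task count defaults to the executor
    # (thread) count, giving the 1:1 task-to-executor relationship.
    return component_conf.get("topology.tasks", num_executors)
```

For example, `resolve_task_count({}, 2)` gives 2, while `resolve_task_count({"topology.tasks": 5}, 2)` gives 5.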
(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})))
It first calls system-topology!, because system-topology! adds the system components — acker, systemBolt, and metricsBolt — which are also indispensable parts of the topology, so the user-defined topology alone is not enough.
Then it takes all components out of the topology:
(defn all-components [^StormTopology topology]
(apply merge {}
(for [f thrift/STORM-TOPOLOGY-FIELDS]
(.getFieldValue topology f)
)))
So the result is the following three maps, merged into a single collection:
struct StormTopology {
//ids must be unique across maps
// #workers to use is in conf
1: required map<string, SpoutSpec> spouts;
2: required map<string, Bolt> bolts;
3: required map<string, StateSpoutSpec> state_spouts;
}
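In Python terms (a sketch, relying on the fact noted in the struct that ids are unique across the three maps), all-components amounts to merging the three dictionaries:

```python
def all_components(spouts, bolts, state_spouts):
    # Merge the three StormTopology component maps (spouts, bolts,
    # state_spouts) into one {component-id: component} map; ids are
    # unique across the maps, so no key can be overwritten.
    merged = {}
    for m in (spouts, bolts, state_spouts):
        merged.update(m)
    return merged
```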
map-val then applies the following operation to each component in the map:
take the component's ComponentCommon object (.get_common), read its json_conf, and finally read TOPOLOGY-TASKS out of that conf.
(defn component-conf [component]
(->> component
.get_common
.get_json_conf
from-json))
struct ComponentCommon {
1: required map<GlobalStreamId, Grouping> inputs;
2: required map<string, StreamInfo> streams; //key is stream id
3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
// component specific configuration
4: optional string json_conf;
}
This yields {component-string: task-count}, sorted by component-string, which is then mapcat'ed:
{c1 3, c2 2, c3 1} -> (c1 c1 c1 c2 c2 c3)
Increasing ids are then attached and the pairs poured into a map: {1 c1, 2 c1, 3 c1, 4 c2, 5 c2, 6 c3}
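The numbering scheme just described can be sketched in Python (illustrative only, not the Storm source):

```python
def storm_task_info(component_tasks):
    """{component-id: task-count} -> {task-id: component-id}.

    Components are sorted by id, each repeated task-count times,
    then numbered with task ids starting from 1 (the increasing
    sequence in the Clojure version).
    """
    expanded = []
    for comp in sorted(component_tasks):
        expanded.extend([comp] * component_tasks[comp])
    return {i: comp for i, comp in enumerate(expanded, start=1)}
```

With the running example, `storm_task_info({"c1": 3, "c2": 2, "c3": 1})` returns `{1: 'c1', 2: 'c1', 3: 'c1', 4: 'c2', 5: 'c2', 6: 'c3'}`.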
How Tasks are assigned to Executors within a Topology, (compute-executors)
component->executors and component->task have now both been produced; next, each component's tasks are distributed over its executors according to the two counts.
The default is a 1:1 assignment, but if a task count was set explicitly,
e.g. for c1 with 2 executors and 3 tasks [1 2 3], the assignment result is ['(1 2) '(3)].
Finally to-executor-id lists each executor as the range of its task ids ([(first task-ids) (last task-ids)]).
(defn- compute-executors [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
        component->executors (:component->executors storm-base) ;; executor (thread) count configured for each component, read from storm-base
        storm-conf (read-storm-conf conf storm-id)
        topology (read-storm-topology conf storm-id)
        task->component (storm-task-info topology storm-conf)]
    (->> (storm-task-info topology storm-conf)
         reverse-map                               ;; {"c1" [1 2 3], "c2" [4 5], "c3" [6]}
         (map-val sort)
         (join-maps component->executors)          ;; {"c1" '(2 [1 2 3]), "c2" '(2 [4 5]), "c3" '(1 [6])}
         (map-val (partial apply partition-fixed)) ;; {"c1" ['(1 2) '(3)], "c2" ['(4) '(5)], "c3" ['(6)]}
         (mapcat second)                           ;; ((1 2) (3) (4) (5) (6))
         (map to-executor-id)                      ;; ([1 2] [3 3] [4 4] [5 5] [6 6])
         )))
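The whole pipeline can be sketched in Python (an illustration, not Storm source; the inline chunking mirrors partition-fixed by giving the first chunks the extra tasks and dropping empty chunks):

```python
def compute_executors(task_to_component, component_to_executors):
    # Invert {task: component} into {component: sorted task list}
    # (the reverse-map + (map-val sort) steps).
    comp_tasks = {}
    for task in sorted(task_to_component):
        comp_tasks.setdefault(task_to_component[task], []).append(task)
    executors = []
    for comp in sorted(comp_tasks):
        tasks, n = comp_tasks[comp], component_to_executors[comp]
        base, extra = divmod(len(tasks), n)
        i = 0
        for k in range(n):  # partition-fixed: first `extra` chunks get one more task
            step = base + (1 if k < extra else 0)
            chunk, i = tasks[i:i + step], i + step
            if chunk:  # partition-fixed drops zero-size chunks
                executors.append([chunk[0], chunk[-1]])  # to-executor-id: [first, last]
    return executors
```

With task->component `{1: "c1", 2: "c1", 3: "c1", 4: "c2", 5: "c2", 6: "c3"}` and component->executors `{"c1": 2, "c2": 2, "c3": 1}`, this yields `[[1, 2], [3, 3], [4, 4], [5, 5], [6, 6]]`, matching the comments in the Clojure code.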
partition-fixed splits aseq into max-num-chunks chunks.
The idea:
7 divided by 3 is 2 with remainder 1,
so split into 3 chunks of 2 each, with 1 item left over;
that leftover goes into the first chunk,
so there is 1 chunk of 2+1 items and (3-1) chunks of 2 items.
This is what (integer-divided 7 3) computes, returning ([3 1] [2 2]). It is a little confusing at first — the function name is poorly chosen, since it does not merely divide but already works out the partition; the result means 1 chunk of 3 items and 2 chunks of 2 items.
Then split-at inside a loop performs the actual splitting.
(defn partition-fixed
  "(partition-fixed 3 '(1 2 3 4 5 6 7)) -> [(1 2 3) (4 5) (6 7)]"
  [max-num-chunks aseq]
  (if (zero? max-num-chunks)
    []
    (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
                      (#(dissoc % 0))
                      (sort-by (comp - first))
                      (mapcat (fn [[size amt]] (repeat amt size))))]
      (loop [result []
             [chunk & rest-chunks] chunks
             data aseq]
        (if (nil? chunk)
          result
          (let [[c rest-data] (split-at chunk data)]
            (recur (conj result c) rest-chunks rest-data)))))))
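A direct Python port (a sketch of the same logic) may make the behavior easier to follow:

```python
def partition_fixed(max_num_chunks, aseq):
    """Split aseq into at most max_num_chunks chunks, larger chunks first."""
    if max_num_chunks == 0:
        return []
    seq = list(aseq)
    base, extra = divmod(len(seq), max_num_chunks)
    # integer-divided: `extra` chunks of size base+1, the rest of size base;
    # zero-size chunks are dropped (the (dissoc % 0) in the Clojure version).
    sizes = [base + 1] * extra + [base] * (max_num_chunks - extra)
    result, i = [], 0
    for s in sizes:
        if s > 0:
            result.append(seq[i:i + s])
            i += s
    return result
```

For example, `partition_fixed(3, [1, 2, 3, 4, 5, 6, 7])` returns `[[1, 2, 3], [4, 5], [6, 7]]`, just like the Clojure docstring.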
The relationship between Executors and Components in a Topology, (compute-executor->component): join the (executor : task) mapping with the (task : component) mapping.
(defn- compute-executor->component [nimbus storm-id]
(let [conf (:conf nimbus)
executors (compute-executors nimbus storm-id)
topology (read-storm-topology conf storm-id)
storm-conf (read-storm-conf conf storm-id)
task->component (storm-task-info topology storm-conf)
executor->component (into {} (for [executor executors
:let [start-task (first executor)
component (task->component start-task)]]
{executor component}))]
    executor->component)) ;; {[1 2] "c1", [3 3] "c1", [4 4] "c2", [5 5] "c2", [6 6] "c3"}
The end goal is the executor->component mapping, used for the subsequent assignment; each executor covers a task range [start-task, end-task].
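The join itself is simple, as this Python sketch shows: since every task in an executor belongs to the same component, looking up the executor's start task is enough.

```python
def compute_executor_to_component(executors, task_to_component):
    # Map each executor [start-task, end-task] to the component that owns
    # its start task; all tasks in an executor share one component.
    return {tuple(e): task_to_component[e[0]] for e in executors}
```

With the running example this yields `{(1, 2): 'c1', (3, 3): 'c1', (4, 4): 'c2', (5, 5): 'c2', (6, 6): 'c3'}`.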
