【storm集群的搭建以及將開發好的拓撲提交到集群上運行的方法】
在上一篇文章中,我們的拓撲直接運行,並在程序開始時候自動啟動一個本地"集群"來運行拓撲。LocalCluster這種方式一般用於本地的開發和調試。而在實際的生產環境中,我們一般是有搭建好的storm集群,我們開發完topology后會提交到集群中的主節點nimbus,由nimbus來向supervisor分發代碼,並啟動woker來運行topology.下面我們將在本地搭建一個storm運行環境,並將開發好的WordCountTopology提交到本地"集群"上來運行。
本地"集群"搭建
- 安裝好JDK並配置系統環境變量;http://www.cnblogs.com/jonyo/p/5656386.html
- 安裝zookepeer並配置系統環境變量;【待加。。。。】
- 下載storm壓縮包,解壓放到 /usr/local/下,在.zshrc中配置相關的環境變量
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin - 進入到storm/conf下,修改storm.yaml配置文件,在末尾添加
storm.zookeeper.servers: - "localhost" storm.local.dir: "/usr/local/storm/data" storm.zookeeper.port: 2181 nimbus.host: "localhost" ui.port: 8080 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
這個文件主要是對storm使用的zookepeer地址,集群的nimbus,supervisor等信息進行了相關配置
- 步驟1~4完成以后,按照下面就可以啟動集群了
(1)啟動zookepeer: 在/usr/local/zookeeper/bin下執行sh zkServer.sh start 接着執行sh zkServer.sh status查看啟動狀態,出現Mode: standalone即表示成功
(2)啟動nimbus進程:在/usr/local/storm/bin下執行 sh storm nimbus,會看到nimbus啟動信息
(3)啟動supervisor進程,由於都是在同一台機器上啟動的,需要重新開啟一個控制台。在/usr/local/storm/bin下執行 sh storm supervisor
(4)為了便於查看集群的信息,將ui也啟動,在/usr/local/storm/bin下執行 sh storm ui, 然后通過localhost:8080可以查看集群狀態
UI界面,可以對拓撲進行管理,查看集群狀態等等;由於沒有任何拓撲提交,可以看到Topology Summary是空的,沒有信息
(5)將之前的WordCountTopology.java文件修改如下
1 // //建立本地集群,利用LocalCluster,storm在程序啟動時會在本地自動建立一個集群,不需要用戶自己再搭建,方便本地開發和debug 2 // LocalCluster cluster = new LocalCluster(); 3 // 4 // //創建拓撲實例,並提交到本地集群進行運行 5 // cluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology()); 6 7 try { 8 //submitTopology方法負責發送Topology到集群,[新增的代碼] 9 StormSubmitter.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology()); 10 } catch (Exception e) { 11 e.printStackTrace(); 12 }
修改完成后,利用maven打成jar包:mvn compile mvn clean package
(6)將打出的jar包(拓撲)提交到集群上執行
storm jar /Users/.../workspace/storm-learning/target/storm-learning-1.0-SNAPSHOT.jar wordCount.WordCountTopology wordCountTest
在此打開localhost:8080會發現WordCountTopology在執行,並顯示出執行的狀態和相關信息
拓撲提交到nimbus之后都發生了什么事情?即通過storm jar xxxxx.jar xxx.Main agrs這個命令之后操作是怎么樣的?
storm jar會執行.../storm/bin目錄下的storm.py腳本文件里定義的def jar(jarfile, klass, *args),在這個腳本中,通過一系列的設置,讓后續main方法中的調用的StormSubmitter.submitTopology(name, conf, builder.createTopology())找到jar所在的地址,然后通過soket傳輸,將Jar文件上傳到nimbus,nimbus在接收到jar文件后,存放到數據目錄的inbox目錄(inbox是在用戶設置的storm.local.dir變量所指定的目錄的nimbus下)
inbox用於存放提交的Jar文件,每個Jar被重命名為stormjar+32位的uuid, 比如的的執行提交后是stormjar-a22e9169-e377-45db-9a07-a97357ccfbef.jar
stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規則是“name-計數-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執行這些代碼。
進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
1.在zookeeper上創建/taskheartbeats/{storm id} 路徑,用於任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task將定時刷新節點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設置。
2.從topology中獲取bolts,spouts設置的並行數目以及全局配置的最大並行數,然后產生task id列表,如[1 2 3 4]
3.在zookeeper上創建/tasks/{strom id}/{task id}路徑,並存儲task信息
4.開始分配任務(內部稱為assignment), 具體步驟:
(1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
(2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
(3)將任務均勻地分配給可用的worker,這里有兩種情況:
(a)task數目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
{1: [host1:port1] 2 : [host2:port1],可以看到任務平均地分配在兩個worker上。
3 : [host1:port1] 4 : [host2:port1]}
(b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
[host1:port1 host2:port1 host1:port2 host2:port2],然后分配任務為
{1: host1:port1 , 2 : host2:port2}
(4)記錄啟動時間
(5)判斷現有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的數據里的active設置為true。
6.nimbus會檢查task的心跳,如果發現task心跳超過超時時間,那么會重新跳到第4步做re-assignment。
最后的topology任務的分配過程 參考:http://blog.csdn.net/xiaolang85/article/details/38065185