Strom的結構
Storm與傳統關系型數據庫
傳統關系型數據庫是先存后計算,而storm則是先算后存,甚至不存
傳統關系型數據庫很難部署實時計算,只能部署定時任務統計分析窗口數據
關系型數據庫重視事務,並發控制,相對來說Storm比較簡陋
Storm不Hadoop,Spark等是流行的大數據方案
與Storm關系密切的語言:核心代碼用clojure書寫,實用程序用python開發,使用java開發拓撲
topology
Storm集群中有兩種節點,一種是控制節點(Nimbus節點),另一種是工作節點(Supervisor節點)。所有Topology任務的 提交必須在Storm客戶端節點上進行(需要配置 storm.yaml文件),由Nimbus節點分配給其他Supervisor節點進行處理。 Nimbus節點首先將提交的Topology進行分片,分成一個個的Task,並將Task和Supervisor相關的信息提交到 zookeeper集群上,Supervisor會去zookeeper集群上認領自己的Task,通知自己的Worker進程進行Task的處理。
和同樣是計算框架的MapReduce相比,MapReduce集群上運行的是Job,而Storm集群上運行的是Topology。但是Job在運行結束之后會自行結束,Topology卻只能被手動的kill掉,否則會一直運行下去
Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在內存里,也可以每次都更新數據庫,也可以采用NoSQL存儲。這部分事情完全交給用戶。
數據存儲之后的展現,也是你需要自己處理的,storm UI 只提供對topology的監控和統計。
總體的Topology處理流程圖為:
zookeeper集群
storm使用zookeeper來協調整個集群, 但是要注意的是storm並不用zookeeper來傳遞消息。所以zookeeper上的負載是非常低的,單個節點的zookeeper在大多數情況下 都已經足夠了, 但是如果你要部署大一點的storm集群, 那么你需要的zookeeper也要大一點。關於如何部署zookeeper,可以看http://zookeeper.apache.org/doc /r3.3.3/zookeeperAdmin.html
部署zookeeper有些需要注意的地方:
1、對zookeeper做好監控非常重要, zookeeper是fail-fast的系統,只要出現什么錯誤就會退出, 所以實際場景中要監控,更多細節看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_supervision
2、實際場景中要配置一個cron job來壓縮zookeeper的數據和業務日志。zookeeper自己是不會去壓縮這些的,所以你如果不設置一個cron job, 那么你很快就會發現磁盤不夠用了,更多細節可以查看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_maintenance
Component
Storm中,Spout和Bolt都是Component。所以,Storm定義了一個名叫IComponent的總接口
全家普如下:綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關的
Spout
Spout是Stream的消息產生源, Spout組件的實現可以通過繼承BaseRichSpout類或者其他Spout類來完成,也可以通過實現IRichSpout接口來實現
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
open()方法 -- 初始化方法
close() -- 在該spout將要關閉時調用。但是不保證其一定被調用,因為在集群中supervisor節點,可以使用kill -9來殺死worker進程。只有當Storm是在本地模式下運行,如果是發送停止命令,可以保證close的執行
ack(Object msgId) -- 成功處理tuple時回調的方法,通常情況下,此方法的實現是將消息隊列中的消息移除,防止消息重放
fail(Object msgId) -- 處理tuple失敗時回調的方法,通常情況下,此方法的實現是將消息放回消息隊列中然后在稍后時間里重放
nextTuple() -- 這是Spout類中最重要的一個方法。發射一個Tuple到Topology都是通過這個方法來實現的。調用此方法時,storm向spout發出請求, 讓spout發出元組(tuple)到輸出器(ouput collector)。這種方法應該是非阻塞的,所以spout如果沒有元組發出,這個方法應該返回。nextTuple、ack 和fail 都在spout任務的同一個線程中被循環調用。 當沒有元組的發射時,應該讓nextTuple睡眠一個很短的時間(如一毫秒),以免浪費太多的CPU。
繼承了BaseRichSpout后,不用實現close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只關心最基本核心的部分。
通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout
Bolt
Bolt類接收由Spout或者其他上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現可以通過繼承BasicRichBolt類或者IRichBolt接口等來完成
prepare方法 -- 此方法和Spout中的open方法類似,在集群中一個worker中的task初始化時調用。 它提供了bolt執行的環境
declareOutputFields方法 -- 用於聲明當前Bolt發送的Tuple中包含的字段(field),和Spout中類似
cleanup方法 -- 同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。
execute方法 -- 這是Bolt中最關鍵的一個方法,對於Tuple的處理都可以放到此方法中進行。具體的發送是通過emit方法來完成的。execute接受一個 tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。
Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。如果你確實要反饋失敗,可以拋出FailedException
通常情況下,實現一個Bolt,可以實現IRichBolt接口或繼承BaseRichBolt,如果不想自己處理結果反饋,可以實現 IBasicBolt接口或繼承BaseBasicBolt,它實際上相當於自動實現了collector.emit.ack(inputTuple)
Topology運行流程
(1)Storm提交后,會把代碼首先存放到Nimbus節點的inbox目錄下,之后,會把當前Storm運行的配置生成一個 stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化之后的Topology代碼文件
(2) 在設定Topology所關聯的Spouts和Bolts時,可以同時設置當前Spout和Bolt的executor數目和task數目,默認情況下, 一個Topology的task的總和是和executor的總和一致的。之后,系統根據worker的數目,盡量平均的分配這些task的執行。 worker在哪個supervisor節點上運行是由storm本身決定的
(3)任務分配好之后,Nimbus節點會將任務的信息提交到zookeeper集群,同時在zookeeper集群中會有workerbeats節點,這里存儲了當前Topology的所有worker進程的心跳信息
(4)Supervisor 節點會不斷的輪詢zookeeper集群,在zookeeper的assignments節點中保存了所有Topology的任務分配信息、代碼存儲目 錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容,來領取自己的任務,啟動worker進程運行
(5)一個Topology運行之后,就會不斷的通過Spouts來發送Stream流,通過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。
最后一步會不間斷的執行,除非手動結束Topology。
Topology運行方式
在開始創建項目之前,了解Storm的操作模式(operation modes)是很重要的。 Storm有兩種運行方式
本地運行的提交方式,例:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
分布式提交方式,例:
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
需要注意的是,在Storm代碼編寫完成之后,需要打包成jar包放到Nimbus中運行,打包的時候,不需要把依賴的jar都打迚去,否則如果把依賴的 storm.jar包打進去的話,運行時會出現重復的配置文件錯誤導致Topology無法運行。因為Topology運行之前,會加載本地的 storm.yaml 配置文件。
運行的命令如下: storm jar StormTopology.jar mainclass [args]
storm守護進程的命令
Nimbus: storm nimbus 啟動nimbus守護進程
Supervisor: storm supervisor 啟動supervisor守護迚程
UI:storm ui 這將啟動stormUI的守護進程,為監測storm集群提供一個基於web的用戶界面。
DRPC: storm drpc 啟動DRPC的守護進程
storm管理命令
JAR:storm jar topology_jar topology_class [arguments...]
jar命令是用於提交一個集群拓撲.它運行指定參數的topology_class中的main()方法,上傳topology_jar到nimbus, 由nimbus發布到集群中。一旦提交,storm將激活拓撲並開始處理topology_class 中的main()方法,main()方法負責調用StormSubmitter.submitTopology()方法,並提供一個唯一的拓撲(集群)的 名。如果一個擁有該名稱的拓撲已經存在於集群中,jar命令將會失敗。常見的做法是在使用命令行參數來指定拓撲名稱,以便拓撲在提交的時候被命名。
KILL:storm kill topology_name [-w wait_time]
殺死一個拓撲,可以使用kill命令。它會以一種安全的方式銷毀一個拓撲,首先停用拓撲,在等待拓撲消息的時間段內允許拓撲完成當前的數據流。執行 kill命令時可以通過-w [等待秒數]指定拓撲停用以后的等待時間。也可以在Storm UI 界面上實現同樣的功能
Deactivate:storm deactivate topology_name
停用拓撲時,所有已分發的元組都會得到處理,spouts的nextTuple方法將不會被調用。也可以在Storm UI 界面上實現同樣的功能
Activate:storm activate topology_name
啟動一個停用的拓撲。也可以在Storm UI 界面上實現同樣的功能
Rebalance:storm rebalance topology_name [-w wait_time] [-n worker_count] [-e component_name=executer_count]...
rebalance使你重新分配集群任務。這是個很強大的命令。比如,你向一個運行中的集群增加了節點。rebalance命令將會停用拓撲,然后在相應超時時間之后重分配worker,並重啟拓撲
例:storm rebalance wordcount-topology -w 15 -n 5 -e sentence-spout=4 -e split-bolt=8
還有其他管理命令,如:Remoteconfvalue、REPL、Classpath等
新建storm項目注意事項
為了開發storm項目,你的classpath里面需要有storm的jar包。最推薦的方式是使用Maven,不使用maven的話你可以手動把storm發行版里面的所有的jar包添加到classpath
storm-starter項目使用Leiningen作為build和依賴管理工具,你可以下載這個腳本 (https://raw.githubusercontent.com/technomancy/leiningen/stable/bin /lein)來安裝Leiningen, 把它加入到你的PATH, 使它可執行。要拉取storm的所有依賴包,簡單地在項目的根目錄執行 lein deps 就可以了