Storm入門教程
1. Storm基礎
Storm
Storm主要特點
Storm基本概念
Storm調度器
Storm配置
Guaranteeing Message Processing(消息處理保障機制)
Daemon Fault Tolerance(守護線程容錯機制)
理解Storm拓撲的並行
Tutorial
Local模式
本地模式的通用配置:
在生產環境中運行Topologies
通用配置
殺死topology
更新運行中的topology
監控topology
Local模式
本地模式模擬Storm集群用於測試和開發topologies。在本地模式運行topologies 類似於在集群中運行topologies 。
為了創建進程內的集群,可用LocalCluster類實現,如下:
import org.apache.storm.LocalCluster;
LocalCluster cluster = new LocalCluster();
你可以用LocalCluster 的submitTopology 方法提交topologies 。對於用StormSubmitter類中的方法,該類用於向Storm集群中提交topologies ,submitTopology 方法參數列表為topology名稱,topology配置和topology對象,你可以用killTopology 方法根據Topology 的名稱結束Topology ,如下所示:
cluster.shutdown();
本地模式的通用配置:
1.Config.TOPOLOGY_MAX_TASK_PARALLELISM:這個配置項是單個組件執行線程的最大值,生產環境中的topologies 並行度(成千上百個線程)較大導致負載過度,所以需要在本地模式中盡量測試。這個配置項允許你容易控制並行度。
2.Config.TOPOLOGY_DEBUG:當設置為true時,Storm將記錄任一spout或bolt發送的tuple。這對於調試尤其有用。
在生產環境中運行Topologies
在生產集群中運行Topologies和本地模式一樣,步驟如下:
1)定義topology(如果用Java那么用類TopologyBuilder來定義topology)
2)用類StormSubmitter 將topology提交到集群中。StormSubmitter 接受topology的名稱、topology的配置和topology本身。如下:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
3)將你的代碼及其依賴打成jar包(除了Storm jars,因為所有的worker節點上已存在類路徑上)
如果你用Maven,那么 Maven Assembly Plugin插件能為你打jar包。在pom.xml中添加如下內容;
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
然后運行mvn assembly:assembly得到合適的jar包。確認jar包中排除了Storm jars因為集群中的類路徑上已經存在。
4)用storm 客戶端程序提交topology 到集群,指定jar路徑、運行的類以及參數,如下:
storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
storm jar將提交jar到集群中並配置類StormSubmitter和集群通信。在本例中,在上述例子中上傳了jar或會調用org.me.MyTopology類的主函數,參數列表為"arg1", "arg2", and "arg3".
通用配置
topology的配置項有很多,你可以在這里找到。前綴為“TOPOLOGY”的配置項可以被覆寫(集群中的其他配置項不能被覆寫)。下面是TOPOLOGY的常用配置:
1.Config.TOPOLOGY_WORKERS:改項設置了用於執行topology的worker進程數。例如,如果你設置為25,那么集群中將會有25個Java進程用於執行所有的任務。如果topology中所有的並行度為150,那么每個worker進程將有6個線程。
2.Config.TOPOLOGY_ACKER_EXECUTORS:該項配置了executors的數目,它用於跟蹤元組樹和監控spout元組已經被完全處理。Ackers 是Storm可靠性模型的一部分。如果沒有設置該項或設置為空,Storm將設置acker executor和worker的數目相等。如果這個變量設置為0,Storm將立馬響應元組只要他們從spout發送出來,不能保證可靠性。
3.Config.TOPOLOGY_MAX_SPOUT_PENDING:該項設置了單個spout 任務所能接受的元組數目(只要沒有acked的元組或失敗的都算)。強烈推薦你配置該項防止隊列持續增大。
4.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:spout元組被認定失敗的超時時間。該項默認是30s,對於大多數topologies是有效的。
5.Config.TOPOLOGY_SERIALIZATIONS:你可以給Storm注冊更多的序列化器,這樣你就可以自定義元組類型。
殺死topology
殺死topology,如下命令:
storm kill {stormname}
stormname和你提交時的topology保持一致。
Storm不會立即殺死topology。相反,它會使所有的spouts失效,這樣它們不會發送更多的元組,然后等Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 秒后殺死所有的workers,這允許topology有足夠的時間處理正在執行的tuple。
更新運行中的topology
為了更新運行中的topology,當前唯一的選擇是殺死當前的topology然后重新提交。一個計划中的特性是用storm swap命令切換運行中的topology為新的topology,確保最小的故障時間。
監控topology
監控topology最好用Storm UI。Storm UI提供了任務中的錯誤發生信息和關於每個組件的吞吐量和延遲性能的細粒度的狀態。當然,你也可以看集群中worker的日志。