storm簡介
Storm是一個分布式實時流式計算平台,支持水平擴展,通過追加機器就能提供並發數進而提高處理能力;同時具備自動容錯機制,能自動處理進程、機器、網絡等異常。
它可以很方便地對流式數據進行實時處理和分析,能運用在實時分析、在線數據挖掘、持續計算以及分布式 RPC 等場景下。Storm 的實時性可以使得數據從收集到處理展示在秒級別內完成,從而為業務方決策提供實時的數據支持。
storm vs spark streaming
storm適用場景
- 需要純實時,不能忍受1秒以上延遲的場景下使用,比如金融系統
- 對於延遲需求很高的純粹的流處理工作負載
- 需求主要集中在流處理與CEP(即復雜事件處理)式處理層面
- 若還需要針對高峰低峰時間段,動態調整實時計算程序的並行度,以最大限度利用集群資源(通常是在小型公司,集群資源緊張的情況),也可以考慮用Storm
- 如果一個大數據應用系統,它就是純粹的實時計算,不需要在中間執行SQL交互式查詢、復雜的transformation算子等,那么用Storm是比較好的選擇
spark streaming適用場景
- 如果對上述適用於Storm的幾點,一條都不滿足的實時場景,即:不要求純實時,不要求動態調整並行度等,那么可以考慮使用Spark Streaming
- 考慮使用Spark Streaming最主要的一個因素,應該是針對整個項目進行宏觀的考慮,即:如果一個項目除了實時計算之外,還包括了離線批處理、交互式查詢等業務功能,而且實時計算中,可能還會牽扯到高延遲批處理、交互式查詢等功能,那么就應該首選Spark生態,用Spark Core開發離線批處理,用Spark SQL開發交互式查詢,用Spark Streaming開發實時計算,三者可以無縫整合,給系統提供非常高的可擴展性 Spark Streaming與Storm的優劣分析事實上,Spark Streaming絕對談不上比Storm優秀。
- 必須利用交互式shell通過API調用實現數據探索
技術特點對比
對比項 | storm | spark |
---|---|---|
處理方式 | 流式數據處理、移動數據(數據流入計算節點) | 批處理數據、移動計算(針對數據形成任務進行計算) |
延遲性 | >=100ms | 2s左右 |
吞吐量 | Low | High |
容錯性 | ack組件進行數據流的跟蹤,開銷大 | 通過lineage以及在內存維護兩份數據備份進行容錯 |
事務性 | 通過跟蹤機制能保證每個記錄至少被處理一次,如果需要保證狀態只更新一次的話,需要由用戶自己來實現。 | 保證數據只被處理一次,並且是在批處理的層次級別。對於statefull的計算,對事務性比較高的話,Spark streaming要更好一些。 |
動態調整並行度 | 支持 | 不支持 |
數據處理保證 | at least once(實現采用record-level acknowledgments),Trident可以支持storm 提供exactly once語義。 | exactly once(實現采用Chandy-Lamport 算法,即marker-checkpoint ) |
如果對延遲要求不高的情況下,建議使用Spark Streaming,豐富的高級API,使用簡單,天然對接Spark生態棧中的其他組件,吞吐量大,部署簡單,UI界面也做的更加智能,社區活躍度較高,有問題響應速度也是比較快的,比較適合做流式的ETL,而且Spark的發展勢頭也是有目共睹的,相信未來性能和功能將會更加完善。
storm基本概念
storm做使用過程中,雖然定義了一些基本概念,其實只定義了一些接口,讓大家去實現即可。spark streaming相當於是定義了一些數據結構,需要大家靈活掌握和使用,入門難度相對比較大。
tuple元組
tuple是storm的主要數據結構,並且是storm中使用的最基本單元、數據模型和元組
tuple描述
tuple就是一個值列表,tuple中的值可以是任何類型的,動態類型的tuple的fields可以不用聲明,默認情況下,storm中的tuple支持私有類型、字符串、字節數組等作為它的字段值,如果使用其他類型,就需要序列化該類型。
tuple的字段默認類型有:integer、float、double、long、short、string、byte、binary
tuple可以理解成鍵值對,例如、創建一個bolt要發送2個字段(命名為double和triple),其中鍵就是定義declareOutputFields方法中的fields對象,值就是在emit方法中發送的values對象。
spout:接收數據的入口
- 繼承BaseRichSpout(無多余代碼,推薦)/實現IRichSpout接口
- open(Map conf, TopologyContext context, SpoutOutputCollector collector):初始化
- nextTuple():通過output collector發射emit元組tuples.它不能阻塞,當沒有數據發送時,spout將sleep休眠短暫的時間。
- declareOutputFields():聲明傳遞的數據字段名稱。
//demo
public class AccessPointSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
String message = "Test Message!";
collector.emit(new Values(message));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
}
發送消息用的emit()在這里提供了四種不同的實現方式,但是要注意的是,只有提供了messageId參數,Storm才會追蹤這條消息是否發送成功。
發射tuple時,如果spout提供一個message-id,則通過這個id來追蹤該tuple
接下來,storm跟蹤該tuple的樹形結構是否成功創建,並根據messageid調用spout中的ack函數,以確認tuple是否被完全處理,如果tuple超時,則調用spout的fail方法。
bolt
bolt是數據數據的最小單元。
- Bolts的主要方法是execute,它以一個tuple作為輸入,bolts使用OutputCollector來發射tuple,bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。
- 一般處理流程:接收數據tuple-->處理數據-->發射數據(可選),處理完成后要發送ack通知storm.
通常情況下,實現一個Bolt,可以實現IRichBolt接口或繼承BaseRichBolt,如果不想自己處理結果反饋,可以實現IBasicBolt接口或繼承BaseBasicBolt,它實際上相當於自動做掉了prepare方法和collector.emit.ack(inputTuple);
topology
Storm中的拓撲,實際上就是一個有向圖的計算。拓撲中節點包含數據的邏輯處理;節點之間的邊顯示數據如何在節點直接流動。簡單拓撲圖如下所示:
復雜拓撲:
Understanding the Parallelism of a Storm Topology
理解 Storm 拓撲的並行度(parallelism)概念
在一個 Storm 集群中,Storm 主要通過以下三個部件來運行拓撲:
- 工作進程(worker processes)
- 執行器(executors)
- 任務(tasks)
它們關系如下圖所示:
在 Worker 中運行的是拓撲的一個子集。一個 worker 進程是從屬於某一個特定的拓撲的,在 worker 進程中會運行一個或者多個與拓撲中的組件相關聯的 executor。一個運行中的拓撲就是由這些運行於 Storm 集群中的很多機器上的進程組成的。
一個 executor 是由 worker 進程生成的一個線程。在 executor 中可能會有一個或者多個 task,這些 task 都是為同一個組件(spout 或者 bolt)服務的。
task 是實際執行數據處理的最小工作單元(注意,task 並不是線程) —— 在你的代碼中實現的每個 spout 或者 bolt 都會在集群中運行很多個 task。在拓撲的整個生命周期中每個組件的 task 數量都是保持不變的,不過每個組件的 executor 數量卻是有可能會隨着時間變化。在默認情況下 task 的數量是和 executor 的數量一樣的,也就是說,默認情況下 Storm 會在每個線程上運行一個 task。
拓撲示例
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
"mytopology",
conf,
topologyBuilder.createTopology()
);
圖中是一個包含有兩個 worker 進程的拓撲。其中,藍色的 BlueSpout 有兩個 executor,每個 executor 中有一個 task,並行度為 2;綠色的 GreenBolt 有兩個 executor,每個 executor 有兩個 task,並行度也為2;而黃色的YellowBolt 有 6 個 executor,每個 executor 中有一個 task,並行度為 6,因此,這個拓撲的總並行度就是 2 + 2 + 6 = 10。具體分配到每個 worker 就有 10 / 2 = 5 個 executor。
它們之間的關系如下所示:
- Topology----(N)--work processes----(N)--executors(threads)----(N)--tasks(default one task per executor,也可以一個executor跑多個task,task對應spout、Bolt)
總並行度等於總executors
emit中的幾個ID
//tuple是必傳,其他可以為空
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
return _delegate.emit(streamId, tuple, messageId);
}
- componentId:spout.bolt(setspout()、setBolt()) ----名稱
- tuple: streamId:一般省略 (每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id )
- messageId:emit() 作用:如果指定了,則是可靠的spout,tuple被處理完后會調用ack or fail。
_collector.emit(new Values(sentence), sentence);
_collector.emit(new Values(sentence)),則后面需要
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
Stream Grouping
Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt's tasks.拓撲中規定了Bolt接收哪個流作為輸入數據。流分組定義做bolt中如何對流分組。真實的流分組情況如下所示:
- Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
- Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
- Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
- All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
- Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
- None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
- Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
- Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
Storm里面有8種類型的stream grouping:
- Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple, 保證每個bolt接收到的tuple數目相同。
- Fields Grouping:按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolts, 而不同的userid則會被分配到不同的Bolts。
- Partial Key grouping:The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.它是部分按字段分組,和按字段分組類似,但是能負載均衡到多個bolts.當輸入數據傾斜,資源利用率更好。
- All Grouping: 廣播發送, 對於每一個tuple, 所有的Bolts都會收到。
- Global Grouping: 全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
- Non Grouping: 不分組, 這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者決定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)
-
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
- Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
storm集群的組件
storm代碼如何打jar包
這里不能像MR一樣,將需要jar包放在jar包lib目錄下。
本人嘗試了4種方式:
- 把jar放入~/.storm. The storm jars and configs in ~/.storm are put on the classpath. ----此種方式不行
- fatjar,要指定mainclass
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!--<mainClass>com.ziyun.storm.accesspoint.AccessPointTopo</mainClass>-->
<mainClass />
<!--<mainClass>com.ziyun.mq.demo.Publisher</mainClass>-->
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>${project.artifactId}-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
- 瘦jar, 依賴jar要在集群中分發,需要對jar包進行細致管理和分發。
<!--依賴jar包單獨生成到指定目錄-->
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
- 生成一個zip包,(打包時storm代碼和依賴jar包分成兩個文件夾),上傳代碼后解壓。再配合這兩個參數一起使用,即將依賴jar包添加到classpath中。
storm jar /ddhome/usr/job/storm/zy-storm-accesspoint-1.0.0.jar com.ziyun.storm.accesspoint.AccessPointTopo --jars "./lib/*"
個人建議:改寫python腳本(storm/bin/storm.py),將jar包中lib加入classpath,如同MR將依賴jar包放在父jar的lib目錄下即可運行。
注意:跟端口有關的操作,要查看提供方的防火牆端口是否開放!!!
參考文獻
- storm Tutorial
- Storm 的可靠性保證測試--美團
- Flink,Storm,Spark Streaming三種流框架的對比分析
- Spark、Hadoop、Storm對比
- 流處理旅程——storm之tuple介紹
- Storm tuple發送機制
- Problems running Storm with additional classpath
tips:本文屬於自己學習和實踐過程的記錄,很多圖和文字都粘貼自網上文章,沒有注明引用請包涵!如有任何問題請留言或郵件通知,我會及時回復。