apache Storm學習之二-基本概念介紹


2.1 Storm基本概念

在運行一個Storm任務之前,需要了解一些概念:

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers
  9. Configuration

 

  Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。

在Storm的集群里面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運行一個叫Nimbus后台程序,它的作用類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發代碼,分配計算任務給機器, 並且監控狀態。

每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那台機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

  Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要么在zookeeper里面, 要么在本地磁盤上。這也就意味着你可以用kill -9來殺死Nimbus和Supervisor進程, 然后再重啟它們,就好像什么都沒有發生過。這個設計使得Storm異常的穩定。

2.1.1 Topologies

  一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:

  一個topology會一直運行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 並且Storm可以保證你不會有數據丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。

運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令:

 storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

 

這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1arg2。這個類的main函數定義這個topology並且把它提交給Nimbus。storm jar負責連接到Nimbus並且上傳jar包。

Topology的定義是一個Thrift結構,並且Nimbus就是一個Thrift服務, 你可以提交由任何語言創建的topology。上面的方面是用JVM-based語言提交的最簡單的方法。

2.1.2 Streams

  消息流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分布式的方式並行地創建和處理。通過對stream中tuple序列中每個字段命名來定義stream。在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定義類型(只要實現相應的序列化器)。

每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id 。

Storm提供的最基本的處理stream的原語是spout和bolt。你可以實現spout和bolt提供的接口來處理你的業務邏輯。

2.1.3  Spouts

  消息源spout是Storm里面一個topology里面的消息生產者。一般來說消息源會從一個外部源讀取數據並且向topology里面發出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果這個tuple沒有被storm成功處理,可靠的消息源spouts可以重新發射一個tuple, 但是不可靠的消息源spouts一旦發出一個tuple就不能重發了。

消息源可以發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然后使用SpoutOutputCollector來發射指定的stream。

Spout類里面最重要的方法是nextTuple。要么發射一個新的tuple到topology里面或者如果已經沒有新的tuple,簡單的返回。要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。

  (The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread.)

另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。

2.1.4  Bolts

  所有的消息處理邏輯被封裝在bolts里面。Bolts可以做很多事情:過濾,聚合,查詢數據庫等等。

All processing in topologies is done in bolts. Bolts can do anything from filtering, functions, aggregations, joins, talking to databases, and more. 

  Bolts可以簡單的做消息流的傳遞。復雜的消息流處理往往需要很多步驟,從而也就需要經過很多bolts。比如算出一堆圖片里面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。

  Bolts可以發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。

  Bolts的主要方法是execute, 它以一個tuple作為輸入,bolts使用OutputCollector來發射tuple,bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 一般的流程是: bolts處理一個輸入tuple,  發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。

Resources:

2.1.5  Stream groupings

定義一個topology的其中一步是定義每個bolt接收什么樣的流作為輸入。stream grouping就是用來定義一個stream應該如果分配數據給bolts上面的多個tasks。

Storm里面有7種類型的stream grouping

  1. Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同。(Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.)
  2. Fields Grouping:按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolts里的一個task, 而不同的userid則會被分配到不同的bolts里的task。(The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.)
  3. All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
  4. Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
  5.  Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
  6. Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。(Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).)
  7. Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
  8. Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。

 

Resources:

 

  • TopologyBuilder: use this class to define topologies
  • InputDeclarer: this object is returned whenever setBolt is called on TopologyBuilder and is used for declaring a bolt's input streams and how those streams should be grouped
  • CoordinatedBolt: this bolt is useful for distributed RPC topologies and makes heavy use of direct streams and direct groupings

 

2.1.6  Reliability

  Storm保證每個tuple會被topology完整的執行。Storm會追蹤由每個spout tuple所產生的tuple樹(一個bolt處理一個tuple之后可能會發射別的tuple從而形成樹狀結構),並且跟蹤這棵tuple樹什么時候成功處理完。每個topology都有一個消息超時的設置,如果storm在這個超時的時間內檢測不到某個tuple樹到底有沒有執行成功, 那么topology會把這個tuple標記為執行失敗,並且過一會兒重新發射這個tuple。

  為了利用Storm的可靠性特性,在你發出一個新的tuple以及你完成處理一個tuple的時候你必須要通知storm。這一切是由OutputCollector來完成的。通過emit方法來通知一個新的tuple產生了,通過ack方法通知一個tuple處理完成了。

Storm的可靠性我們在第四章會深入介紹。

2.1.7  Tasks

  每一個spout和bolt會被當作很多task在整個集群里執行。每一個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則是定義怎么從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)。

2.1.8  Workers

  一個topology可能會在一個或者多個worker(工作進程)里面執行,每個worker是一個物理JVM並且執行整個topology的一部分。比如,對於並行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會處理其中的6個tasks。Storm會盡量均勻的工作分配給所有的worker。

Resources:

2.1.9 Configuration

  Storm里面有一堆參數可以配置來調整Nimbus, Supervisor以及正在運行的topology的行為,一些配置是系統級別的,一些配置是topology級別的。default.yaml里面有所有的默認配置。你可以通過定義個storm.yaml在你的classpath里來覆蓋這些默認配置。並且你也可以在代碼里面設置一些topology相關的配置信息(使用StormSubmitter)。

2.2  構建Topology

1. 實現的目標:

我們將設計一個topology,來實現對一個句子里面的單詞出現的頻率進行統計。這是一個簡單的例子,目的是讓大家對於topology快速上手,有一個初步的理解。

2. 設計Topology結構:

在開始開發Storm項目的第一步,就是要設計topology。確定好你的數據處理邏輯,我們今天將的這個簡單的例子,topology也非常簡單。整個topology如下:

整個topology分為三個部分:

KestrelSpout:數據源,負責發送sentence

Splitsentence:負責將sentence切分

Wordcount:負責對單詞的頻率進行累加

3. 設計數據流

這個topology從kestrel queue讀取句子,並把句子划分成單詞,然后匯總每個單詞出現的次數,一個tuple負責讀取句子,每一個tuple分別對應計算每一個單詞出現的次數,大概樣子如下所示:

4. 代碼實現:

1) 構建maven環境:

為了開發storm topology, 你需要把storm相關的jar包添加到classpath里面去: 要么手動添加所有相關的jar包, 要么使用maven來管理所有的依賴。storm的jar包發布在Clojars(一個maven庫), 如果你使用maven的話,把下面的配置添加在你項目的pom.xml里面。

<repository>

    <id>clojars.org</id>

    <url>http://clojars.org/repo</url>

</repository>

<dependency>

     <groupId>storm</groupId>

    <artifactId>storm</artifactId>

     <version>0.5.3</version>

     <scope>test</scope>

</dependency>

2) 定義topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout(“kestrel.backtype.com”,22133,

                                                                                    ”sentence_queue”,

                                                                                    new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

       .shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

       .fieldsGrouping(2, new Fields(“word”));

這種topology的spout從句子隊列中讀取句子,在kestrel.backtype.com位於一個Kestrel的服務器端口22133。

Spout用setSpout方法插入一個獨特的id到topology。 Topology中的每個節點必須給予一個id,id是由其他bolts用於訂閱該節點的輸出流。 KestrelSpout在topology中id為1。

setBolt是用於在Topology中插入bolts。 在topology中定義的第個bolts 是切割句子的bolts。 這個bolts 將句子流轉成成單詞流。

讓我們看看SplitSentence實施:

public class SplitSentence implements IBasicBolt{

        public void prepare(Map conf, TopologyContext context) {

         }

       public void execute(Tuple tuple, BasicOutputCollector collector) {

              String sentence = tuple.getString(0);

               for(String word: sentence.split(“ ”)) {

                        collector.emit(new Values(word));

                  }

             }

         public void cleanup() {

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

                declarer.declare(new Fields(“word”));

             }

 }

  關鍵的方法 execute方法。 正如你可以看到,它將句子拆分成單詞,並發出每個單詞作為一個新的元組。 另一個重要的方法是declareOutputFields其中宣布bolts輸出元組的架構。 在這里宣布,它發出一個域為word的元組

setBolt的最后一個參數是你想為bolts的並行量。 SplitSentence bolts 是10個並發,這將導致在storm集群中有十個線程並行執行。 你所要做的的是增加bolts的並行量在遇到topology的瓶頸時。

  setBolt方法返回一個對象,用來定義bolts的輸入。 例如SplitSentence螺栓訂閱組件“1”使用隨機分組的輸出流。 “1”是指已經定義KestrelSpout 我將解釋在某一時刻的隨機分組的一部分。 到目前為止,最要緊的是SplitSentence bolts會消耗KestrelSpout發出的每一個元組。

下面在讓我們看看wordcount的實現:

public class WordCount implements IBasicBolt {

        private Map<String, Integer> _counts = new HashMap<String, Integer>();

        public void prepare(Map conf, TopologyContext context) {

        }

       public void execute(Tuple tuple, BasicOutputCollector collector) {

              String word = tuple.getString(0);

              int count;

              if(_counts.containsKey(word)) {

                     count = _counts.get(word);

              } else {

                     count = 0;

     }

              count++;

     _counts.put(word, count);

              collector.emit(new Values(word, count));

       }

       public void cleanup() {

       }

       public void declareOutputFields(OutputFieldsDeclarer declarer) {

              declarer.declare(new Fields(“word”, “count”));

       }

}

  SplitSentence對於句子里面的每個單詞發射一個新的tuple, WordCount在內存里面維護一個單詞->次數的mapping, WordCount每收到一個單詞, 它就更新內存里面的統計狀態。

5. 運行Topology

storm的運行有兩種模式: 本地模式和分布式模式.

1) 本地模式:

  storm用一個進程里面的線程來模擬所有的spout和bolt. 本地模式對開發和測試來說比較有用。 你運行storm-starter里面的topology的時候它們就是以本地模式運行的, 你可以看到topology里面的每一個組件在發射什么消息。

2) 分布式模式:

  storm由一堆機器組成。當你提交topology給master的時候, 你同時也把topology的代碼提交了。master負責分發你的代碼並且負責給你的topolgoy分配工作進程。如果一個工作進程掛掉了, master節點會把認為重新分配到其它節點。

3) 下面是以本地模式運行的代碼:

         Config conf = new Config();

         conf.setDebug(true);

         conf.setNumWorkers(2);

         LocalCluster cluster = new LocalCluster();

         cluster.submitTopology(“test”, conf, builder.createTopology());

          Utils.sleep(10000);

          cluster.killTopology(“test”);

          cluster.shutdown();

首先, 這個代碼定義通過定義一個LocalCluster對象來定義一個進程內的集群。提交topology給這個虛擬的集群和提交topology給分布式集群是一樣的。通過調用submitTopology方法來提交topology, 它接受三個參數:要運行的topology的名字,一個配置對象以及要運行的topology本身。

topology的名字是用來唯一區別一個topology的,這樣你然后可以用這個名字來殺死這個topology的。前面已經說過了, 你必須顯式的殺掉一個topology, 否則它會一直運行。

Conf對象可以配置很多東西, 下面兩個是最常見的:

 TOPOLOGY_WORKERS(setNumWorkers) 定義你希望集群分配多少個工作進程給你來執行這個topology. topology里面的每個組件會被需要線程來執行。每個組件到底用多少個線程是通過setBolt和setSpout來指定的。這些線程都運行在工作進程里面. 每一個工作進程包含一些節點的一些工作線程。比如, 如果你指定300個線程,60個進程, 那么每個工作進程里面要執行6個線程, 而這6個線程可能屬於不同的組件(Spout, Bolt)。你可以通過調整每個組件的並行度以及這些線程所在的進程數量來調整topology的性能。

 TOPOLOGY_DEBUG(setDebug), 當它被設置成true的話, storm會記錄下每個組件所發射的每條消息。這在本地環境調試topology很有用, 但是在線上這么做的話會影響性能的。

結論:

  本章從storm的基本對象的定義,到廣泛的介紹了storm的開發環境,從一個簡單的例子講解了topology的構建和定義。希望大家可以從本章的內容對storm有一個基本的理解和概念,並且已經可以構建一個簡單的topology!!

 

 

參考官網: storm.apache.org/documentation/Concepts.html   

參考:http://blog.linezing.com/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM