storm從入門到放棄(一),storm介紹


 背景:目前就職於國內最大的IT咨詢公司,恰巧又是畢業季,所在部門招了20多個應屆畢業生,本人要跟部門新人進行為期一個月的大數據入職培訓,特此將整理的文檔分享出來。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/7274361.html

微信:intsmaze

避免微信回復重復咨詢問題,技術咨詢請博客留言。

Storm是一個開源的分布式實時計算系統,可以簡單、可靠的處理大量的數據流。Storm支持水平擴展,具有高容錯性,保證每個消息都會得到處理。

Storm核心組件

Nimbus負責資源分配和任務調度,Nimbus對任務的分配信息會落到zookeeper上面的目錄下。
 
Supervisor負責去zookeeper上的指定目錄接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程。(它是當前物理機器上的管理者)--通過配置文件設置當前supervisor上啟動多少個worker

Worker運行具體處理組件邏輯的進程。Worker運行的任務類型只有兩種,一種是Spout任務,一種是Bolt任務。

Taskworker中每一個spout/bolt的線程稱為一個task. storm0.8之后,task不再與物理線程對應,不同spout/bolttask可能會共享一個物理線程,該線程稱為executor
 
Storm一些概念
Topologies : 拓撲,也俗稱一個任務。(可以理解為一個storm集群)
Spouts : 拓撲的消息源。
Bolts : 拓撲的處理邏輯單元。(一個Bolt類會在集群里面很多機器上並發執行
Spouts ,Bolts 可以理解為storm中的兩個組件
tuple:消息元組(是在Spouts ,Bolts中傳遞數據的一種封裝的格式
Streams : 流
Stream groupings :流的分組策略
Tasks : 任務處理單元
Executor :工作線程
Workers :工作進程
Configuration : topology的配置
Storm中的Workers
一個topology可能會在一個或者多個worker(工作進程)里面執行;一個進程里面會啟動多個Executor :工作線程。
每個worker是一個物理JVM並且執行整個topology的一部分;在一個物理節點上可以運行一個或多個獨立的JVM 進程。一個Topology可以包含一個或多個worker(並行的跑在不同的物理機上), 所以worker process就是執行一個topology的子集, 並且worker只能對應於一個topology。比如,對於並行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會處理其中的6個tasks;
Storm會盡量均勻的工作分配給所有的worker;一個Executor:工作線程里面可以運行多個相同的task實例。
 
Storm中的Tasks
每一個spout和bolt會被當作很多task在整個集群里執行;每一個executor對應到一個線程,在這個線程上運行多個task;stream grouping則是定義怎么從一堆task發射tuple到另外一堆task;可以調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)
 
Executors (threads) 
在一個worker JVM進程中運行着多個Java線程。一個executor線程可以執行一個或多個tasks。但一般默認每個executor只執行一個task。(開發中也不建議一個executor里面執行多個task.)一個worker可以包含一個或多個executor, 每個component (spout或bolt)至少對應於一個executor, 所以可以說executor執行一個compenent的子集, 同時一個executor只能對應於一個component。
 
Storm中的Stream
  消息流stream是storm里的關鍵抽象;一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分布式的方式並行地創建和處理;通過對stream中tuple序列中每個字段命名來定義stream。
  在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array;可以自定義類型(只要實現相應的序列化器)。
  數據從一個節點傳到另一個節點,數據是要被序列化的,但在storm中,數據序列化之前,消息必須按照一定的格式傳遞,這個格式就是一個一個的消息元組。消息元組是源源不斷的發送的,這個元組就類似一個list,里面有若干個字段。
 

Storm編程模型

有向無環圖
public class RandomSentenceSpout extends BaseRichSpout {

    public void nextTuple() {
        collector.emit(new Values("+ - * % /"));
        Utils.sleep(50000);
    }
    ......
}
public class SplitSentenceBolt extends BaseBasicBolt {
    
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = (String)input.getValueByField("intsmaze");
        System.out.println(Thread.currentThread().getId()+"    "+sentence);        
    }
    ......
}
public class TwoBolt extends BaseBasicBolt { 
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = (String)input.getValueByField("intsmaze");
        System.out.println(Thread.currentThread().getId()+"    "+sentence);
    }
      ......
}
public class WordCountTopologyMain {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout1", new RandomSentenceSpout(),1);
        builder.setBolt("two", new TwoBolt(),1).shuffleGrouping("spout1");
        builder.setBolt("split1", new SplitSentenceBolt(),2).shuffleGrouping("spout1");

        Config conf = new Config();
        conf.setDebug(false);
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        }
    }
}

可以發現spout每隔一段時間間隔發一份數據,這份數據會被兩個bolt同時接收,而不是說這次A bolt接收下次B bolt接收。 同一個bolt業務邏輯如果設置了並行度,他們才會根據分組策略依次接收上游發來的消息。

----------------84     + - * % /  這個是tow bolt接收
----------------78     + - * % /  這個是split1 bolt 中78線程接收的
----------------80     + - * % /  這個是split1 bolt中線程80接收的。
----------------84     + - * % /
----------------78     + - * % /
----------------84     + - * % /  
 
Storm7stream grouping
  Shuffle Grouping: 隨機分組,隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同。
  Fields Grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts里的一個task,而不同的userid則會被分配到不同的bolts里的task。
  All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
  Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
  Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。
  Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
  Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
conf.setNumWorkers(4) 表示設置了4個worker來執行整個topology的所有組件
builder.setBolt("boltA-intsmaze",new BoltA(),  4)  ---->指明 boltA組件的線程數excutors總共有4個
builder.setBolt("boltB-intsmaze",new BoltB(),  4) ---->指明 boltB組件的線程數excutors總共有4個
builder.setSpout("randomSpout-intsmaze",new RandomSpout(),  2) ---->指明randomSpout組件的線程數excutors總共有2個
-----意味着整個topology中執行所有組件的總線程數為4+4+2=10個
----worker數量是4個,有可能會出現這樣的負載情況,worker-1有2個線程,worker-2有2個線程,worker-3有3個線程,worker-4有3個線程
如果指定某個組件的具體task並發實例數
builder.setSpout("randomspout-intsmaze", new RandomWordSpout(), 4).setNumTasks(8);
----意味着對於這個組件的執行線程excutor來說,一個excutor將執行8/4=2個task,默認情況一個線程執行一個task.

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM