Storm 流式計算框架


1. 簡介

  • 是一個分布式, 高容錯的 實時計算框架

  • Storm進程常駐內存, 永久運行

  • Storm數據不經過磁盤, 在內存中流轉, 通過網絡直接發送給下游

  • 流式處理(streaming) 與 批處理(batch)

    • 批處理(batch): MapReduce

    • 微批處理(MircroBatch): Spark (性能上近似 Streaming, 但是還是有所不及)

    • 流(streaming): Storm, Flink(其實Flink也可以做批處理)

    • Storm MapReduce
      流式處理 批處理
      毫秒級 分鍾級
      DAG模型 Map+Reduce模型
      常駐運行 反復啟停
  1. Storm 計算模型

    img

    • Topology - DAG 有向無環圖

      • 例圖: (Spout: 噴嘴)

      img

      • 對Storm實時計算邏輯進行封裝

      • 由一系列通過數據流相互關聯的Spout、Bolt鎖組成的拓撲結構

      • 生命周期: 此拓撲只要啟動就會一直在集群中運行, 直到手動將其kill, 否則不會終止

        (與MapReduce中的Job的區別: MR中的Job在計算機執行完成就會終止)

    • Tuple - 元組

      • Stream中最小的數據組成單元(熟悉python的一定不會陌生)
    • Stream - 數據流

      • 從Spout 中源源不斷傳遞數據給Bolt、以及上一個Bolt傳遞數據給下一個Bolt, 所形成的的數據通道為Stream
      • 在聲明Stream時需要給其指定一個Id (默認為Default)
        • 實際開發場景中, 多使用單一數據流, 此時不需要單獨指定StreamId
    • Spout - 數據源

      • 拓撲中數據流的來源。一般會從指定外部的數據源讀取元組 (Tuple) 發送到拓撲(Topology) 中。

      • 一個Spout可以發送多個數據流(Stream)

        • 可以先通過OutputFieldsDeclarer中的declare方法聲明定義的不同數據流, 發送數據時通過SpoutOutputCollector 中的 emit 方法指定數據流 Id (streamId) 參數將數據發送出去
      • Spout 中最核心的方法是 nextTuple, 該方法會被Storm線程不斷調用、主動從數據源拉取數據, 再通過emit 方法將數據生成元組 (Tuple) 發送給之后的 Bolt 計算

    • Bolt - 對數據流進行處理的組件

      • 拓撲中數據處理均由Bolt完成。對於簡單的任務或者數據流轉換, 單個Bolt可以簡單實現; 更加復雜場景往往需要多個Bolt分多個步驟完成

      • 一個Bolt可以發送多個數據流(Stream)

        • 可先通過OutputFieldsDeclarer中的declare方法聲明定義的不同數據流, 發送數據時通過收集器(Collector)中的emit方法指定數據流Id(streamId) 參數將數據發送出去
      • Bolt 中最核心的方法是execute方法, 該方法負責接收到一個元組 (Tuple) 數據 以及 真正實現核心的業務邏輯

    • 簡單的WorldCount實例

      • WordCountSpout
      package com.ronnie.storm.wordCount;
      
      import backtype.storm.spout.SpoutOutputCollector;
      import backtype.storm.task.TopologyContext;
      import backtype.storm.topology.OutputFieldsDeclarer;
      import backtype.storm.topology.base.BaseRichSpout;
      import backtype.storm.tuple.Fields;
      import backtype.storm.tuple.Values;
      
      import java.util.Map;
      import java.util.Random;
      
      
      public class WordCountSpout extends BaseRichSpout {
      
          private Random random = new Random();
      
          SpoutOutputCollector collector;
      
          String[] lines = {
                  "Well done Gaben well fucking done",
                  "What is going wrong with you Bro",
                  "You are so fucking retard",
                  "What the hell is it",
                  "hadoop spark storm flink",
                  "mysql oracle memcache redis mongodb"
          };
      
          @Override
          public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
              this.collector = collector;
          }
      
          /**
           *  1. storm 會一直(死循環)調用此方法
           *  2. 每次調用此方法, 往下游發輸出
           *
           *  while(flag){
           *      nextTuple();
           *  }
           */
          @Override
          public void nextTuple() {
              int index = random.nextInt(lines.length);
      
              String line = lines[index];
      
              System.out.println("line: " + line);
      
              collector.emit(new Values(line));
      
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("liner"));
          }
      }
      
      • WordCountSplit
      package com.ronnie.storm.wordCount;
      
      import backtype.storm.task.OutputCollector;
      import backtype.storm.task.TopologyContext;
      import backtype.storm.topology.OutputFieldsDeclarer;
      import backtype.storm.topology.base.BaseRichBolt;
      import backtype.storm.tuple.Fields;
      import backtype.storm.tuple.Tuple;
      import backtype.storm.tuple.Values;
      
      import java.util.Map;
      
      public class WordCountSplit extends BaseRichBolt {
      
          // 提升作用域
          OutputCollector collector;
      
          @Override
          public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
              System.err.println(this + "=============================");
              this.collector = collector;
          }
      
          @Override
          public void execute(Tuple input) {
              // 從域中獲取數據, 要與之前Spout中 declareOutputFields 的域名稱一致
              String line = input.getStringByField("liner");
      
              // 根據什么分離
              String[] words = line.split(" ");
      
              for (String word: words){
      
                  // Value是一個ArrayList, 其中存的對象要與后面聲明的域中屬性相對應
                  collector.emit(new Values(word,"ronnie"));
              }
          }
      
          /**
           *  Fields中 的名稱 與 前面 value 中的屬性相應
           * @param declarer
           */
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("word", "name"));
          }
      }
      
      • WordCount

        package com.ronnie.storm.wordCount;
        
        import backtype.storm.task.OutputCollector;
        import backtype.storm.task.TopologyContext;
        import backtype.storm.topology.OutputFieldsDeclarer;
        import backtype.storm.topology.base.BaseRichBolt;
        import backtype.storm.tuple.Tuple;
        
        import java.util.HashMap;
        import java.util.Map;
        
        public class WordCount extends BaseRichBolt {
        
            Map<String, Integer> result = new HashMap<>();
        
            /**
             *  初始化任務
             * @param map
             * @param topologyContext
             * @param outputCollector
             */
            @Override
            public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        
            }
        
            /**
             *  最核心方法
             *  上游傳tuple數據給它, 並調用此方法
             * @param input
             */
            @Override
            public void execute(Tuple input) {
                String word = input.getString(0);
                Integer integer = result.get(word);
        
                if (null == integer){
                    integer = 1;
                } else {
                    integer += 1;
                }
                result.put(word, integer);
        
                System.err.println(word + " : " + integer);
            }
        
            /**
             *  聲明輸出的域類型
             * @param outputFieldsDeclarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        
            }
        }
        
      • WordCountTopology

      package com.ronnie.storm.wordCount;
      
      import backtype.storm.Config;
      import backtype.storm.LocalCluster;
      import backtype.storm.StormSubmitter;
      import backtype.storm.generated.AlreadyAliveException;
      import backtype.storm.generated.AuthorizationException;
      import backtype.storm.generated.InvalidTopologyException;
      import backtype.storm.generated.StormTopology;
      import backtype.storm.topology.TopologyBuilder;
      import backtype.storm.tuple.Fields;
      
      public class WordCountTopology {
          public static void main(String[] args) {
              TopologyBuilder topologyBuilder = new TopologyBuilder();
      
              topologyBuilder.setSpout("wcSpout", new WordCountSpout());
              // setBolt 的第三個參數為並行量 setNumTasks 修改 任務數量為 4
              topologyBuilder.setBolt("wcSplit", new WordCountSplit(), 2).setNumTasks(4).shuffleGrouping("wcSpout");
      
              topologyBuilder.setBolt("wcCount", new WordCount(), 5).fieldsGrouping("wcSplit", new Fields("word"));
      
              StormTopology topology = topologyBuilder.createTopology();
      
      
      
              Config config = new Config();
      
              // 修改配置文件中的worker數量為3
              config.setNumWorkers(3);
      
              // 只要參數存在
              if (args.length > 0){
                  try {
                      StormSubmitter.submitTopology(args[0],config,topology);
                  } catch (AlreadyAliveException e) {
                      e.printStackTrace();
                  } catch (InvalidTopologyException e) {
                      e.printStackTrace();
                  } catch (AuthorizationException e) {
                      e.printStackTrace();
                  }
              } else {
                  // 不存在就執行本地任務
                  LocalCluster localCluster = new LocalCluster();
      
                  localCluster.submitTopology("wordCount", config, topology);
              }
      
          }
      }
      
      
    • 最后可將任務打成 jar 包傳送到linux系統上(已經部署好storm集群), 再通過命令行執行任務

      [root@node01 storm-0.10.0] bin/storm jar /opt/ronnie/wc.jar com.ronnie.storm.wordCount.WordCountTopology wc
      
      # 在storm目錄下 bin/storm jar jar文件目錄 包結構.任務類 任務參數
      
  2. Storm 架構設計

    img

    • Nimbus

      • 資源調度
      • 任務分配
      • 接收jar包
    • Supervisor

      • 接收Nimbus分配的任務

      • 啟動、停止自己管理的worker進程 (當前supervisor上的work數量可通過配置文件設定)

    • Worker

      • 運行具體處理運算組件的進程 (每個Worker對應執行一個Topology 的子集)

      • worker 任務類型:

        • spout 任務
        • bolt 任務
      • 啟動 executor

        • executor是 worker JVM 進程中的一個java線程, 一般默認每個executor負責執行一個task任務
    • Zookeeper

    • 與Hadoop架構對比

      Hadoop Storm
      主節點 ResourceManager Nimbus
      從節點 NodeManager Supervisor
      應用程序 Job Topology
      工作進程 Child Worker
      計算模型 Map/Reduce Spout/Bolt
  3. Storm 任務提交流程

    1571316194962

  4. Storm 本地目錄樹

    1571316290955

    1571316319867

  5. Storm DRPC

    • DRPC (Distributed RPC)

      • 分布式遠程過程調用

      • DRPC 是通過一個 DRPC 服務端(DRPC server)來實現分布式RPC功能的。

      • DRPC Server 負責接收 RPC 請求, 並將該請求發送到Storm中運行的Topology, 等待接收 Topology 發送的處理結果, 並將該結果返回給發送請求的客戶端。

      • DRPC設計目的:

        • 為了充分利用Storm的計算能力實現高密度的並行實時計算。
          • (Storm 接收若干個數據流輸入, 數據在Topology 當中運行完成, 然后通過DRPC將結果進行輸出。)
      • 客戶端通過向 DRPC 服務器發送執行函數的名稱以及該函數的參數來獲取處理結果。

        • 實現該函數的拓撲使用一個DRPCSpout 從 DRPC 服務器中接收一盒函數調用流。 DRPC 服務器 會為每個函數調用都標記一個唯一的id。
        • 隨后拓撲會執行函數來計算結果, 並在拓撲的最后使用一個名為 ReturnResults 的 Bolt 連接到 DRPC服務器, 根據函數調用的結果返回。

        img

        1571321154730

  6. Storm 容錯機制

    • 集群節點宕機

      • Nimbus服務器

        • 單點故障的話后續版本可以通過將nimbus.host: 改為 nimbus.seeds: ["node01", "node02"] 來設置備份節點解決
      • 非Nimbus服務器

        • 故障時, 該節點上所有Task任務都會超時, Nimbus會將這些Task重新分配到其他服務器上運行
    • 進程掛了

      • Worker
        • 掛掉時, Supervisor 會重新啟動這個進程。
        • 如果啟動過程中仍然一直失敗, 並且無法向Nimbus發送心跳, Nimbus會將該Worker重新分配到其他服務器上
      • Supervisor
        • 無狀態
          • 所有的狀態信息都存放在Zookeeper中管理
        • 快速失敗
          • 每當遇到任何異常情況, 都會自動毀滅
      • Nimbus
        • 無狀態
        • 快速失敗
    • 消息的完整性

      • 從Spout中發出的Tuple, 以及基於他所產生的Tuple
      • 由這些消息構成了一顆tuple樹
      • 當這顆tuple樹發送完成, 並且樹當中每一條消息都被正確地處理, 就表明spout發送的消息被完整地處理過了, 即該消息具有完整性。(Completation 有興趣的可以去Flink-client 看看 源碼中 CompletationFuture的使用)
    • 消息完整性的實現機制

      • Acker

        • Storm的拓撲中特殊的一些任務
        • 負責跟蹤每個Spout發出的Tuple的DAG (有向五環圖)
  7. Storm 並發機制

    • 基本組件

      • Worker - 進程

        • 一個Topology會包含一盒或多個Worker (每個Worker進程只能從屬於一個特定的Topology)
        • 這些Worker進程會並行跑在集群中的不同服務器上, 即一個Topology其實是由並行運行在Storm集群中多台服務器上的進程所組成
      • Executor - 線程

        • Executor是由Worker進程中生成的一個線程
        • 每個Worker進程中會運行拓撲當中的一個或多個Executor線程
        • 一個Executor線程中可以執行一個或多個Task任務, 但是這些Task任務都是對應着同一個組件(Spout、Bolt)
      • Task

        • 實際執行數據處理的最小單元
        • 每個task即為一個Spout或者一個Bolt
        • Task數量在整個Topology聲明周期中保持不變, Executor數量key變化或手動調整
        • 默認情況下, Task數量和Executor是相同的, 即每個Executor線程中默認運行一個Task任務
      • 設置參數

        • Worker進程數

          • Config.setNumWorkers(int workers)
        • Executor線程數

          • TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
          • TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
          • parallelism_hint(並行量) 即為 executor 線程數
        • Task數量

          • ComponentConfigurationDeclarer.setNumTasks(Number val)
    • Rebalance - 重平衡

      • 動態調整Topology拓撲的Worker進程數量、以及Executor線程數量
      • 兩種調整方式: 通過Storm UI && 通過Storm CLI
  8. Storm 通信機制

    • Worker進程間的數據通信

      • ZMQ

        • ZeroMQ 開源的消息傳遞框架, 並不是消息隊列(MessageQueue)
      • Netty

        • Nettty 是基於NIO(Not Blocked Input Output)的網絡框架(是對NIO包的一種封裝, 因為原生API不是很好用),更加高效。

        • Storm 0.9版本之后使用Netty是因為ZMQ的license和Storm的license不兼容。

    • Worker內部的數據通信

      • Disruptor(干擾者? wtf)
        • 實現了 “隊列” 的功能
          • 可以理解為一種時間監聽或者消息處理機制, 即在隊列當中一邊由生產者放入消息數據, 另一邊消費者並行去除消息數據進行處理
  9. Storm Grouping -- 流數據流分組(數據分發策略)

  • Shuffle Grouping

    • 隨機分組, 隨機派發stream中的tuple, 保證每個bolt task接收到的tuple數目大致相同。
    • 輪詢, 平均分配
  • Fields Grouping

    • 按字段分組, 比如, 按 "user-id" 分組, 那么具有同樣"user-id" 的 tuple 會被分到相同的Bolt中的一個, 而不同的"user-id" 則可能會被分配到不同的task
  • All Grouping

    • 廣播發送, 對於每一個tuple, 所有的bolts都會收到
  • Global Grouping

    • 全局分組, 把tuple分配給task id 最低的task
  • None Grouping

    • 不分組, 這個分組的意思是說stream不關系到底怎樣分組。 目前這種分組和shuffle grouping是一樣的效果。 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執行 (未來Storm如果可能的話會這樣設計)
  • Direct Grouping

    • 指向型分組, 這是一種比較特別的分組方法, 用這種分組意味着消息(tuple) 的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream 的消息流可以聲明這種分組方法。
    • 這種消息tuple必須使用 emitDirect 方法來發射。 消息處理者可以通過TopologyContext 來獲取處理它的task的id(OutputCollector.emit 方法也會返回task的id)
  • Local or shuffle Grouping

    • 本地或隨機分組。 如果目標bolt有一個或者多個task與源bolt的task在同一個工作進程中, tuple將會被隨機發送給這些同進程中的tasks。 否則, 和普通的Shuffle Grouping行為一致
  • CustomGrouping

    • 自定義, 相當於mapreduce那里自己去實現一個partition一樣。
  1. Flume + Kafka + Storm 架構設計

1571360380589

1571360413458

  • 采集層: 實現日志收集, 使用負載均衡策略

  • 消息隊列: 作用是解耦及做不同速度系統緩沖

  • 實時處理單元: 用Storm來進行數據處理, 最終數據流入DB中

  • 展示單元: 數據可視化, 使用WEB框架展示


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM