Storm入門3-集群搭建


 


【storm集群的搭建以及將開發好的拓撲提交到集群上運行的方法】

  在上一篇文章中,我們的拓撲直接運行,並在程序開始時候自動啟動一個本地"集群"來運行拓撲。LocalCluster這種方式一般用於本地的開發和調試。而在實際的生產環境中,我們一般是有搭建好的storm集群,我們開發完topology后會提交到集群中的主節點nimbus,由nimbus來向supervisor分發代碼,並啟動woker來運行topology.下面我們將在本地搭建一個storm運行環境,並將開發好的WordCountTopology提交到本地"集群"上來運行。

  本地"集群"搭建

  1. 安裝好JDK並配置系統環境變量;http://www.cnblogs.com/jonyo/p/5656386.html
  2. 安裝zookepeer並配置系統環境變量;【待加。。。。】
  3. 下載storm壓縮包,解壓放到 /usr/local/下,在.zshrc中配置相關的環境變量
     export STORM_HOME=/usr/local/storm            
     export PATH=$PATH:$STORM_HOME/bin
  4. 進入到storm/conf下,修改storm.yaml配置文件,在末尾添加
    storm.zookeeper.servers:
        - "localhost"
    storm.local.dir: "/usr/local/storm/data"
    storm.zookeeper.port: 2181
    nimbus.host: "localhost"
    ui.port: 8080
    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703
    

    這個文件主要是對storm使用的zookepeer地址,集群的nimbus,supervisor等信息進行了相關配置

  5. 步驟1~4完成以后,按照下面就可以啟動集群了
    (1)啟動zookepeer: 在/usr/local/zookeeper/bin下執行sh zkServer.sh start   接着執行sh zkServer.sh status查看啟動狀態,出現Mode: standalone即表示成功
    (2)啟動nimbus進程:在/usr/local/storm/bin下執行 sh storm nimbus,會看到nimbus啟動信息

    (3)啟動supervisor進程,由於都是在同一台機器上啟動的,需要重新開啟一個控制台。在/usr/local/storm/bin下執行 sh storm supervisor

    (4)為了便於查看集群的信息,將ui也啟動,在/usr/local/storm/bin下執行 sh storm ui, 然后通過localhost:8080可以查看集群狀態

    UI界面,可以對拓撲進行管理,查看集群狀態等等;由於沒有任何拓撲提交,可以看到Topology Summary是空的,沒有信息

    (5)將之前的WordCountTopology.java文件修改如下
     1 //        //建立本地集群,利用LocalCluster,storm在程序啟動時會在本地自動建立一個集群,不需要用戶自己再搭建,方便本地開發和debug
     2 //        LocalCluster cluster = new LocalCluster();
     3 //
     4 //        //創建拓撲實例,並提交到本地集群進行運行
     5 //        cluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
     6 
     7         try {
     8             //submitTopology方法負責發送Topology到集群,[新增的代碼]
     9             StormSubmitter.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
    10         } catch (Exception e) {
    11             e.printStackTrace();
    12         }

    修改完成后,利用maven打成jar包:mvn compile       mvn clean package

    (6)將打出的jar包(拓撲)提交到集群上執行
    storm jar /Users/.../workspace/storm-learning/target/storm-learning-1.0-SNAPSHOT.jar  wordCount.WordCountTopology  wordCountTest
    在此打開localhost:8080會發現WordCountTopology在執行,並顯示出執行的狀態和相關信息




    拓撲提交到nimbus之后都發生了什么事情?即通過storm jar xxxxx.jar  xxx.Main agrs這個命令之后操作是怎么樣的?

    storm jar會執行.../storm/bin目錄下的storm.py腳本文件里定義的def jar(jarfile, klass, *args),在這個腳本中,通過一系列的設置,讓后續main方法中的調用的StormSubmitter.submitTopology(name, conf, builder.createTopology())找到jar所在的地址,然后通過soket傳輸,將Jar文件上傳到nimbus,nimbus在接收到jar文件后,存放到數據目錄的inbox目錄(inbox是在用戶設置的storm.local.dir變量所指定的目錄的nimbus下)
    inbox用於存放提交的Jar文件,每個Jar被重命名為stormjar+32位的uuid, 比如的的執行提交后是stormjar-a22e9169-e377-45db-9a07-a97357ccfbef.jar

    stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規則是“name-計數-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執行這些代碼。

    進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
    1.在zookeeper上創建/taskheartbeats/{storm id} 路徑,用於任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task將定時刷新節點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設置。
    2.從topology中獲取bolts,spouts設置的並行數目以及全局配置的最大並行數,然后產生task id列表,如[1 2 3 4]
    3.在zookeeper上創建/tasks/{strom id}/{task id}路徑,並存儲task信息
    4.開始分配任務(內部稱為assignment), 具體步驟:
     (1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
     (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
     (3)將任務均勻地分配給可用的worker,這里有兩種情況:
     (a)task數目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
    {1: [host1:port1] 2 : [host2:port1]
             3 : [host1:port1] 4 : [host2:port1]}
    ,可以看到任務平均地分配在兩個worker上。
    (b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
    [host1:port1 host2:port1 host1:port2 host2:port2]
    ,然后分配任務為
    {1: host1:port1 , 2 : host2:port2}

    (4)記錄啟動時間
    (5)判斷現有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
    5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的數據里的active設置為true。
    6.nimbus會檢查task的心跳,如果發現task心跳超過超時時間,那么會重新跳到第4步做re-assignment。

    最后的topology任務的分配過程 參考:http://blog.csdn.net/xiaolang85/article/details/38065185


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM