阿里的STORM——JSTORM


看介紹文檔貌似挺好:
https://github.com/alibaba/jstorm

 

Storm 和JStorm

阿里擁有自己的實時計算引擎

  1. 類似於hadoop 中的MR

  2. 開源storm響應太慢

  3. 開源社區的速度完全跟不上Ali的需求

  4. 降低未來運維成本

  5. 提供更多技術支持,加快內部業務響應速度

現有Storm無法滿足一些需求

  1. 現有storm調度太簡單粗暴,無法定制化

  2. Storm 任務分配不平衡

  3. RPC OOM一直沒有解決

  4. 監控太簡單

  5. 對ZK 訪問頻繁

JStorm相比Storm更穩定

  1. Nimbus 實現HA:當一台nimbus掛了,自動熱切到備份nimbus

  2. 原生Storm RPC:Zeromq 使用堆外內存,導致OS 內存不夠,Netty 導致OOM;JStorm底層RPC 采用netty + disruptor保證發送速度和接受速度是匹配的

  3. 新上線的任務不會沖擊老的任務:新調度從cpu,memory,disk,net 四個角度對任務進行分配,已經分配好的新任務,無需去搶占老任務的cpu,memory,disk和net

  4. Supervisor主線

  5. Spout/Bolt 的open/prepar

  6. 所有IO, 序列化,反序列化

  7. 減少對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描。

JStorm相比Storm調度更強大

  1. 徹底解決了storm 任務分配不均衡問題

  2. 從4個維度進行任務分配:CPU、Memory、Disk、Net

  3. 默認一個task,一個cpu slot。當task消耗更多的cpu時,可以申請更多cpu slot

  4. 默認一個task,一個memory slot。當task需要更多內存時,可以申請更多內存slot

  5. 默認task,不申請disk slot。當task 磁盤IO較重時,可以申請disk slot

  6. 可以強制某個component的task 運行在不同的節點上

  7. 可以強制topology運行在單獨一個節點上

  8. 可以自定義任務分配,提前預約任務分配到哪台機器上,哪個端口,多少個cpu slot,多少內存,是否申請磁盤

  9. 可以預約上一次成功運行時的任務分配,上次task分配了什么資源,這次還是使用這些資源

JStorm相比Storm性能更好

JStorm 0.9.0 性能非常的好,使用netty時單worker 發送最大速度為11萬QPS,使用zeromq時,最大速度為12萬QPS。

  • JStorm 0.9.0 在使用Netty的情況下,比Storm 0.9.0 使用netty情況下,快10%, 並且JStorm netty是穩定的而Storm 的Netty是不穩定的

  • 在使用ZeroMQ的情況下, JStorm 0.9.0 比Storm 0.9.0 快30%

性能提升的原因:

  1. Zeromq 減少一次內存拷貝

  2. 增加反序列化線程

  3. 重寫采樣代碼,大幅減少采樣影響

  4. 優化ack代碼

  5. 優化緩沖map性能

  6. Java 比clojure更底層

JStorm的其他優化點

  1. 資源隔離。不同部門,使用不同的組名,每個組有自己的Quato;不同組的資源隔離;采用cgroups 硬隔離

  2. Classloader。解決應用的類和Jstorm的類發生沖突,應用的類在自己的類空間中

  3. Task 內部異步化。Worker 內部全流水線模式,Spout nextTuple和ack/fail運行在不同線程

 具體如何實現,請參考本ID的的博文系列  【jstorm-源碼解析】


 

概敘 & 應用場景

JStorm 是一個分布式實時計算引擎。

JStorm 是一個類似Hadoop MapReduce的系統, 用戶按照指定的接口實現一個任務,然后將這個任務遞交給JStorm系統,Jstorm將這個任務跑起來,並且按7 * 24小時運行起來,一旦中間一個worker 發生意外故障, 調度器立即分配一個新的worker替換這個失效的worker。

因此,從應用的角度,JStorm 應用是一種遵守某種編程規范的分布式應用。從系統角度, JStorm一套類似MapReduce的調度系統。 從數據的角度, 是一套基於流水線的消息處理機制。

實時計算現在是大數據領域中最火爆的一個方向,因為人們對數據的要求越來越高,實時性要求也越來越快,傳統的Hadoop Map Reduce,逐漸滿足不了需求,因此在這個領域需求不斷。

優點

在Storm和JStorm出現以前,市面上出現很多實時計算引擎,但自storm和JStorm出現后,基本上可以說一統江湖: 究其優點:

  • 開發非常迅速, 接口簡單,容易上手,只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,底層rpc,worker之間冗余,數據分流之類的動作完全不用考慮。
  • 擴展性極好, 當一級處理單元速度,直接配置一下並發數,即可線性擴展性能
  • 健壯, 當worker失效或機器出現故障時, 自動分配新的worker替換失效worker
  • 數據准確性, 可以采用Acker機制,保證數據不丟失。 如果對精度有更多一步要求,采用事務機制,保證數據准確。

應用場景

JStorm處理數據的方式是基於消息的流水線處理, 因此特別適合無狀態計算,也就是計算單元的依賴的數據全部在接受的消息中可以找到, 並且最好一個數據流不依賴另外一個數據流。

因此,常常用於

  • 日志分析,從日志中分析出特定的數據,並將分析的結果存入外部存儲器如數據庫。目前,主流日志分析技術就使用JStorm或Storm
  • 管道系統, 將一個數據從一個系統傳輸到另外一個系統, 比如將數據庫同步到Hadoop
  • 消息轉化器, 將接受到的消息按照某種格式進行轉化,存儲到另外一個系統如消息中間件
  • 統計分析器, 從日志或消息中,提煉出某個字段,然后做count或sum計算,最后將統計值存入外部存儲器。中間處理過程可能更復雜。
 

如何安裝

安裝步驟

  • Downloads下載relase包
  • 搭建Zookeeper集群
  • 安裝Python 2.6
  • 安裝Java
  • 安裝zeromq
  • 安裝Jzmq
  • 配置$JSTORM_HOME/conf/storm.yaml
  • 搭建web ui
  • 啟動JStorm集群

搭建Zookeeper集群

本處不細描敘Zookeeper安裝步驟

搭建JStorm集群

安裝python 2.6

-s $HOME/.pythonbrew/etc/bashrc && source $HOME/.pythonbrew/etc/bashrc

pythonbrew install 2.6.7

pythonbrew switch 2.6.7

安裝java

注意,如果當前系統是64位系統,則需要下載java 64位,如果是32為系統,則下載32位java

安裝zeromq(如果不使用zeromq, 可以不安裝zeromq)

wget http://download.zeromq.org/zeromq-2.1.7.tar.gz

tar zxf zeromq-2.1.7.tar.gz

cd zeromq-2.1.7

./configure

make

sudo make install

sudo ldconfig

如果沒有root權限,或當前用戶無sudo權限時,執行 “ ./configure --prefix=/home/xxxxx” 替換 “./configure”, 其中/home/xxxx 為安裝目標目錄

安裝jzmq(如果不使用zeromq, 可以不安裝jzmq)

git clone git://github.com/nathanmarz/jzmq.git

cd jzmq

./autogen.sh

./configure

make

make install

如果沒有root權限,或當前用戶無sudo權限時,執行 “ ./configure --prefix=/home/xxxx --with-zeromq=/home/xxxx” 替換 “./configure”, 其中/home/xxxx 為安裝目標目錄

安裝JStorm

假設以jstorm-0.9.3.zip為例

unzip jstorm-0.9.3.zip

vi ~/.bashrc

export JSTORM_HOME=/XXXXX/XXXX

export PATH=$PATH:$JSTORM_HOME/bin

配置$JSTORM_HOME/conf/storm.yaml

配置項:

  • storm.zookeeper.servers: 表示zookeeper 的地址,
  • nimbus.host: 表示nimbus的地址
  • storm.zookeeper.root: 表示jstorm在zookeeper中的根目錄,當多個JStorm共享一個ZOOKEEPER時,需要設置該選項,默認即為“/jstorm”
  • storm.local.dir: 表示jstorm臨時數據存放目錄,需要保證jstorm程序對該目錄有寫權限
  • java.library.path: zeromq 和java zeromq library的安裝目錄,默認"/usr/local/lib:/opt/local/lib:/usr/lib"
  • supervisor.slots.ports: 表示supervisor 提供的端口slot列表,注意不要和其他端口發生沖突,默認是68xx,而storm的是67xx
  • supervisor.disk.slot: 表示提供數據目錄,當一台機器有多塊磁盤時,可以提供磁盤讀寫slot,方便有重IO操作的應用。
  • topology.enable.classloader: false, 默認關閉classloader,如果應用的jar與jstorm的依賴的jar發生沖突,比如應用使用thrift9,但jstorm使用thrift7時,就需要打開classloader
  • nimbus.groupfile.path: 如果需要做資源隔離,比如數據倉庫使用多少資源,技術部使用多少資源,無線部門使用多少資源時,就需要打開分組功能, 設置一個配置文件的絕對路徑,改配置文件如源碼中group_file.ini所示
  • storm.local.dir: jstorm使用的本地臨時目錄,如果一台機器同時運行storm和jstorm的話, 則不要共用一個目錄,必須將二者分離開

在提交jar的節點上執行:

#mkdir ~/.jstorm
#cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm

安裝JStorm web ui

必須使用tomcat 7.0 或以上版本, 注意不要忘記拷貝 ~/.jstorm/storm.yaml

web ui 可以和nimbus不在同一個節點

mkdir ~/.jstorm

cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm

下載tomcat 7.x (以apache-tomcat-7.0.37 為例)

tar -xzf apache-tomcat-7.0.37.tar.gz

cd apache-tomcat-7.0.37

cd webapps

cp $JSTORM_HOME/jstorm-ui-0.9.3.war ./

mv ROOT ROOT.old

ln -s jstorm-ui-0.9.3 ROOT

cd ../bin

./startup.sh

啟動JStorm

  • 在nimbus 節點上執行 “nohup jstorm nimbus &”, 查看$JSTORM_HOME/logs/nimbus.log檢查有無錯誤
  • 在supervisor節點上執行 “nohup jstorm supervisor &”, 查看$JSTORM_HOME/logs/supervisor.log檢查有無錯誤 

基本概念

在JStorm中有對於流stream的抽象,流是一個不間斷的無界的連續tuple,注意JStorm在建模事件流時,把流中的事件抽象為tuple即元組,后面會解釋JStorm中如何使用tuple。

STREAM

Spout/Bolt

JStorm認為每個stream都有一個stream源,也就是原始元組的源頭,所以它將這個源頭抽象為spout,spout可能是連接消息中間件(如MetaQ, Kafka, TBNotify等),並不斷發出消息,也可能是從某個隊列中不斷讀取隊列元素並裝配為tuple發射。

有了源頭即spout也就是有了stream,那么該如何處理stream內的tuple呢,同樣的思想JStorm將tuple的中間處理過程抽象為Bolt,bolt可以消費任意數量的輸入流,只要將流方向導向該bolt,同時它也可以發送新的流給其他bolt使用,這樣一來,只要打開特定的spout(管口)再將spout中流出的tuple導向特定的bolt,又bolt對導入的流做處理后再導向其他bolt或者目的地。

我們可以認為spout就是一個一個的水龍頭,並且每個水龍頭里流出的水是不同的,我們想拿到哪種水就擰開哪個水龍頭,然后使用管道將水龍頭的水導向到一個水處理器(bolt),水處理器處理后再使用管道導向另一個處理器或者存入容器中。

spoutbolt

Topology

topology

對應上文的介紹,我們可以很容易的理解這幅圖,這是一張有向無環圖,JStorm將這個圖抽象為Topology即拓撲(的確,拓撲結構是有向無環的),拓撲是Jstorm中最高層次的一個抽象概念,它可以被提交到Jstorm集群執行,一個拓撲就是一個數據流轉換圖,圖中每個節點是一個spout或者bolt,圖中的邊表示bolt訂閱了哪些流,當spout或者bolt發送元組到流時,它就發送元組到每個訂閱了該流的bolt(這就意味着不需要我們手工拉管道,只要預先訂閱,spout就會將流發到適當bolt上)。 插個位置說下Jstorm的topology實現,為了做實時計算,我們需要設計一個拓撲圖,並實現其中的Bolt處理細節,JStorm中拓撲定義僅僅是一些Thrift結構體,這樣一來我們就可以使用其他語言來創建和提交拓撲。

Tuple

JStorm將流中數據抽象為tuple,一個tuple就是一個值列表value list,list中的每個value都有一個name,並且該value可以是基本類型,字符類型,字節數組等,當然也可以是其他可序列化的類型。拓撲的每個節點都要說明它所發射出的元組的字段的name,其他節點只需要訂閱該name就可以接收處理。

Worker/Task

Worker和Task是JStorm中任務的執行單元, 一個worker表示一個進程,一個task表示一個線程, 一個worker可以運行多個task。

資源slot

在JStorm中,資源類型分為4種, CPU, Memory,Disk, Port, 不再局限於Storm的port。 即一個supervisor可以提供多少個CPU slot,多少個Memory slot, 多少個Disk slot, 多少個Port slot

  • 一個worker就消耗一個Port slot, 默認一個task會消耗一個CPU slot和一個Memory slot
  • 當task執行任務較重時,可以申請更多的CPU slot,
  • 當task需要更多內存時,可以申請更多的內存slot,
  • 當task 磁盤讀寫較多時,可以申請磁盤slot,則該磁盤slot給該task獨享。 

應用例子

最簡單的JStorm例子分為4個步驟:

生成Topology

Map conf new HashMp();//topology所有自定義的配置均放入這個MapTopologyBuilder builder new TopologyBuilder();//創建topology的生成器int spoutParal = get("spout.parallel", 1);//獲取spout的並發設置SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME
,
 
new SequenceSpout(), spoutParal);//創建Spout, 其中new SequenceSpout() 為真正spout對象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 為spout的名字,注意名字中不要含有空格int boltParal = get("bolt.parallel", 1);//獲取bolt的並發設置BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal)
.shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);//創建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 為bolt名字,TotalCount 為bolt對象,boltParal為bolt並發數,//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的數據,並且以shuffle方式,//即每個spout隨機輪詢發送tuple到下一級bolt中int ackerParal = get("acker.parallel", 1);Config.setNumAckers(conf, ackerParal);//設置表示acker的並發數int workerNum = get("worker.num", 10);
conf .put(Config.TOPOLOGY_WORKERS, workerNum);//表示整個topology將使用幾個worker
 
conf .put(Config.STORM_CLUSTER_MODE, "distributed");//設置topolog模式為分布式,這樣topology就可以放到JStorm集群上運行StormSubmitter.submitTopology(streamName, conf,
                builder .createTopology());//提交topology

IRichSpout

IRichSpout 為最簡單的Spout接口

IRichSpout
    @ Override public void open(Map conf, TopologyContext context, SpoutOutputCollector
collector) {
    }
 
    @ Override public void
close() {
    }
 
    @ Override public void
activate() {
    }
 
    @ Override public void
deactivate() {
    }
 
    @ Override public void
nextTuple() {
    }
 
    @ Override public void ack(Object
msgId) {
    }
 
    @ Override public void fail(Object
msgId) {
    }
 
    @ Override public void declareOutputFields(OutputFieldsDeclarer
declarer) {
    }
 
    @ Override public Map<String, Object>
getComponentConfiguration() {
 
return null;
    }

其中注意:

  • spout對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
  • spout可以有構造函數,但構造函數只執行一次,是在提交任務時,創建spout對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將spout序列化到文件中去,在worker起來時再將spout從文件中反序列化出來)。
  • open是當task起來后執行的初始化動作
  • close是當task被shutdown后執行的動作
  • activate 是當task被激活時,觸發的動作
  • deactivate 是task被deactive時,觸發的動作
  • nextTuple 是spout實現核心, nextuple完成自己的邏輯,即每一次取消息后,用collector 將消息emit出去。
  • ack, 當spout收到一條ack消息時,觸發的動作,詳情可以參考 ack機制
  • fail, 當spout收到一條fail消息時,觸發的動作,詳情可以參考 ack機制
  • declareOutputFields, 定義spout發送數據,每個字段的含義
  • getComponentConfiguration 獲取本spout的component 配置

Bolt

IRichBolt
    @ Override public void prepare(Map stormConf, TopologyContext context, OutputCollector
collector) {
    }
 
    @ Override public void execute(Tuple
input) {
    }
 
    @ Override public void
cleanup() {
    }
 
    @ Override public void declareOutputFields(OutputFieldsDeclarer
declarer) {
    }
 
    @ Override public Map<String, Object>
getComponentConfiguration() {
 
return null;
    } 
}

其中注意:

  • bolt對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
  • bolt可以有構造函數,但構造函數只執行一次,是在提交任務時,創建bolt對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將bolt序列化到文件中去,在worker起來時再將bolt從文件中反序列化出來)。
  • prepare是當task起來后執行的初始化動作
  • cleanup是當task被shutdown后執行的動作
  • execute是bolt實現核心, 完成自己的邏輯,即接受每一次取消息后,處理完,有可能用collector 將產生的新消息emit出去。 ** 在executor中,當程序處理一條消息時,需要執行collector.ack, 詳情可以參考 ack機制 ** 在executor中,當程序無法處理一條消息時或出錯時,需要執行collector.fail ,詳情可以參考 ack機制
  • declareOutputFields, 定義bolt發送數據,每個字段的含義
  • getComponentConfiguration 獲取本bolt的component 配置

編譯

在Maven中配置

               < dependency>
            < groupId>com.alibaba.jstorm</groupId>
            < artifactId>jstorm-client</artifactId>
            < version>0.9.3.1</version>
            < scope>provided</scope>
        </dependency>
 
          <dependency>
            < groupId>com.alibaba.jstorm</groupId>
            < artifactId>jstorm-client-extension</artifactId>
            < version>0.9.3.1</version>
            < scope>provided</scope>
        </ dependency>

如果找不到jstorm-client和jstorm-client-extension包,可以自己下載jstorm源碼進行編譯,請參考 源碼編譯

打包時,需要將所有依賴打入到一個包中

< build>
        <plugins>
 
            <plugin>
                < artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        < descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            < mainClass>storm.starter.SequenceTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        < id>make-assembly</id>
                        < phase>package</phase>
                        <goals>
                            < goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                < groupId>org.apache.maven.plugins</groupId>
                < artifactId>maven-compiler-plugin</artifactId>
                < configuration>
                    < source>1.6</source>
                    < target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </ build>

提交jar

JStorm vs Storm vs flume vs S4 選型

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

  • xxxx.jar 為打包后的jar
  • com.alibaba.xxxx.xx 為入口類,即提交任務的類
  • parameter即為提交參數
 

JStorm VS Storm 請參看 JStorm 0.9.0 介紹.pptx

JStorm 比Storm更穩定,更強大,更快, storm上跑的程序,一行代碼不變可以運行在jstorm上。

Flume 是一個成熟的系統,主要focus在管道上,將數據從一個數據源傳輸到另外一個數據源, 系統提供大量現成的插件做管道作用。當然也可以做一些計算和分析,但插件的開發沒有Jstorm便捷和迅速。

S4 就是一個半成品,健壯性還可以,但數據准確性較糟糕,無法保證數據不丟失,這個特性讓S4 大受限制,也導致了S4開源很多年,但發展一直不是很迅速。

AKKA 是一個actor模型,也是一個不錯的系統,在這個actor模型基本上,你想做任何事情都沒有問題,但問題是你需要做更多的工作,topology怎么生成,怎么序列化。數據怎么流(隨機,還是group by)等等。

Spark 是一個輕量的內存MR, 更偏重批量數據處理


 

0.9.0 性能測試

 JStorm 0.9.0 性能非常的好, 使用netty時單worker 發送最大速度為11萬QPS, 使用zeromq時,最大速度為12萬QPS.

結論

  • JStorm 0.9.0 在使用Netty的情況下,比Storm 0.9.0 使用netty情況下,快10%, 並且JStorm netty是穩定的而Storm 的Netty是不穩定的
  • 在使用ZeroMQ的情況下, JStorm 0.9.0 比Storm 0.9.0 快30%

原因

  • Zeromq 減少一次內存拷貝
  • 增加反序列化線程
  • 重寫采樣代碼,大幅減少采樣影響
  • 優化ack代碼
  • 優化緩沖map性能
  • Java 比clojure更底層

測試

測試樣例

測試樣例為https://github.com/longdafeng/storm-examples

測試環境

5 台 16核, 98G 物理機

uname -a :
Linux dwcache1 2.6.32-220.23.1.tb735.el5.x86_64 #1 SMP Tue Aug 14 16:03:04 CST 2012 x86_64 x86_64 x86_64 GNU/Linux

測試結果

  • JStorm with netty, Spout 發送QPS 為 11萬

jstorm.0.9.0.netty

  • storm with netty, Spout 應用發送QPS 為 10萬 (截圖為上層應用的QPS, 沒有包括發送到ack的QPS, Spout發送QPS 正好為上層應用QPS的2倍)

storm.0.9.0.netty

  • JStorm with zeromq, Spout 發送QPS 為12萬

jstorm.0.9.0.zmq

  • Storm with zeromq, Spout 發送QPS 為9萬(截圖為上層應用的QPS, 沒有包括發送到ack的QPS, Spout發送QPS 正好為上層應用QPS的2倍)

storm.0.9.0.zmq


資源硬隔離

cgroups是control groups的縮寫,是Linux內核提供的一種可以限制, 記錄, 隔離進程組(process groups)所使用的物理資源(如:cpu,memory,IO 等等)的機制。

在Jstorm中,我們使用cgroup進行cpu硬件資源的管理。使用前,需要做如下檢查和配置。

  • 檢查/etc/passwd 文件中當前用戶的uid和gid, 假設當前用戶是admin, 則看/etc/passwd文件中admin的uid和gid是多少
  • cgroup功能在當前系統的內核版本是否支持

    檢查/etc/cgconfig.conf是否存在。如果不存在, 請“yum install libcgroup”,如果存在,設置cpu子系統的掛載目錄位置, 以及修改該配置文件中相應的uid/gid為啟動jstorm用戶的uid/gid, 本例子中以500為例, 注意是根據第一步來進行設置的。

  mount {   
      cpu = /cgroup/cpu;
  }
 
 
  group jstorm {
       perm {
               task {
                      uid = 500;
                      gid = 500;
               }
               admin {
                      uid = 500;
                      gid = 500;
               }
       }
       cpu {
       }
  }
  • 然后啟動cgroup服務
service cgconfig restart
chkconfig --level 23456 cgconfig on

Note: cgconfig.conf只能在root模式下修改。

或者直接執行命令

這是一個cgconfig.conf配置文件例子。比如jstorm的啟動用戶為admin,admin在當前 系統的uid/gid為500(查看/etc/passwd 可以查看到uid和gid),那么相對應cpu子系統的jstorm目錄uid/gid也需要設置為相同的值。 以便jstorm有相應權限可以在這個目錄下為jstorm的每個需要進行資源隔離的進程創建對應 的目錄和進行相關設置。

mkdir /cgroup/cpu
mount  -t cgroup -o cpu none /cgroup/cpu
mkdir /cgroup/cpu/jstorm
chown admin:admin /cgroup/cpu/jstorm

. 在jstorm配置文件中打開cgroup, 配置storm.yaml

   supervisor.enable.cgroup: true

常見問題

 

性能問題

參考性能優化

資源不夠

當報告 ”No supervisor resource is enough for component “, 則意味着資源不夠 如果是僅僅是測試環境,可以將supervisor的cpu 和memory slot設置大,

在jstorm中, 一個task默認會消耗一個cpu slot和一個memory slot, 而一台機器上默認的cpu slot是(cpu 核數 -1), memory slot數(物理內存大小 * 75%/1g), 如果一個worker上運行task比較多時,需要將memory slot size設小(默認是1G), 比如512M, memory.slot.per.size: 535298048

#if it is null, then it will be detect by system
supervisor.cpu.slot.num: null
 
#if it is null, then it will be detect by system
supervisor.mem.slot.num: null
 
# support disk slot
# if it is null, it will use $(storm.local.dir)/worker_shared_data
supervisor.disk.slot: null

序列化問題

所有spout,bolt,configuration, 發送的消息(Tuple)都必須實現Serializable, 否則就會出現序列化錯誤.

如果是spout或bolt的成員變量沒有實現Serializable時,但又必須使用時, 可以對該變量申明時,增加transient 修飾符, 然后在open或prepare時,進行實例化

seriliazble_error

Log4j 沖突

0.9.0 開始,JStorm依舊使用Log4J,但storm使用Logbak,因此應用程序如果有依賴log4j-over-slf4j.jar, 則需要exclude 所有log4j-over-slf4j.jar依賴,下個版本將自定義classloader,就不用擔心這個問題。

SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError.
SLF4J: See also
http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.apache.log4j.Logger.getLogger(Logger.java:39)
        at org.apache.log4j.Logger.getLogger(Logger.java:43)
        at com.alibaba.jstorm.daemon.worker.Worker.<clinit>(Worker.java:32)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also
http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
        at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
        ... 3 more
Could not find the main class: com.alibaba.jstorm.daemon.worker.Worker.  Program will exit.

類沖突

如果應用程序使用和JStorm相同的jar 但版本不一樣時,建議打開classloader, 修改配置文件

topology.enable.classloader: true

或者

ConfigExtension.setEnableTopologyClassLoader(conf, true);

JStorm默認是關掉classloader,因此JStorm會強制使用JStorm依賴的jar

提交任務后,等待幾分鍾后,web ui始終沒有顯示對應的task

有3種情況:

用戶程序初始化太慢

如果有用戶程序的日志輸出,則表明是用戶的初始化太慢或者出錯,查看日志即可。 另外對於MetaQ 1.x的應用程序,Spout會recover ~/.meta_recover/目錄下文件,可以直接刪除這些消費失敗的問題,加速啟動。

通常是用戶jar沖突或初始化發生問題

打開supervisor 日志,找出啟動worker命令,單獨執行,然后檢查是否有問題。類似下圖:

fail_start_worker

檢查是不是storm和jstorm使用相同的本地目錄

檢查配置項 ”storm.local.dir“, 是不是storm和jstorm使用相同的本地目錄,如果相同,則將二者分開

提示端口被綁定

有2種情況:

多個worker搶占一個端口

假設是6800 端口被占, 可以執行命令 “ps -ef|grep 6800” 檢查是否有多個進程, 如果有多個進程,則手動殺死他們

系統打開太多的connection

Linux對外連接端口數限制,TCP client對外發起連接數達到28000左右時,就開始大量拋異常,需要

# echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM