看介紹文檔貌似挺好:
https://github.com/alibaba/jstorm
阿里擁有自己的實時計算引擎
-
類似於hadoop 中的MR
-
開源storm響應太慢
-
開源社區的速度完全跟不上Ali的需求
-
降低未來運維成本
-
提供更多技術支持,加快內部業務響應速度
現有Storm無法滿足一些需求
-
現有storm調度太簡單粗暴,無法定制化
-
Storm 任務分配不平衡
-
RPC OOM一直沒有解決
-
監控太簡單
-
對ZK 訪問頻繁
JStorm相比Storm更穩定
-
Nimbus 實現HA:當一台nimbus掛了,自動熱切到備份nimbus
-
原生Storm RPC:Zeromq 使用堆外內存,導致OS 內存不夠,Netty 導致OOM;JStorm底層RPC 采用netty + disruptor保證發送速度和接受速度是匹配的
-
新上線的任務不會沖擊老的任務:新調度從cpu,memory,disk,net 四個角度對任務進行分配,已經分配好的新任務,無需去搶占老任務的cpu,memory,disk和net
-
Supervisor主線
-
Spout/Bolt 的open/prepar
-
所有IO, 序列化,反序列化
-
減少對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描。
JStorm相比Storm調度更強大
-
徹底解決了storm 任務分配不均衡問題
-
從4個維度進行任務分配:CPU、Memory、Disk、Net
-
默認一個task,一個cpu slot。當task消耗更多的cpu時,可以申請更多cpu slot
-
默認一個task,一個memory slot。當task需要更多內存時,可以申請更多內存slot
-
默認task,不申請disk slot。當task 磁盤IO較重時,可以申請disk slot
-
可以強制某個component的task 運行在不同的節點上
-
可以強制topology運行在單獨一個節點上
-
可以自定義任務分配,提前預約任務分配到哪台機器上,哪個端口,多少個cpu slot,多少內存,是否申請磁盤
-
可以預約上一次成功運行時的任務分配,上次task分配了什么資源,這次還是使用這些資源
JStorm相比Storm性能更好
JStorm 0.9.0 性能非常的好,使用netty時單worker 發送最大速度為11萬QPS,使用zeromq時,最大速度為12萬QPS。
-
JStorm 0.9.0 在使用Netty的情況下,比Storm 0.9.0 使用netty情況下,快10%, 並且JStorm netty是穩定的而Storm 的Netty是不穩定的
-
在使用ZeroMQ的情況下, JStorm 0.9.0 比Storm 0.9.0 快30%
性能提升的原因:
-
Zeromq 減少一次內存拷貝
-
增加反序列化線程
-
重寫采樣代碼,大幅減少采樣影響
-
優化ack代碼
-
優化緩沖map性能
-
Java 比clojure更底層
JStorm的其他優化點
-
資源隔離。不同部門,使用不同的組名,每個組有自己的Quato;不同組的資源隔離;采用cgroups 硬隔離
-
Classloader。解決應用的類和Jstorm的類發生沖突,應用的類在自己的類空間中
-
Task 內部異步化。Worker 內部全流水線模式,Spout nextTuple和ack/fail運行在不同線程
具體如何實現,請參考本ID的的博文系列 【jstorm-源碼解析】
概敘 & 應用場景
JStorm 是一個分布式實時計算引擎。
JStorm 是一個類似Hadoop MapReduce的系統, 用戶按照指定的接口實現一個任務,然后將這個任務遞交給JStorm系統,Jstorm將這個任務跑起來,並且按7 * 24小時運行起來,一旦中間一個worker 發生意外故障, 調度器立即分配一個新的worker替換這個失效的worker。
因此,從應用的角度,JStorm 應用是一種遵守某種編程規范的分布式應用。從系統角度, JStorm一套類似MapReduce的調度系統。 從數據的角度, 是一套基於流水線的消息處理機制。
實時計算現在是大數據領域中最火爆的一個方向,因為人們對數據的要求越來越高,實時性要求也越來越快,傳統的Hadoop Map Reduce,逐漸滿足不了需求,因此在這個領域需求不斷。
優點
在Storm和JStorm出現以前,市面上出現很多實時計算引擎,但自storm和JStorm出現后,基本上可以說一統江湖: 究其優點:
- 開發非常迅速, 接口簡單,容易上手,只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,底層rpc,worker之間冗余,數據分流之類的動作完全不用考慮。
- 擴展性極好, 當一級處理單元速度,直接配置一下並發數,即可線性擴展性能
- 健壯, 當worker失效或機器出現故障時, 自動分配新的worker替換失效worker
- 數據准確性, 可以采用Acker機制,保證數據不丟失。 如果對精度有更多一步要求,采用事務機制,保證數據准確。
應用場景
JStorm處理數據的方式是基於消息的流水線處理, 因此特別適合無狀態計算,也就是計算單元的依賴的數據全部在接受的消息中可以找到, 並且最好一個數據流不依賴另外一個數據流。
因此,常常用於
- 日志分析,從日志中分析出特定的數據,並將分析的結果存入外部存儲器如數據庫。目前,主流日志分析技術就使用JStorm或Storm
- 管道系統, 將一個數據從一個系統傳輸到另外一個系統, 比如將數據庫同步到Hadoop
- 消息轉化器, 將接受到的消息按照某種格式進行轉化,存儲到另外一個系統如消息中間件
- 統計分析器, 從日志或消息中,提煉出某個字段,然后做count或sum計算,最后將統計值存入外部存儲器。中間處理過程可能更復雜。
如何安裝
安裝步驟
- 從Downloads下載relase包
- 搭建Zookeeper集群
- 安裝Python 2.6
- 安裝Java
- 安裝zeromq
- 安裝Jzmq
- 配置$JSTORM_HOME/conf/storm.yaml
- 搭建web ui
- 啟動JStorm集群
搭建Zookeeper集群
本處不細描敘Zookeeper安裝步驟
- 安裝步驟麻煩參考 ”zookeeper 安裝步驟“
- zookeeper配置麻煩參考 “zookeeper 配置介紹”
搭建JStorm集群
安裝python 2.6
- 如果當前系統提供python,可以不用安裝python
- 自己可以參考 python
- 也可以使用https://github.com/utahta/pythonbrew 來安裝python > curl -kLhttp://xrl.us/pythonbrewinstall | bash
-s $HOME/.pythonbrew/etc/bashrc && source $HOME/.pythonbrew/etc/bashrc
pythonbrew install 2.6.7
pythonbrew switch 2.6.7
安裝java
注意,如果當前系統是64位系統,則需要下載java 64位,如果是32為系統,則下載32位java
安裝zeromq(如果不使用zeromq, 可以不安裝zeromq)
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig
如果沒有root權限,或當前用戶無sudo權限時,執行 “ ./configure --prefix=/home/xxxxx” 替換 “./configure”, 其中/home/xxxx 為安裝目標目錄
安裝jzmq(如果不使用zeromq, 可以不安裝jzmq)
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
如果沒有root權限,或當前用戶無sudo權限時,執行 “ ./configure --prefix=/home/xxxx --with-zeromq=/home/xxxx” 替換 “./configure”, 其中/home/xxxx 為安裝目標目錄
安裝JStorm
假設以jstorm-0.9.3.zip為例
unzip jstorm-0.9.3.zip
vi ~/.bashrc
export JSTORM_HOME=/XXXXX/XXXX
export PATH=$PATH:$JSTORM_HOME/bin
配置$JSTORM_HOME/conf/storm.yaml
配置項:
- storm.zookeeper.servers: 表示zookeeper 的地址,
- nimbus.host: 表示nimbus的地址
- storm.zookeeper.root: 表示jstorm在zookeeper中的根目錄,當多個JStorm共享一個ZOOKEEPER時,需要設置該選項,默認即為“/jstorm”
- storm.local.dir: 表示jstorm臨時數據存放目錄,需要保證jstorm程序對該目錄有寫權限
- java.library.path: zeromq 和java zeromq library的安裝目錄,默認"/usr/local/lib:/opt/local/lib:/usr/lib"
- supervisor.slots.ports: 表示supervisor 提供的端口slot列表,注意不要和其他端口發生沖突,默認是68xx,而storm的是67xx
- supervisor.disk.slot: 表示提供數據目錄,當一台機器有多塊磁盤時,可以提供磁盤讀寫slot,方便有重IO操作的應用。
- topology.enable.classloader: false, 默認關閉classloader,如果應用的jar與jstorm的依賴的jar發生沖突,比如應用使用thrift9,但jstorm使用thrift7時,就需要打開classloader
- nimbus.groupfile.path: 如果需要做資源隔離,比如數據倉庫使用多少資源,技術部使用多少資源,無線部門使用多少資源時,就需要打開分組功能, 設置一個配置文件的絕對路徑,改配置文件如源碼中group_file.ini所示
- storm.local.dir: jstorm使用的本地臨時目錄,如果一台機器同時運行storm和jstorm的話, 則不要共用一個目錄,必須將二者分離開
在提交jar的節點上執行:
安裝JStorm web ui
必須使用tomcat 7.0 或以上版本, 注意不要忘記拷貝 ~/.jstorm/storm.yaml
web ui 可以和nimbus不在同一個節點
mkdir ~/.jstorm
cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
下載tomcat 7.x (以apache-tomcat-7.0.37 為例)
tar -xzf apache-tomcat-7.0.37.tar.gz
cd apache-tomcat-7.0.37
cd webapps
cp $JSTORM_HOME/jstorm-ui-0.9.3.war ./
mv ROOT ROOT.old
ln -s jstorm-ui-0.9.3 ROOT
cd ../bin
./startup.sh
啟動JStorm
- 在nimbus 節點上執行 “nohup jstorm nimbus &”, 查看$JSTORM_HOME/logs/nimbus.log檢查有無錯誤
- 在supervisor節點上執行 “nohup jstorm supervisor &”, 查看$JSTORM_HOME/logs/supervisor.log檢查有無錯誤
基本概念
流
在JStorm中有對於流stream的抽象,流是一個不間斷的無界的連續tuple,注意JStorm在建模事件流時,把流中的事件抽象為tuple即元組,后面會解釋JStorm中如何使用tuple。
Spout/Bolt
JStorm認為每個stream都有一個stream源,也就是原始元組的源頭,所以它將這個源頭抽象為spout,spout可能是連接消息中間件(如MetaQ, Kafka, TBNotify等),並不斷發出消息,也可能是從某個隊列中不斷讀取隊列元素並裝配為tuple發射。
有了源頭即spout也就是有了stream,那么該如何處理stream內的tuple呢,同樣的思想JStorm將tuple的中間處理過程抽象為Bolt,bolt可以消費任意數量的輸入流,只要將流方向導向該bolt,同時它也可以發送新的流給其他bolt使用,這樣一來,只要打開特定的spout(管口)再將spout中流出的tuple導向特定的bolt,又bolt對導入的流做處理后再導向其他bolt或者目的地。
我們可以認為spout就是一個一個的水龍頭,並且每個水龍頭里流出的水是不同的,我們想拿到哪種水就擰開哪個水龍頭,然后使用管道將水龍頭的水導向到一個水處理器(bolt),水處理器處理后再使用管道導向另一個處理器或者存入容器中。
Topology
對應上文的介紹,我們可以很容易的理解這幅圖,這是一張有向無環圖,JStorm將這個圖抽象為Topology即拓撲(的確,拓撲結構是有向無環的),拓撲是Jstorm中最高層次的一個抽象概念,它可以被提交到Jstorm集群執行,一個拓撲就是一個數據流轉換圖,圖中每個節點是一個spout或者bolt,圖中的邊表示bolt訂閱了哪些流,當spout或者bolt發送元組到流時,它就發送元組到每個訂閱了該流的bolt(這就意味着不需要我們手工拉管道,只要預先訂閱,spout就會將流發到適當bolt上)。 插個位置說下Jstorm的topology實現,為了做實時計算,我們需要設計一個拓撲圖,並實現其中的Bolt處理細節,JStorm中拓撲定義僅僅是一些Thrift結構體,這樣一來我們就可以使用其他語言來創建和提交拓撲。
Tuple
JStorm將流中數據抽象為tuple,一個tuple就是一個值列表value list,list中的每個value都有一個name,並且該value可以是基本類型,字符類型,字節數組等,當然也可以是其他可序列化的類型。拓撲的每個節點都要說明它所發射出的元組的字段的name,其他節點只需要訂閱該name就可以接收處理。
Worker/Task
Worker和Task是JStorm中任務的執行單元, 一個worker表示一個進程,一個task表示一個線程, 一個worker可以運行多個task。
資源slot
在JStorm中,資源類型分為4種, CPU, Memory,Disk, Port, 不再局限於Storm的port。 即一個supervisor可以提供多少個CPU slot,多少個Memory slot, 多少個Disk slot, 多少個Port slot
- 一個worker就消耗一個Port slot, 默認一個task會消耗一個CPU slot和一個Memory slot
- 當task執行任務較重時,可以申請更多的CPU slot,
- 當task需要更多內存時,可以申請更多的內存slot,
- 當task 磁盤讀寫較多時,可以申請磁盤slot,則該磁盤slot給該task獨享。
應用例子
最簡單的JStorm例子分為4個步驟:
生成Topology
IRichSpout
IRichSpout 為最簡單的Spout接口
其中注意:
- spout對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
- spout可以有構造函數,但構造函數只執行一次,是在提交任務時,創建spout對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將spout序列化到文件中去,在worker起來時再將spout從文件中反序列化出來)。
- open是當task起來后執行的初始化動作
- close是當task被shutdown后執行的動作
- activate 是當task被激活時,觸發的動作
- deactivate 是task被deactive時,觸發的動作
- nextTuple 是spout實現核心, nextuple完成自己的邏輯,即每一次取消息后,用collector 將消息emit出去。
- ack, 當spout收到一條ack消息時,觸發的動作,詳情可以參考 ack機制
- fail, 當spout收到一條fail消息時,觸發的動作,詳情可以參考 ack機制
- declareOutputFields, 定義spout發送數據,每個字段的含義
- getComponentConfiguration 獲取本spout的component 配置
Bolt
其中注意:
- bolt對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
- bolt可以有構造函數,但構造函數只執行一次,是在提交任務時,創建bolt對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將bolt序列化到文件中去,在worker起來時再將bolt從文件中反序列化出來)。
- prepare是當task起來后執行的初始化動作
- cleanup是當task被shutdown后執行的動作
- execute是bolt實現核心, 完成自己的邏輯,即接受每一次取消息后,處理完,有可能用collector 將產生的新消息emit出去。 ** 在executor中,當程序處理一條消息時,需要執行collector.ack, 詳情可以參考 ack機制 ** 在executor中,當程序無法處理一條消息時或出錯時,需要執行collector.fail ,詳情可以參考 ack機制
- declareOutputFields, 定義bolt發送數據,每個字段的含義
- getComponentConfiguration 獲取本bolt的component 配置
編譯
在Maven中配置
如果找不到jstorm-client和jstorm-client-extension包,可以自己下載jstorm源碼進行編譯,請參考 源碼編譯
打包時,需要將所有依賴打入到一個包中
提交jar
JStorm vs Storm vs flume vs S4 選型
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
- xxxx.jar 為打包后的jar
- com.alibaba.xxxx.xx 為入口類,即提交任務的類
- parameter即為提交參數
JStorm VS Storm 請參看 JStorm 0.9.0 介紹.pptx
JStorm 比Storm更穩定,更強大,更快, storm上跑的程序,一行代碼不變可以運行在jstorm上。
Flume 是一個成熟的系統,主要focus在管道上,將數據從一個數據源傳輸到另外一個數據源, 系統提供大量現成的插件做管道作用。當然也可以做一些計算和分析,但插件的開發沒有Jstorm便捷和迅速。
S4 就是一個半成品,健壯性還可以,但數據准確性較糟糕,無法保證數據不丟失,這個特性讓S4 大受限制,也導致了S4開源很多年,但發展一直不是很迅速。
AKKA 是一個actor模型,也是一個不錯的系統,在這個actor模型基本上,你想做任何事情都沒有問題,但問題是你需要做更多的工作,topology怎么生成,怎么序列化。數據怎么流(隨機,還是group by)等等。
Spark 是一個輕量的內存MR, 更偏重批量數據處理
0.9.0 性能測試
JStorm 0.9.0 性能非常的好, 使用netty時單worker 發送最大速度為11萬QPS, 使用zeromq時,最大速度為12萬QPS.結論
- JStorm 0.9.0 在使用Netty的情況下,比Storm 0.9.0 使用netty情況下,快10%, 並且JStorm netty是穩定的而Storm 的Netty是不穩定的
- 在使用ZeroMQ的情況下, JStorm 0.9.0 比Storm 0.9.0 快30%
原因
- Zeromq 減少一次內存拷貝
- 增加反序列化線程
- 重寫采樣代碼,大幅減少采樣影響
- 優化ack代碼
- 優化緩沖map性能
- Java 比clojure更底層
測試
測試樣例
測試樣例為https://github.com/longdafeng/storm-examples
測試環境
5 台 16核, 98G 物理機
測試結果
- JStorm with netty, Spout 發送QPS 為 11萬
- storm with netty, Spout 應用發送QPS 為 10萬 (截圖為上層應用的QPS, 沒有包括發送到ack的QPS, Spout發送QPS 正好為上層應用QPS的2倍)
- JStorm with zeromq, Spout 發送QPS 為12萬
- Storm with zeromq, Spout 發送QPS 為9萬(截圖為上層應用的QPS, 沒有包括發送到ack的QPS, Spout發送QPS 正好為上層應用QPS的2倍)
資源硬隔離
cgroups是control groups的縮寫,是Linux內核提供的一種可以限制, 記錄, 隔離進程組(process groups)所使用的物理資源(如:cpu,memory,IO 等等)的機制。
在Jstorm中,我們使用cgroup進行cpu硬件資源的管理。使用前,需要做如下檢查和配置。
- 檢查/etc/passwd 文件中當前用戶的uid和gid, 假設當前用戶是admin, 則看/etc/passwd文件中admin的uid和gid是多少
-
cgroup功能在當前系統的內核版本是否支持
檢查/etc/cgconfig.conf是否存在。如果不存在, 請“yum install libcgroup”,如果存在,設置cpu子系統的掛載目錄位置, 以及修改該配置文件中相應的uid/gid為啟動jstorm用戶的uid/gid, 本例子中以500為例, 注意是根據第一步來進行設置的。
- 然后啟動cgroup服務
Note: cgconfig.conf只能在root模式下修改。
或者直接執行命令
這是一個cgconfig.conf配置文件例子。比如jstorm的啟動用戶為admin,admin在當前 系統的uid/gid為500(查看/etc/passwd 可以查看到uid和gid),那么相對應cpu子系統的jstorm目錄uid/gid也需要設置為相同的值。 以便jstorm有相應權限可以在這個目錄下為jstorm的每個需要進行資源隔離的進程創建對應 的目錄和進行相關設置。
. 在jstorm配置文件中打開cgroup, 配置storm.yaml
supervisor.enable.cgroup: true
常見問題
性能問題
參考性能優化
資源不夠
當報告 ”No supervisor resource is enough for component “, 則意味着資源不夠 如果是僅僅是測試環境,可以將supervisor的cpu 和memory slot設置大,
在jstorm中, 一個task默認會消耗一個cpu slot和一個memory slot, 而一台機器上默認的cpu slot是(cpu 核數 -1), memory slot數(物理內存大小 * 75%/1g), 如果一個worker上運行task比較多時,需要將memory slot size設小(默認是1G), 比如512M, memory.slot.per.size: 535298048
序列化問題
所有spout,bolt,configuration, 發送的消息(Tuple)都必須實現Serializable, 否則就會出現序列化錯誤.
如果是spout或bolt的成員變量沒有實現Serializable時,但又必須使用時, 可以對該變量申明時,增加transient 修飾符, 然后在open或prepare時,進行實例化
Log4j 沖突
0.9.0 開始,JStorm依舊使用Log4J,但storm使用Logbak,因此應用程序如果有依賴log4j-over-slf4j.jar, 則需要exclude 所有log4j-over-slf4j.jar依賴,下個版本將自定義classloader,就不用擔心這個問題。
類沖突
如果應用程序使用和JStorm相同的jar 但版本不一樣時,建議打開classloader, 修改配置文件
topology.enable.classloader: true
或者
ConfigExtension.setEnableTopologyClassLoader(conf, true);
JStorm默認是關掉classloader,因此JStorm會強制使用JStorm依賴的jar
提交任務后,等待幾分鍾后,web ui始終沒有顯示對應的task
有3種情況:
用戶程序初始化太慢
如果有用戶程序的日志輸出,則表明是用戶的初始化太慢或者出錯,查看日志即可。 另外對於MetaQ 1.x的應用程序,Spout會recover ~/.meta_recover/目錄下文件,可以直接刪除這些消費失敗的問題,加速啟動。
通常是用戶jar沖突或初始化發生問題
打開supervisor 日志,找出啟動worker命令,單獨執行,然后檢查是否有問題。類似下圖:
檢查是不是storm和jstorm使用相同的本地目錄
檢查配置項 ”storm.local.dir“, 是不是storm和jstorm使用相同的本地目錄,如果相同,則將二者分開
提示端口被綁定
有2種情況:
多個worker搶占一個端口
假設是6800 端口被占, 可以執行命令 “ps -ef|grep 6800” 檢查是否有多個進程, 如果有多個進程,則手動殺死他們
系統打開太多的connection
Linux對外連接端口數限制,TCP client對外發起連接數達到28000左右時,就開始大量拋異常,需要
# echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range