本文導讀:
1、What——JStorm是什么? 1.1 概述 1.2優點 1.3應用場景 1.4JStorm架構 2、Why——為什么啟動JStorm項目?(與storm的區別) 2.1storm的現狀、缺陷 2.2JStorm比Storm更穩定,功能更強大,更快!—— 表現 2.2.1穩定性好的表現 2.2.2調度強大的表現 2.2.3性能更好的表現 2.3性能提升的原因所在 2.4JStorm的其它優點 2.5與flume、S4、AKKA、Spark的比較 3、JStorm的性能優化點 4、JStorm的常見問題 5、TODO List 6、參考鏈接
1、What——JStorm是什么?
概述:
JStorm 是一個分布式實時計算引擎,類似Hadoop MapReduce的系統, 用戶按照規定的編程規范實現一個任務,然后將這個任務遞交給JStorm系統,Jstorm將這個任務跑起來,並且按7 * 24小時運行起來,一旦中間一個worker 發生意外故障, 調度器立即分配一個新的worker替換這個失效的worker。因此,從應用的角度,JStorm 應用是一種遵守某種編程規范的分布式應用。從系統角度,JStorm一套類似MapReduce的調度系統。從數據的角度,是一套基於流水線的消息處理機制。實時計算現在是大數據領域中最火爆的一個方向,因為人們對數據的要求越來越高,實時性要求也越來越快,傳統的 Hadoop Map Reduce,逐漸滿足不了需求,因此在這個領域需求不斷。
在Storm和JStorm出現以前,市面上出現很多實時計算引擎,但自storm和JStorm出現后,基本上可以說一統江湖,
其優點:
- 開發非常迅速: 接口簡單,容易上手,只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,底層rpc,worker之間冗余,數據分流之類的動作完全不用考慮。
- 擴展性極好:當一級處理單元速度,直接配置一下並發數,即可線性擴展性能
- 健壯:當worker失效或機器出現故障時, 自動分配新的worker替換失效worker;調度器Nimbus采用主從備份,支持熱切。
- 數據准確性: 可以采用Acker機制,保證數據不丟失。 如果對精度有更多一步要求,采用事務機制,保證數據准確。
應用場景:
JStorm處理數據的方式是基於消息的流水線處理, 因此特別適合無狀態計算,也就是計算單元的依賴的數據全部在接受的消息中可以找到, 並且最好一個數據流不依賴另外一個數據流。
- 日志分析:從日志中分析出特定的數據,並將分析的結果存入外部存儲器如數據庫。目前,主流日志分析技術就使用JStorm或Storm
- 管道系統: 將一個數據從一個系統傳輸到另外一個系統, 比如將數據庫同步到Hadoop
- 消息轉化器: 將接受到的消息按照某種格式進行轉化,存儲到另外一個系統如消息中間件
- 統計分析器: 從日志或消息中,提煉出某個字段,然后做count或sum計算,最后將統計值存入外部存儲器。中間處理過程可能更復雜。
- ......
JStorm架構:
JStorm 從設計的角度,就是一個典型的調度系統。
在這個系統中,
-
- Nimbus是作為調度器角色
- Supervisor 作為worker的代理角色,負責殺死worker和運行worker
- Worker是task的容器
- Task是真正任務的執行者
- ZK 是整個系統中的協調者
具體參考下圖:
來自阿里的流處理框架:JStorm
關於流處理框架,在先前的文章匯總已經介紹過Strom,今天學習的是來自阿里的的流處理框架JStorm。簡單的概述JStorm就是:JStorm 比Storm更穩定,更強大,更快,Storm上跑的程序,一行代碼不變可以運行在JStorm上。直白的講JStorm是阿里巴巴的團隊基於Storm的二次開發產物,相當於他們的Tengine是基於Nginx開發的一樣。以下為阿里巴巴團隊放棄直接使用Storm選擇自行開發JStorm的原因:
2、Why——為什么啟動JStorm項目?___與storm的區別
阿里擁有自己的實時計算引擎
- 類似於hadoop 中的MR
- 開源storm響應太慢
- 開源社區的速度完全跟不上Ali的需求
- 降低未來運維成本
- 提供更多技術支持,加快內部業務響應速度
現有Storm無法滿足一些需求
- 現有storm調度太簡單粗暴,無法定制化
- Storm 任務分配不平衡
- RPC OOM(OOM - Out of Memory,內存溢出 ——俗稱雪崩問題)一直沒有解決
- 監控太簡單
- 對ZK 訪問頻繁
現狀
在整個阿里巴巴集團,1000+的物理機上運行着Storm,一淘(200+),CDO(200+),支付寶(150+),B2B(50+),阿里媽媽(50+),共享事業群(50+),其他等。
WHY之一句話概述:JStorm比Storm更穩定,功能更強大,更快!(Storm上跑的程序可以一行代碼不變運行在JStorm上)
JStorm相比Storm更穩定
- Nimbus 實現HA:當一台nimbus掛了,自動熱切到備份nimbus ——Nimbus HA
- 原生Storm RPC:Zeromq 使用堆外內存,導致OS 內存不夠,Netty 導致OOM;JStorm底層RPC 采用netty + disruptor,保證發送速度和接受速度是匹配的,徹底解決雪崩問題
- 現有Strom,在添加supervisor或者supervisor shutdown時,會觸發任務rebalance;提交新任務時,當worker數不夠時,觸發其他任務做rebalance。——在JStorm中不會發生,使得數據流更穩定
- 新上線的任務不會沖擊老的任務:新調度從cpu,memory,disk,net 四個角度對任務進行分配;已經分配好的新任務,無需去搶占老任務的cpu,memory,disk和net ——任務之間影響小
- Supervisor主線 ——more catch
- Spout/Bolt 的open/prepare ——more catch
- 所有IO, 序列化,反序列化 ——more catch
- 減少對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描。
JStorm相比Storm調度更強大
- 徹底解決了storm 任務分配不均衡問題
- 從4個維度進行任務分配:CPU、Memory、Disk、Net
- 默認一個task,一個cpu slot。當task消耗更多的cpu時,可以申請更多cpu slot
- 解決新上線的任務去搶占老任務的cpu
- 一淘有些task內部起很多線程,單task消耗太多cpu
- 默認一個task,一個memory slot。當task需要更多內存時,可以申請更多內存slot
- 先海狗項目中,slot task 需要8G內存,而且其他任務2G內存就夠了
- 默認task,不申請disk slot。當task 磁盤IO較重時,可以申請disk slot
- 海狗/實時同步項目中,task有較重的本地磁盤讀寫操作
- 可以強制某個component的task 運行在不同的節點上
- 聚石塔,海狗項目,某些task提供web Service服務,為了端口不沖突,因此必須強制這些task運行在不同節點上
- 可以強制topology運行在單獨一個節點上
- 節省網絡帶寬
- Tlog中大量小topology,為了減少網絡開銷,強制任務分配到一個節點上
- 可以自定義任務分配:提前預約任務分配到哪台機器上,哪個端口,多少個cpu slot,多少內存,是否申請磁盤
- 海狗項目中,部分task期望分配到某些節點上
- 可以預約上一次成功運行時的任務分配:上次task分配了什么資源,這次還是使用這些資源
- CDO很多任務期待重啟后,仍使用老的節點,端口
Task內部異步化
- Worker內部全流水線模式
- Spout nextTuple和ack/fail運行在不同線程
-
- EagleEye中,在nextTuple做sleep和wait操作不會block ack/fail動作
JStorm相比Storm性能更好
JStorm 0.9.0 性能非常的好,使用netty時單worker 發送最大速度為11萬QPS,使用zeromq時,最大速度為12萬QPS。
- JStorm 0.9.0 在使用Netty的情況下,比Storm 0.9.0 使用netty情況下,快10%, 並且JStorm netty是穩定的而Storm 的Netty是不穩定的
- 在使用ZeroMQ的情況下, JStorm 0.9.0 比Storm 0.9.0 快30%
為什么更快、性能提升的原因:
- Zeromq 減少一次內存拷貝
- 增加反序列化線程
- 重寫采樣代碼,大幅減少采樣影響
- 優化ack代碼
- 優化緩沖map性能
- Java 比clojure更底層
附注:和storm編程方式的改變:
編程接口改變:當topology.max.spout.pending 設置不為1時(包括topology.max.spout.pending設置為null),spout內部將額外啟動一個線程單獨執行ack或fail操作, 從而nextTuple在單獨一個線程中執行,因此允許在nextTuple中執行block動作,而原生的storm,nextTuple/ack/fail 都在一個線程中執行,當數據量不大時,nextTuple立即返回,而ack、fail同樣也容易沒有數據,進而導致CPU 大量空轉,白白浪費CPU, 而在JStorm中, nextTuple可以以block方式獲取數據,比如從disruptor中或BlockingQueue中獲取數據,當沒有數據時,直接block住,節省了大量CPU。
但因此帶來一個問題, 處理ack/fail 和nextTuple時,必須小心線程安全性。
附屬: 當topology.max.spout.pending為1時, 恢復為spout一個線程,即nextTuple/ack/fail 運行在一個線程中。
JStorm的其他優化點
- 資源隔離。不同部門,使用不同的組名,每個組有自己的Quato;不同組的資源隔離;采用cgroups 硬隔離
- Classloader。解決應用的類和Jstorm的類發生沖突,應用的類在自己的類空間中
- Task 內部異步化。Worker 內部全流水線模式,Spout nextTuple和ack/fail運行在不同線程
JStorm與其它產品的比較:
Flume 是一個成熟的系統,主要focus在管道上,將數據從一個數據源傳輸到另外一個數據源, 系統提供大量現成的插件做管道作用。當然也可以做一些計算和分析,但插件的開發沒有Jstorm便捷和迅速。
S4 就是一個半成品,健壯性還可以,但數據准確性較糟糕,無法保證數據不丟失,這個特性讓S4 大受限制,也導致了S4開源很多年,但發展一直不是很迅速。
AKKA 是一個actor模型,也是一個不錯的系統,在這個actor模型基本上,你想做任何事情都沒有問題,但問題是你需要做更多的工作,topology怎么生成,怎么序列化。數據怎么流(隨機,還是group by)等等。
Spark 是一個輕量的內存MR, 更偏重批量數據處理。
3、JStorm性能優化:
- 選型:
按照性能來說, trident < transaction < 使用ack機制普通接口 < 關掉ack機制的普通接口, 因此,首先要權衡一下應該選用什么方式來完成任務。
如果“使用ack機制普通接口”時, 可以嘗試關掉ack機制,查看性能如何,如果性能有大幅提升,則預示着瓶頸不在spout, 有可能是Acker的並發少了,或者業務處理邏輯慢了。
- 增加並發:可以簡單增加並發,查看是否能夠增加處理能力
- 讓task分配更加均勻:當使用fieldGrouping方式時,有可能造成有的task任務重,有的task任務輕,因此讓整個數據流變慢, 盡量讓task之間壓力均勻。
- 使用MetaQ或Kafka時:對於MetaQ和Kafka, 一個分區只能一個線程消費,因此有可能簡單的增加並發無法解決問題, 可以嘗試增加MetaQ和Kafka的分區數。
4、常見問題:
4.1 性能問題
參考上面3中JStorm性能優化
4.2 資源不夠
當報告 ”No supervisor resource is enough for component “, 則意味着資源不夠 如果是僅僅是測試環境,可以將supervisor的cpu 和memory slot設置大,
在jstorm中, 一個task默認會消耗一個cpu slot和一個memory slot, 而一台機器上默認的cpu slot是(cpu 核數 -1), memory slot數(物理內存大小 * 75%/1g), 如果一個worker上運行task比較多時,需要將memory slot size設小(默認是1G), 比如512M, memory.slot.per.size: 535298048
1 #if it is null, then it will be detect by system 2 supervisor.cpu.slot.num: null 3 4 #if it is null, then it will be detect by system 5 supervisor.mem.slot.num: null 6 7 # support disk slot 8 # if it is null, it will use $(storm.local.dir)/worker_shared_data 9 supervisor.disk.slot: null
4.3 序列化問題
所有spout,bolt,configuration, 發送的消息(Tuple)都必須實現Serializable, 否則就會出現序列化錯誤.
如果是spout或bolt的成員變量沒有實現Serializable時,但又必須使用時, 可以對該變量申明時,增加transient 修飾符, 然后在open或prepare時,進行實例化
4.4 Log4j 沖突
0.9.0 開始,JStorm依舊使用Log4J,但storm使用Logbak,因此應用程序如果有依賴log4j-over-slf4j.jar, 則需要exclude 所有log4j-over-slf4j.jar依賴,下個版本將自定義classloader,就不用擔心這個問題。
1 SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 2 SLF4J: See also 3 http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. 4 Exception in thread "main" java.lang.ExceptionInInitializerError 5 at org.apache.log4j.Logger.getLogger(Logger.java:39) 6 at org.apache.log4j.Logger.getLogger(Logger.java:43) 7 at com.alibaba.jstorm.daemon.worker.Worker.<clinit>(Worker.java:32) 8 Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also 9 http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. 10 at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) 11 ... 3 more 12 Could not find the main class: com.alibaba.jstorm.daemon.worker.Worker. Program will exit.
4.5 類沖突
如果應用程序使用和JStorm相同的jar 但版本不一樣時,建議打開classloader, 修改配置文件
1 topology.enable.classloader: true
或者
1 ConfigExtension.setEnableTopologyClassLoader(conf, true);
JStorm默認是關掉classloader,因此JStorm會強制使用JStorm依賴的jar
4.6 提交任務后,等待幾分鍾后,web ui始終沒有顯示對應的task
有3種情況:
4.6.1用戶程序初始化太慢
如果有用戶程序的日志輸出,則表明是用戶的初始化太慢或者出錯,查看日志即可。 另外對於MetaQ 1.x的應用程序,Spout會recover ~/.meta_recover/目錄下文件,可以直接刪除這些消費失敗的問題,加速啟動。
4.6.2通常是用戶jar沖突或初始化發生問題
打開supervisor 日志,找出啟動worker命令,單獨執行,然后檢查是否有問題。類似下圖:
4.6.3檢查是不是storm和jstorm使用相同的本地目錄
檢查配置項 ”storm.local.dir“, 是不是storm和jstorm使用相同的本地目錄,如果相同,則將二者分開
4.7 提示端口被綁定
有2種情況:
4.7.1多個worker搶占一個端口
假設是6800 端口被占, 可以執行命令 “ps -ef|grep 6800” 檢查是否有多個進程, 如果有多個進程,則手動殺死他們
4.7.2系統打開太多的connection
Linux對外連接端口數限制,TCP client對外發起連接數達到28000左右時,就開始大量拋異常,需要
1 # echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range
5、TODO list
- Quato,每個group配額
- Storm on yarn
- 應用自定義Hook
- 權限管理
- logview
- classloader
- upgrade Netty to netty4
參考鏈接:
Github源碼:https://github.com/alibaba/jstorm/
中文文檔:https://github.com/alibaba/jstorm/wiki/JStorm-Chinese-Documentation