Storm和JStorm(阿里的流處理框架)


本文導讀:

1、What——JStorm是什么? 1.1 概述
    1.2優點
    1.3應用場景
    1.4JStorm架構

2、Why——為什么啟動JStorm項目?(與storm的區別) 2.1storm的現狀、缺陷
    2.2JStorm比Storm更穩定,功能更強大,更快!—— 表現
        2.2.1穩定性好的表現
        2.2.2調度強大的表現
        2.2.3性能更好的表現
    2.3性能提升的原因所在
    2.4JStorm的其它優點
    2.5與flume、S4、AKKA、Spark的比較

3、JStorm的性能優化點 4、JStorm的常見問題 5、TODO List 6、參考鏈接

 

1、What——JStorm是什么? 

概述:

  JStorm 是一個分布式實時計算引擎,類似Hadoop MapReduce的系統, 用戶按照規定的編程規范實現一個任務,然后將這個任務遞交給JStorm系統,Jstorm將這個任務跑起來,並且按7 * 24小時運行起來,一旦中間一個worker 發生意外故障, 調度器立即分配一個新的worker替換這個失效的worker。因此,從應用的角度,JStorm 應用是一種遵守某種編程規范的分布式應用。從系統角度,JStorm一套類似MapReduce的調度系統。從數據的角度,是一套基於流水線的消息處理機制。實時計算現在是大數據領域中最火爆的一個方向,因為人們對數據的要求越來越高,實時性要求也越來越快,傳統的 Hadoop Map Reduce,逐漸滿足不了需求,因此在這個領域需求不斷。

  在Storm和JStorm出現以前,市面上出現很多實時計算引擎,但自storm和JStorm出現后,基本上可以說一統江湖,

其優點:

  1. 開發非常迅速: 接口簡單,容易上手,只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,底層rpc,worker之間冗余,數據分流之類的動作完全不用考慮。
  2. 擴展性極好:當一級處理單元速度,直接配置一下並發數,即可線性擴展性能
  3. 健壯:當worker失效或機器出現故障時, 自動分配新的worker替換失效worker;調度器Nimbus采用主從備份,支持熱切
  4. 數據准確性: 可以采用Acker機制,保證數據不丟失。 如果對精度有更多一步要求,采用事務機制,保證數據准確。

應用場景:
  JStorm處理數據的方式是基於消息的流水線處理, 因此特別適合無狀態計算,也就是計算單元的依賴的數據全部在接受的消息中可以找到, 並且最好一個數據流不依賴另外一個數據流。

  1. 日志分析:從日志中分析出特定的數據,並將分析的結果存入外部存儲器如數據庫。目前,主流日志分析技術就使用JStorm或Storm
  2. 管道系統: 將一個數據從一個系統傳輸到另外一個系統, 比如將數據庫同步到Hadoop
  3. 消息轉化器: 將接受到的消息按照某種格式進行轉化,存儲到另外一個系統如消息中間件
  4. 統計分析器: 從日志或消息中,提煉出某個字段,然后做count或sum計算,最后將統計值存入外部存儲器。中間處理過程可能更復雜。
  5. ......

JStorm架構:

  JStorm 從設計的角度,就是一個典型的調度系統

  在這個系統中,

    • Nimbus是作為調度器角色
    • Supervisor 作為worker的代理角色,負責殺死worker和運行worker
    • Worker是task的容器
    • Task是真正任務的執行者
    • ZK 是整個系統中的協調者

 具體參考下圖:

jiagou

 

來自阿里的流處理框架:JStorm

  關於流處理框架,在先前的文章匯總已經介紹過Strom,今天學習的是來自阿里的的流處理框架JStorm。簡單的概述JStorm就是:JStorm 比Storm更穩定,更強大,更快,Storm上跑的程序,一行代碼不變可以運行在JStorm上。直白的講JStorm是阿里巴巴的團隊基於Storm的二次開發產物,相當於他們的Tengine是基於Nginx開發的一樣。以下為阿里巴巴團隊放棄直接使用Storm選擇自行開發JStorm的原因:

jstorm

2、Why——為什么啟動JStorm項目?___與storm的區別

阿里擁有自己的實時計算引擎

  1. 類似於hadoop 中的MR
  2. 開源storm響應太慢
  3. 開源社區的速度完全跟不上Ali的需求
  4. 降低未來運維成本
  5. 提供更多技術支持,加快內部業務響應速度

現有Storm無法滿足一些需求

  1. 現有storm調度太簡單粗暴,無法定制化
  2. Storm 任務分配不平衡
  3. RPC OOM(OOM - Out of Memory,內存溢出 ——俗稱雪崩問題)一直沒有解決
  4. 監控太簡單
  5. 對ZK 訪問頻繁

現狀

  在整個阿里巴巴集團,1000+的物理機上運行着Storm,一淘(200+),CDO(200+),支付寶(150+),B2B(50+),阿里媽媽(50+),共享事業群(50+),其他等。

WHY之一句話概述:JStorm比Storm更穩定,功能更強大,更快!(Storm上跑的程序可以一行代碼不變運行在JStorm上)

  JStorm相比Storm更穩定

  1. Nimbus 實現HA:當一台nimbus掛了,自動熱切到備份nimbus ——Nimbus HA
  2. 原生Storm RPC:Zeromq 使用堆外內存,導致OS 內存不夠,Netty 導致OOM;JStorm底層RPC 采用netty + disruptor,保證發送速度和接受速度是匹配的,徹底解決雪崩問題
  3. 現有Strom,在添加supervisor或者supervisor shutdown時,會觸發任務rebalance;提交新任務時,當worker數不夠時,觸發其他任務做rebalance。——在JStorm中不會發生,使得數據流更穩定
  4. 新上線的任務不會沖擊老的任務:新調度從cpu,memory,disk,net 四個角度對任務進行分配;已經分配好的新任務,無需去搶占老任務的cpu,memory,disk和net ——任務之間影響小
  5. Supervisor主線 ——more catch
  6. Spout/Bolt 的open/prepare ——more catch
  7. 所有IO, 序列化,反序列化 ——more catch
  8. 減少對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描。

  JStorm相比Storm調度更強大

  1. 徹底解決了storm 任務分配不均衡問題
  2. 從4個維度進行任務分配:CPU、Memory、Disk、Net
  3. 默認一個task,一個cpu slot。當task消耗更多的cpu時,可以申請更多cpu slot 
    • 解決新上線的任務去搶占老任務的cpu
    • 一淘有些task內部起很多線程,單task消耗太多cpu
  4. 默認一個task,一個memory slot。當task需要更多內存時,可以申請更多內存slot
    • 先海狗項目中,slot task 需要8G內存,而且其他任務2G內存就夠了
  5. 默認task,不申請disk slot。當task 磁盤IO較重時,可以申請disk slot
    • 海狗/實時同步項目中,task有較重的本地磁盤讀寫操作
  6. 可以強制某個component的task 運行在不同的節點上
    • 聚石塔,海狗項目,某些task提供web Service服務,為了端口不沖突,因此必須強制這些task運行在不同節點上
  7. 可以強制topology運行在單獨一個節點上
    • 節省網絡帶寬
    • Tlog中大量小topology,為了減少網絡開銷,強制任務分配到一個節點上
  8. 可以自定義任務分配:提前預約任務分配到哪台機器上,哪個端口,多少個cpu slot,多少內存,是否申請磁盤
    • 海狗項目中,部分task期望分配到某些節點上
  9. 可以預約上一次成功運行時的任務分配:上次task分配了什么資源,這次還是使用這些資源
    • CDO很多任務期待重啟后,仍使用老的節點,端口

  Task內部異步化

  1. Worker內部全流水線模式
  2. Spout nextTuple和ack/fail運行在不同線程
    • EagleEye中,在nextTuple做sleep和wait操作不會block ack/fail動作  

  JStorm相比Storm性能更好

  JStorm 0.9.0 性能非常的好,使用netty時單worker 發送最大速度為11萬QPS,使用zeromq時,最大速度為12萬QPS。

  • JStorm 0.9.0 在使用Netty的情況下,比Storm 0.9.0 使用netty情況下,快10%, 並且JStorm netty是穩定的而Storm 的Netty是不穩定的
  • 在使用ZeroMQ的情況下, JStorm 0.9.0 比Storm 0.9.0 快30%

為什么更快、性能提升的原因:

  1. Zeromq 減少一次內存拷貝
  2. 增加反序列化線程
  3. 重寫采樣代碼,大幅減少采樣影響
  4. 優化ack代碼
  5. 優化緩沖map性能
  6. Java 比clojure更底層

附注:和storm編程方式的改變:

  編程接口改變:當topology.max.spout.pending 設置不為1時(包括topology.max.spout.pending設置為null),spout內部將額外啟動一個線程單獨執行ack或fail操作, 從而nextTuple在單獨一個線程中執行,因此允許在nextTuple中執行block動作原生的storm,nextTuple/ack/fail 都在一個線程中執行,當數據量不大時,nextTuple立即返回,而ack、fail同樣也容易沒有數據,進而導致CPU 大量空轉,白白浪費CPU, 而在JStorm中, nextTuple可以以block方式獲取數據,比如從disruptor中或BlockingQueue中獲取數據,當沒有數據時,直接block住,節省了大量CPU。

  但因此帶來一個問題, 處理ack/fail 和nextTuple時,必須小心線程安全性

  附屬: 當topology.max.spout.pending為1時, 恢復為spout一個線程,即nextTuple/ack/fail 運行在一個線程中。

JStorm的其他優化點

  1. 資源隔離。不同部門,使用不同的組名,每個組有自己的Quato;不同組的資源隔離;采用cgroups 硬隔離
  2. Classloader。解決應用的類和Jstorm的類發生沖突,應用的類在自己的類空間中
  3. Task 內部異步化。Worker 內部全流水線模式,Spout nextTuple和ack/fail運行在不同線程

JStorm與其它產品的比較:

  Flume 是一個成熟的系統,主要focus在管道上,將數據從一個數據源傳輸到另外一個數據源, 系統提供大量現成的插件做管道作用。當然也可以做一些計算和分析,但插件的開發沒有Jstorm便捷和迅速。

  S4 就是一個半成品,健壯性還可以,但數據准確性較糟糕,無法保證數據不丟失,這個特性讓S4 大受限制,也導致了S4開源很多年,但發展一直不是很迅速。

  AKKA 是一個actor模型,也是一個不錯的系統,在這個actor模型基本上,你想做任何事情都沒有問題,但問題是你需要做更多的工作,topology怎么生成,怎么序列化。數據怎么流(隨機,還是group by)等等。

  Spark 是一個輕量的內存MR, 更偏重批量數據處理。

3、JStorm性能優化:

  1. 選型:

    按照性能來說, trident < transaction < 使用ack機制普通接口 < 關掉ack機制的普通接口, 因此,首先要權衡一下應該選用什么方式來完成任務。

    如果“使用ack機制普通接口”時, 可以嘗試關掉ack機制,查看性能如何,如果性能有大幅提升,則預示着瓶頸不在spout, 有可能是Acker的並發少了,或者業務處理邏輯慢了。

  2. 增加並發:可以簡單增加並發,查看是否能夠增加處理能力
  3. 讓task分配更加均勻:當使用fieldGrouping方式時,有可能造成有的task任務重,有的task任務輕,因此讓整個數據流變慢, 盡量讓task之間壓力均勻。
  4. 使用MetaQ或Kafka時:對於MetaQ和Kafka, 一個分區只能一個線程消費,因此有可能簡單的增加並發無法解決問題, 可以嘗試增加MetaQ和Kafka的分區數

4、常見問題:

4.1 性能問題

  參考上面3中JStorm性能優化

4.2 資源不夠

  當報告 ”No supervisor resource is enough for component “, 則意味着資源不夠 如果是僅僅是測試環境,可以將supervisor的cpu 和memory slot設置大,

  在jstorm中, 一個task默認會消耗一個cpu slot和一個memory slot, 而一台機器上默認的cpu slot是(cpu 核數 -1), memory slot數(物理內存大小 * 75%/1g), 如果一個worker上運行task比較多時,需要將memory slot size設小(默認是1G), 比如512M, memory.slot.per.size: 535298048

1 #if it is null, then it will be detect by system
2  supervisor.cpu.slot.num: null
3 
4  #if it is null, then it will be detect by system
5  supervisor.mem.slot.num: null
6 
7 # support disk slot
8 # if it is null, it will use $(storm.local.dir)/worker_shared_data
9  supervisor.disk.slot: null

4.3 序列化問題

  所有spout,bolt,configuration, 發送的消息(Tuple)都必須實現Serializable, 否則就會出現序列化錯誤.

  如果是spout或bolt的成員變量沒有實現Serializable時,但又必須使用時, 可以對該變量申明時,增加transient 修飾符, 然后在open或prepare時,進行實例化

seriliazble_error

4.4 Log4j 沖突

  0.9.0 開始,JStorm依舊使用Log4J,但storm使用Logbak,因此應用程序如果有依賴log4j-over-slf4j.jar, 則需要exclude 所有log4j-over-slf4j.jar依賴,下個版本將自定義classloader,就不用擔心這個問題。

 1 SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 
 2 SLF4J: See also 
 3 http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
 4 Exception in thread "main" java.lang.ExceptionInInitializerError
 5         at org.apache.log4j.Logger.getLogger(Logger.java:39)
 6         at org.apache.log4j.Logger.getLogger(Logger.java:43)
 7         at com.alibaba.jstorm.daemon.worker.Worker.<clinit>(Worker.java:32)
 8 Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also 
 9 http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
10         at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
11         ... 3 more
12 Could not find the main class: com.alibaba.jstorm.daemon.worker.Worker.  Program will exit.

4.5 類沖突

  如果應用程序使用和JStorm相同的jar 但版本不一樣時,建議打開classloader, 修改配置文件

1 topology.enable.classloader: true 

  或者

1 ConfigExtension.setEnableTopologyClassLoader(conf, true); 

  JStorm默認是關掉classloader,因此JStorm會強制使用JStorm依賴的jar

4.6 提交任務后,等待幾分鍾后,web ui始終沒有顯示對應的task

  有3種情況:

  4.6.1用戶程序初始化太慢

  如果有用戶程序的日志輸出,則表明是用戶的初始化太慢或者出錯,查看日志即可。 另外對於MetaQ 1.x的應用程序,Spout會recover ~/.meta_recover/目錄下文件,可以直接刪除這些消費失敗的問題,加速啟動。

  4.6.2通常是用戶jar沖突或初始化發生問題

    打開supervisor 日志,找出啟動worker命令,單獨執行,然后檢查是否有問題。類似下圖:

fail_start_worker

  4.6.3檢查是不是storm和jstorm使用相同的本地目錄

    檢查配置項 ”storm.local.dir“, 是不是storm和jstorm使用相同的本地目錄,如果相同,則將二者分開

4.7 提示端口被綁定

  有2種情況:

  4.7.1多個worker搶占一個端口

    假設是6800 端口被占, 可以執行命令 “ps -ef|grep 6800” 檢查是否有多個進程, 如果有多個進程,則手動殺死他們

  4.7.2系統打開太多的connection

    Linux對外連接端口數限制,TCP client對外發起連接數達到28000左右時,就開始大量拋異常,需要

1 # echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range

 

5、TODO list

  1. Quato,每個group配額
  2. Storm on yarn
  3. 應用自定義Hook
  4. 權限管理 
  5. logview
  6. classloader
  7. upgrade Netty to netty4

 

參考鏈接

Github源碼:https://github.com/alibaba/jstorm/

中文文檔:https://github.com/alibaba/jstorm/wiki/JStorm-Chinese-Documentation

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM