storm1.0節點間消息傳遞過久分析及調優


  序:最近對storm平台系統進行性能檢測發現偶爾會出現oncebolt向另一個twobolt發送數據后,twobolt要500毫秒后才接收到進行處理。這里簡單說增大twobolt的並行度即可解決,但是究其內部原因是因為storm的通信機制所導致的問題。
  先介紹背景:一個拓撲的結構,spout(並行度:1)[處理性能:capacity 0.04],oncebolt(並行度:20)[處理性能:capacity 0.2],twobolt(並行度:100)[處理性能:capacity 0.6];整個拓撲就我預估最大的處理量就是一秒一千條

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6544017.htmll

微信:intsmaze

避免微信回復重復咨詢問題,技術咨詢請博客留言。

  最近對系統進行性能檢測,統計整個storm系統中一條消息處理中各個IO耗時的時間,找出性能瓶頸。發現除了活動匹配中會有分布式鎖以及大量的redis的IO操作,導致最多會耗時30ms,以及從Hbase中查詢數據時由於hbase集群當時正在跑任務導致耗時1~2s。唯一出現的問題就是onebolt向twobolt發送數據后,某些數據耗時幾百毫秒才會被twobolt接收到。這就引起了我的注意。
先上一下偽代碼:

public class OnceBolt extends BaseRichBolt{
    private static final long serialVersionUID = -5283595260540124273L;
    
    private OutputCollector collector;
    
    
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple input) {long intsmazeTime=System.currentTimeMillis();
        collector.emit(input,new Values(intsmazeTime));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("intsmaze"));
    }
}
public class TwoBolt extends BaseRichBolt{
    private static final long serialVersionUID = -5283595260540124273L;
    
    private OutputCollector collector;
    
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple input) {long intsmazeTime=input.getLong(0);
            System.out.println("耗時:"+(System.currentTimeMillis()-intsmazeTime));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

這個問題從storm內部通信來說:

每個executor有自己的接收隊列和輸出隊列。

每個worker進程有一個獨立的接收線程將外部發送過來的消息移動到對應的executor線程的接收隊列中。

每個worker存在一個獨立的發送線程負責從worker的傳輸隊列中讀取消息,並通過網絡發送給其他worker。

每個executor有單獨的線程分別來處理spout/bolt的業務邏輯,業務邏輯輸出的中間數據會存放在輸出隊列中,executor的輸出隊列中的tuple達到一定的閥值,executor的發送線程將批量獲取輸出隊列中的tuple,並發送到work中的傳輸隊列中。

  因為oncebolt任務向自己的發送隊列生產過快,且向twobolt任務的接收隊列發送數據過多,導致twobolt的接收隊列滿了,twobolt處理不過來了。[簡單說就是oncebolt生產數據的速度快於twobolt的消費速率]。這個時候就會出現twobolt處理一個oncebolt的消息要幾百毫秒。這個情況是因為twobolt的處理一條消息平均要50毫秒,twobolt接收隊列長度是10,剛好twobolt在從隊列拉取一條消息處理時,twobolt的接收隊列滿了,這個時候隊列中第10條消息等被處理就會阻塞10*50毫秒的。
  同時因為接收隊列滿了,oncebolt就會阻塞到,等twobolt接收隊列有空了再去發送(很多文章說會導致消息丟失,但是我測試發現沒有這種情況,只會阻塞到,這種就是流量洪峰下,storm會出現的一種情況)。這種情況是某幾秒消息量過大導致產生,所以這種情況只是偶爾發送,過一會就會正常了,但是如果交易量一直很大,這個時候我們就要進行調優了,最簡單的就是增大twobolt的並行度以及work數量。
  個人認為的最優並行度設置:我們可以參照每一個節點的capacity的性能指標,比如我們這里spout的指標是0.04所以就不需要再增加它的並行度和kafka的分區保持一致。oncebolt的指標是0.2,而twobolt的指標是0.6。很明顯是oncebolt資源被浪費了或者twobolt的速率跟不上oncebolt,我們給oncebolt的並行度可以減少一半,比如10個。這種方式是減少資源的浪費。或者就目前的問題,增大twobolt的並行度來提示消費的速度。
  還有一個問題我說一下:storm的性能提升我們是增加work數量還是增加節點的並行度。
  這個是一個調優的過程,如果我們只啟動一個work,一昧的在這個work中增加並行度,這樣會導致頻繁的full GC,因為一個work的2G資源供所有的任務一起用;或者我們啟動10個work,每個work只啟動一個任務,先不說浪費資源,首先在任務間傳遞消息時就一定會走網絡通信這也是速率的消耗。所以是一句話,一個work中的任務數量要合理,不要太多,也不要太少,這是一個調優的過程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM