storm spout的速度抑制問題


轉發請注明原文地址:http://www.cnblogs.com/dongxiao-yang/p/6031398.html

    最近協助同事優化一個並發消費kafka數據用來計算的任務,壓測過程中發現有兩個spout對應的topic消費速度明顯低於其他topic的指標,每個spout分配10個並發消費速度到了1w左右完全就上不去了,通過監控埋點分析出spout以及下游的bolt代碼塊里面的業務代碼執行耗時完全不高於其余可以正常消費的topic對應的spout組件。

    最后只能摘出有問題的代碼新做一個demo進行測試,發現把nextTuple中 collector.emit()這個方法的調用注銷,只保留讀取kafka的邏輯后demo程序的消費kafka速度也同樣卡在了一個很低的速度,查看問題程序代碼nextTuple的調用邏輯大概如下

if(booleanfunction)

{

collector.emit(....)

}

其中booleanfunction指代一個執行了業務代碼並返回boolean值的方法,推測這個方法在實際線上並沒有每次都返回true進入調用emit方法的環節,

修改代碼如下

if(booleanfunction)

{

collector.emit(....)

}

else

{

collector.emit(....)

}

相當於每次nextTuple調用都會運行emit方法,任務重新上線后10個spout消費輕松突破30W+。

     產生問題的原因是由於storm的spout在nextTuple代碼執行的時候,emit方法每次執行后會在內存里更新一個emitted-count的變量值,如果spout的發現emitted-count跟上次調用完畢后的值一致,表明nextTuple函數沒有發送出去消息,此時會調用spout-wait-strategy的的emitEmpty方法,默認這個方法會sleep一毫秒。所以在沒有emit的情況下nextTuple理論上最大的調用頻率就是1000/s

 

參考資料

storm spout emit 問題

2 《Storm 源碼分析》 第10章 10.3.5 消息循環

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM