轉發請注明原文地址:http://www.cnblogs.com/dongxiao-yang/p/6031398.html
最近協助同事優化一個並發消費kafka數據用來計算的任務,壓測過程中發現有兩個spout對應的topic消費速度明顯低於其他topic的指標,每個spout分配10個並發消費速度到了1w左右完全就上不去了,通過監控埋點分析出spout以及下游的bolt代碼塊里面的業務代碼執行耗時完全不高於其余可以正常消費的topic對應的spout組件。
最后只能摘出有問題的代碼新做一個demo進行測試,發現把nextTuple中 collector.emit()這個方法的調用注銷,只保留讀取kafka的邏輯后demo程序的消費kafka速度也同樣卡在了一個很低的速度,查看問題程序代碼nextTuple的調用邏輯大概如下
if(booleanfunction)
{
collector.emit(....)
}
其中booleanfunction指代一個執行了業務代碼並返回boolean值的方法,推測這個方法在實際線上並沒有每次都返回true進入調用emit方法的環節,
修改代碼如下
if(booleanfunction)
{
collector.emit(....)
}
else
{
collector.emit(....)
}
相當於每次nextTuple調用都會運行emit方法,任務重新上線后10個spout消費輕松突破30W+。
產生問題的原因是由於storm的spout在nextTuple代碼執行的時候,emit方法每次執行后會在內存里更新一個emitted-count的變量值,如果spout的發現emitted-count跟上次調用完畢后的值一致,表明nextTuple函數沒有發送出去消息,此時會調用spout-wait-strategy的的emitEmpty方法,默認這個方法會sleep一毫秒。所以在沒有emit的情況下nextTuple理論上最大的調用頻率就是1000/s
。
參考資料
2 《Storm 源碼分析》 第10章 10.3.5 消息循環