一.背景介紹
項目上進行算法調度的需求,打算采用kafka作為消息中間件,通過將多個算法消費者加入到同一個group中並行的處理算法請求,從而達到高效處理的目的。但是算法處理的時間較長,多則幾十分鍾,短的幾分鍾。測試的結果是算法時間過長的消費者會引發kafka的rebalance,消費者無法再消費到新數據。
二.rebalance機制介紹
為了弄懂上述問題,還需要了解relance的機制。由於rebalance機制資料較多,在此只進行簡單介紹。
Kafka保證同一groupId的consumer只會消費某條消息(即不重復消費也不漏數據),rebalance划分同一groupId的消費者與topic的分區的一一對應關系。因此每當有消費者加入或是退出時,必定會發生一次rebalance。在rebalance完成之前,消費者是拿不到任何數據的。
三.參數調整
簡單了解rebalance后,再進行kafka的參數調整。此次調整涉及參數如下:
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");
props.put("max.poll.records", "1");
props.put("max.poll.interval.ms",180000000);//5小時
props.put("heartbeat.interval.ms","2000");
參數說明:
1. "enable.auto.commit"設置為false后,消費后數據后需要手動調用consumer.commitAsync(),以保證將偏移量信息提交至kafka服務端。"enable.auto.commit"也可設置為true,便不必再手動調用consumer.commitAsync();
2.props.put ("session.timeout.ms",”1800000”),該條參數使用默認值即可,建議不要調整。測試時將該參數調大后,會引發groupId再次消費時無效的問題。
3."heartbeat.interval.ms"->"2000",測試時心跳時使用默認值或是調整2秒均可,理論值應該為"session.timeout.ms"的1/3,但是建議不要調整。
4. props.put("max.poll.records", "1"),每次poll拉取數據的最大條數。測試環境是一條kafka數據,對應一個算法任務,算法處理時間較長,因此測試時設置為1。
5. props.put("max.poll.interval.ms",180000000),該參數時間一定要設置大點,超過消費處理的最大時間開銷。如果該參數較小,消費者處理時間超過該參數后,會引發兩個現象。一個是偏移量提交會報錯。一個是groupId會被移出消費組,再使用該groupId時無法正常
拿到數據。
四、調整后的問題。
按上述參數調整后,多個算法消費者均可以正常消費kafka數據了,但是碰到新的問題。
如果有2個算法消費者正在處理,一個算法需要3分鍾,另一個算法需要20分鍾。當加入一個新的算法消費者后會觸發一次rebalance,觸發rebalance完成后所有消費者還必須要等待那個處理20分鍾的算法消費者調用consumer.poll()接口后,所有消費者才能正常接收數據。
Rebalance之后,會划分完消費者與分區對應關系,空閑的分區所對應的消費者理論上應該在rebalance之后可以直接消費數據。至於為什么非要等待所有消費者執行consumer.poll()接口后才能拿到數據,暫不知其中的原因。
五、總結
Kafka適用於吞吐量高,消費者處理能力高的場景,不太適用消費者處理能力低的場景。如果消費者處理能力低,可以使用其他的中間件,比如:rabbitmq。