Consumer 參數之 Kafka heartbeat.interval.ms 與 session.timeout.ms


  在kafka0.10.1之后的版本中,將session.timeout.ms 和 max.poll.interval.ms 解耦了。也就是說:new KafkaConsumer對象后,在while true循環中執行consumer.poll拉取消息這個過程中,其實背后是有2個線程的,即一個kafka consumer實例包含2個線程:一個是heartbeat 線程,另一個是processing線程,processing線程可理解為調用consumer.poll方法執行消息處理邏輯的線程,而heartbeat線程是一個后台線程,對程序員是"隱藏不見"的。如果消息處理邏輯很復雜,比如說需要處理5min,那么 max.poll.interval.ms可設置成比5min大一點的值。而heartbeat 線程則和上面提到的參數 heartbeat.interval.ms有關,heartbeat線程 每隔heartbeat.interval.ms向coordinator發送一個心跳包,證明自己還活着。只要 heartbeat線程 在 session.timeout.ms 時間內 向 coordinator發送過心跳包,那么 group coordinator就認為當前的kafka consumer是活着的。

  在kafka0.10.1之前,發送心跳包和消息處理邏輯這2個過程是耦合在一起的,試想:如果一條消息處理時長要5min,而session.timeout.ms=3000ms,那么等 kafka consumer處理完消息,group coordinator早就將consumer 移出group了,因為只有一個線程,在消息處理過程中就無法向group coordinator發送心跳包,超過3000ms未發送心跳包,group coordinator就將該consumer移出group了。而將二者分開,一個processing線程負責執行消息處理邏輯,一個heartbeat線程負責發送心跳包,那么:就算一條消息需要處理5min,只要底heartbeat線程在session.timeout.ms向group coordinator發送了心跳包,那consumer可以繼續處理消息,而不用擔心被移出group了。另一個好處是:如果consumer出了問題,那么在 session.timeout.ms內就能檢測出來,而不用等到 max.poll.interval.ms 時長后才能檢測出來。

  heartbeat.interval.ms 默認是 3000 也就是3秒

  session.timeout.ms 10000 (10 s)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM