現象
Spark streaming讀kafka數據做業務處理時,同一個stage的task,有個別task的運行時間比多數task時間都長,造成業務延遲增大。
查看業務對應的topic發現當topic isr不足時,會出現個別task運行時間過長的現象.
原因
和大部分分布式系統一樣,Kafka處理失敗需要明確定義一個Broker是否“活着”。對於Kafka而言,Kafka存活包含兩個條件,一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實現)。二是Follower必須能夠及時將Leader的消息復制過來,不能“落后太多”。
Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個Follower宕機,或者落后太多,Leader將把它從ISR中移除。這里所描述的“落后太多”指Follower復制的消息落后於Leader后的條數超過預定值(該值通過replica.lag.max.messages配置,其默認值是4000)或者Follower超過一定時間(該值通過replica.lag.time.max.ms來配置,其默認值是10000)未向Leader發送fetch請求。
解決方法
將下面幾個參數適當增大:
replicas響應leader的最長等待時間,若是超過這個時間,就將replicas排除在管理之外
replica.lag.time.max.ms = 10000
如果relicas落后太多,將會認為此partition relicas已經失效。而一般情況下,因為網絡延遲等原因,總會導致replicas中消息同步滯后。如果消息嚴重滯后,leader將認為此relicas網絡延遲較大或者消息吞吐能力有限。在broker數量較少,或者網絡不足的環境中,建議提高此值.
replica.lag.max.messages = 4000
leader中進行復制的線程數,增大這個數值會增加relipca的IO
num.replica.fetchers = 1
replicas每次獲取數據的最大字節數
replica.fetch.max.bytes = 1024 * 1024