2018年05月31日 13:26:59 xiaoguozi0218 閱讀數:2018更多
年后上線的系統,與其他業務系統的通信方式采用了第三代消息系統中間件Kafka。由於是第一次使用,踩了很多坑,通過這篇博客和大家分享一下,也算是做個總結,以便以后溫故而知新。
一、線上問題
系統平穩運行兩個多月,基本上沒有問題,知道最近幾天,突然出現Kafka手動提交失敗,堆棧信息如下:
通過堆棧信息可以看出,有兩個重要參數: session.timeout 和 max.poll.records
session.timeout.ms : 在使用Kafka的團隊管理設施時,用於檢測消費者失敗的超時時間。消費者定期發送心跳來向經紀人表明其活躍度。如果代理在該會話超時到期之前沒有收到心跳,那么代理將從該組中刪除該消費者並啟動重新平衡。
max.poll.records : 在一次調用poll()中返回的最大記錄數。
根據堆棧的提示,他讓增加 session.timeout.ms 時間 或者 減少 max.poll.records。
二、解決過程
然后我琢磨,上線兩個月都沒有問題,為什么最近突然出現問題了。我想肯定是業務系統有什么動作,我就去問了一個下,果然頭一天風控系統kafka掛掉了,並進行了數據重推,導致了數據阻塞。但是我又想即使阻塞了也會慢慢消費掉牙,不應該報錯呀。后來我看了一下kafka官網上的參數介紹,發現max.poll.records默認是2147483647 (0.10.0.1版本),也就是kafka里面有多少poll多少,如果消費者拿到的這些數據在制定時間內消費不完,就會手動提交失敗,數據就會回滾到kafka中,會發生重復消費的情況。如此循環,數據就會越堆越多。后來咨詢了公司的kafka大神,他說我的kafka版本跟他的集群版本不一樣讓我升級kafka版本。於是我就升級到了0.10.2.1,查閱官網發現這個版本的max.poll.records默認是500,可能kafka開發團隊也意識到了這個問題。並且這個版本多了一個max.poll.interval.ms這個參數,默認是300s。這個參數的大概意思就是kafka消費者在一次poll內,業務處理時間不能超過這個時間。后來升級了kafka版本,把max.poll.records改成了50個之后,上了一次線,准備觀察一下。上完線已經晚上9點了,於是就打卡回家了,明天看結果。第二天早起滿心歡喜准備看結果,以為會解決這個問題,誰曾想還是堆積。我的天,思來想去,也想不出哪里有問題。於是就把處理各個業務的代碼前后執行時間打印出來看一下,添加代碼,提交上線。然后觀察結果,發現大部分時間都用在數據庫IO上了,並且執行時間很慢,大部分都是2s。於是想可能剛上線的時候數據量比較小,查詢比較快,現在數據量大了,就比較慢了。當時腦子里第一想法就是看了一下常用查詢字段有沒有添加索引,一看沒有,然后馬上添加索引。加完索引觀察了一下,處理速度提高了好幾倍。雖然單條業務處理的快樂, 但是堆積還存在,后來發現,業務系統大概1s推送3、4條數據,但是我kafka現在是單線程消費,速度大概也是這么多。再加上之前的堆積,所以消費還是很慢。於是業務改成多線程消費,利用線程池,開啟了10個線程,上線觀察。幾分鍾就消費完了。大功告成,此時此刻,心里舒坦了好多。不容易呀!
總結:
1、 使用Kafka時,消費者每次poll的數據業務處理時間不能超過kafka的max.poll.interval.ms,該參數在kafka0.10.2.1中的默認值是300s,所以要綜合業務處理時間和每次poll的數據數量。
2、Java線程池大小的選擇,
對於CPU密集型應用,也就是計算密集型,線程池大小應該設置為CPU核數+1;
對於IO密集型應用 ,線程池大小設置為 2*CPU核數+1.