項目中用到了kafka,沒用Streaming,只是用了個簡單的kafka連接
最初的使用的是consumer.poll(10) 這樣拉取得數據,
發現這樣得拉取數據得方式當連接不上kafka時或者連接不正確,或者broker失敗,總而言之就是連接不上kafka,會使得程序一直在運行停不下來.
解決辦法:使用consumer.poll(Duration.ofMillis(2000)) //此處筆者設置的超時時間為2s ,超過2s沒拉到數據就斷開。
順便說一下如何判斷自己連接上了Kafka吧
這個問題,我當時時無語了,是網絡通信問題,還是其他問題,經理還要證明自己怎么沒連接上,百度查也是無果。沒連接上就是沒數據唄,這樣不行。集群地址正確,各種地址都正確,就是拉不到數據,讀到的size為0
記住一個吐血的經驗:用kafka工具去看,consumer里有沒有創建出來消費者,消費者只要沒出來,就是沒連接上,代碼中打印消費者地址無效的,可以打印出來也不代表你連接上了,還有即使你kafka工具連接上了集群,也不代表你項目就連接上了,筆者就出現過這種情況。如果集群地址正確,讓人家測測kafka的端口號通了沒,這樣的情況也是有的,還有host地址配了沒,運維的環境地址也得配,都碰到過,哎,說不完的心塞路程。
下面科普下網上查到的不能用poll(Long)原因:
在poll(0) 中consumer會一直阻塞直到它成功獲取了所需的元數據信息,之后它才會發起fetch請求去獲取數據。雖然poll可以指定超時時間,但這個超時時間只適用於后面的消息獲取,前面更新元數據信息不計入這個超時時間。poll(Duration)這個版本修改了這樣的設計,會把元數據獲取也計入整個超時時間。由於本例中使用的是0,即瞬時超時,因此consumer根本無法在這么短的時間內連接上coordinator,所以只能趕在超時前返回一個空集合。這就是為什么使用不同版本的poll命令assignment不同的原因。
仔細想想為什么社區要做這樣的變更?poll(0)這種設計的一個問題在於如果遠端的broker不可用了, 那么consumer程序會被無限阻塞下去。用戶指定了超時時間但卻被無限阻塞,顯然這樣的設計時有欠缺的。特別是對於Kafka Streams而言,這個設計可能導致的問題在於Stream Thread無法正常關閉。目前源代碼中依然有一些無限阻塞的場景,比如之前處理的initTransaction,commitTransaction和abortTransaction也是無限等待。看來后面社區還是需要慢慢地將它們都替換掉,畢竟在分布式系統中沒有什么場景是需要絕對地等待的。