Kafka consumer poll(long)與poll(Duration)的區別


最近在StackOverflow碰到的一個問題,即在consumer.poll之后assignment()返回為空的問題,如下面這段代碼所示:

consumer.subscribe(Arrays.asList("test")); consumer.poll(Duration.ofMillis(0)); // consumer.poll(0);
Set<TopicPartition> assignment = consumer.assignment(); // empty!

有意思的是,如果是consumer.poll(0);則assignment不為空。之前我以為poll(long)被標記為“Deprecated”之后使用poll(Duration)是相同的效果,現在看來兩者還是要有差別的。為什么poll(0)就能獲取到consumer分配方案,而使用poll(Duration)就不能呢?

 

調研了一番之后發現原因如下:在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,之后它才會發起fetch請求去獲取數據。雖然poll可以指定超時時間,但這個超時時間只適用於后面的消息獲取,前面更新元數據信息不計入這個超時時間。poll(Duration)這個版本修改了這樣的設計,會把元數據獲取也計入整個超時時間。由於本例中使用的是0,即瞬時超時,因此consumer根本無法在這么短的時間內連接上coordinator,所以只能趕在超時前返回一個空集合。這就是為什么使用不同版本的poll命令assignment不同的原因。

 

仔細想想為什么社區要做這樣的變更?poll(0)這種設計的一個問題在於如果遠端的broker不可用了, 那么consumer程序會被無限阻塞下去。用戶指定了超時時間但卻被無限阻塞,顯然這樣的設計時有欠缺的。特別是對於Kafka Streams而言,這個設計可能導致的問題在於Stream Thread無法正常關閉。目前源代碼中依然有一些無限阻塞的場景,比如之前處理的initTransaction,commitTransaction和abortTransaction也是無限等待。看來后面社區還是需要慢慢地將它們都替換掉,畢竟在分布式系統中沒有什么場景是需要絕對地等待的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM