最近在StackOverflow碰到的一個問題,即在consumer.poll之后assignment()返回為空的問題,如下面這段代碼所示:
consumer.subscribe(Arrays.asList("test")); consumer.poll(Duration.ofMillis(0)); // consumer.poll(0);
Set<TopicPartition> assignment = consumer.assignment(); // empty!
有意思的是,如果是consumer.poll(0);則assignment不為空。之前我以為poll(long)被標記為“Deprecated”之后使用poll(Duration)是相同的效果,現在看來兩者還是要有差別的。為什么poll(0)就能獲取到consumer分配方案,而使用poll(Duration)就不能呢?
調研了一番之后發現原因如下:在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,之后它才會發起fetch請求去獲取數據。雖然poll可以指定超時時間,但這個超時時間只適用於后面的消息獲取,前面更新元數據信息不計入這個超時時間。poll(Duration)這個版本修改了這樣的設計,會把元數據獲取也計入整個超時時間。由於本例中使用的是0,即瞬時超時,因此consumer根本無法在這么短的時間內連接上coordinator,所以只能趕在超時前返回一個空集合。這就是為什么使用不同版本的poll命令assignment不同的原因。
仔細想想為什么社區要做這樣的變更?poll(0)這種設計的一個問題在於如果遠端的broker不可用了, 那么consumer程序會被無限阻塞下去。用戶指定了超時時間但卻被無限阻塞,顯然這樣的設計時有欠缺的。特別是對於Kafka Streams而言,這個設計可能導致的問題在於Stream Thread無法正常關閉。目前源代碼中依然有一些無限阻塞的場景,比如之前處理的initTransaction,commitTransaction和abortTransaction也是無限等待。看來后面社區還是需要慢慢地將它們都替換掉,畢竟在分布式系統中沒有什么場景是需要絕對地等待的。