Problem
There is a requirement to frequently seek to a given offset of a given partition and then poll exactly once, in order to fetch the message at that offset as quickly as possible.
The usual way to poll is to put the poll logic in an infinite loop: if the first poll returns nothing, the next one tries again. As long as there is a message at the offset, it will eventually be consumed:
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    // do something with records
}
However, I use consumer.assign() rather than subscribe(), because I need to pick partitions flexibly, and with subscribe() the rebalances that triggers are troublesome. The code is as follows:
public ConsumerRecord<String, String> seekAndPoll(String topic, int partition, long offset) {
    TopicPartition tp = new TopicPartition(topic, partition);
    consumer.assign(Collections.singleton(tp));
    System.out.println("assignment:" + consumer.assignment()); // the partition is assigned here
    consumer.seek(tp, offset);
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) {
        // most of the time no message is fetched and we end up in this branch
        return null;
    } else {
        return records.iterator().next();
    }
}
Since I poll only once, the message must be fetched on that single poll. From the symptoms, it looks as if right after seek() Kafka still has some metadata-update work in flight, and a poll issued at that moment fetches nothing.
I also found this question on StackOverflow (java - Kafka Cluster sometimes returns no records during seek and poll - Stack Overflow), but it had no answer. After solving the problem, I added one.
Analysis
Guess 1: the difference between the old and new poll methods
While testing, I found that sometimes the old poll(long timeout) worked while the new poll(Duration timeout) did not. Could that be the cause? (Debugging showed it is not; skip this section if you are not interested.)
The two poll method signatures are as follows:
@Deprecated
public ConsumerRecords<K, V> poll(final long timeoutMs) {
    return poll(time.timer(timeoutMs), false);
}

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}
Both delegate to the private poll method below; the key is the second parameter, includeMetadataInTimeout: the new version passes true, the old version passes false.
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    // omitted
    if (includeMetadataInTimeout) {
        // try to update assignment metadata BUT do not need to block on the timer for join group
        updateAssignmentMetadataIfNeeded(timer, false);
    } else {
        while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
            log.warn("Still waiting for metadata");
        }
    }
    // omitted
}
This boolean eventually reaches coordinator.poll() as its waitForJoinGroup argument. So the crux is whether the coordinator, during poll, waits for the consumer to successfully join the consumer group.
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }
    return updateFetchPositions(timer);
}
However, debugging showed that when partitions are assigned manually with assign(), coordinator is null. This makes sense: only the subscribe() mode involves rebalancing and the like, which require a coordinator.
So whether a message can be fetched has nothing to do with whether the new or the old poll is used.
Further reading, the KIP behind the poll change:
KIP-266: Fix consumer indefinite blocking behavior
Key excerpts:
Consumer#poll
The pre-existing variant poll(long timeout) would block indefinitely for metadata updates if they were needed, then it would issue a fetch and poll for timeout ms for new records. The initial indefinite metadata block caused applications to become stuck when the brokers became unavailable. The existence of the timeout parameter made the indefinite block especially unintuitive. We will add a new method poll(Duration timeout) with the semantics:
iff a metadata update is needed:
send (asynchronous) metadata requests
poll for metadata responses (counts against timeout)
- if no response within timeout, return an empty collection immediately
if there is fetch data available, return it immediately
if there is no fetch request in flight, send fetch requests
poll for fetch responses (counts against timeout)
- if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
- if we get a response, return the response
We will deprecate the original method, poll(long timeout), and we will not change its semantics, so it remains:
iff a metadata update is needed:
- send (asynchronous) metadata requests
- poll for metadata responses indefinitely until we get it
if there is fetch data available, return it immediately
if there is no fetch request in flight, send fetch requests
poll for fetch responses (counts against timeout)
- if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
- if we get a response, return the response
One notable usage is prohibited by the new poll: previously, you could call poll(0) to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior is not according to any contract, and there is no guarantee that poll(0) won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.
In short, poll(long timeout) blocks indefinitely: it waits until the subscription's metadata update has completed (this wait does not count against timeout), which ensures it can fetch messages. poll(Duration timeout) does not block indefinitely: it returns after at most timeout, whether or not any messages were fetched.
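A consequence of the new semantics is that an empty result from poll(Duration) may simply mean the metadata update did not finish within the timeout, not that the offset holds no message. A generic way to recover the old "block until ready" behavior without blocking forever is to retry within an overall deadline. Below is a minimal, hypothetical sketch in plain Java; the Supplier stands in for one consumer.poll(smallTimeout) call, and pollWithDeadline is not a Kafka API:

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

public class PollRetry {
    // Retry a single-shot "poll" until it yields a value or the deadline expires.
    // attempt stands in for one consumer.poll(smallTimeout) call.
    static <T> Optional<T> pollWithDeadline(Supplier<Optional<T>> attempt, Duration deadline) {
        long deadlineNanos = System.nanoTime() + deadline.toNanos();
        do {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result; // got a record: return immediately
            }
        } while (System.nanoTime() < deadlineNanos);
        return Optional.empty(); // deadline exhausted, like an empty ConsumerRecords
    }

    public static void main(String[] args) {
        // Simulated poll: empty twice (metadata not ready yet), then a record.
        int[] calls = {0};
        Optional<String> r = pollWithDeadline(
                () -> ++calls[0] < 3 ? Optional.empty() : Optional.of("record@offset42"),
                Duration.ofSeconds(1));
        System.out.println(r.orElse("empty")); // prints "record@offset42"
    }
}
```

This keeps each individual poll short while bounding the total wait, so a transiently unavailable broker cannot hang the caller the way the old poll(long) could.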
Guess 2: timeout decides everything
While debugging I found that with either the new or the old poll, when the timeout is too small (e.g. 10 ms), the first poll usually fetches nothing; when it is large enough (e.g. 2000 ms), every poll fetches the message. So I concluded: just make the timeout large enough. The downside is that if the offset argument is out of range, so that there is no message at that position anyway, poll still waits the full timeout before returning (perhaps a spot where Kafka could be optimized?), which wastes time. So I decided to add a check that returns quickly when the offset falls outside the partition's valid range. The code is as follows:
public ConsumerRecord<String, String> seekAndPoll(String topic, int partition, long offset) {
    TopicPartition tp = new TopicPartition(topic, partition);
    consumer.assign(Collections.singleton(tp));
    System.out.println("assignment:" + consumer.assignment()); // the partition is assigned here
    // endOffset: the offset of the last successfully replicated message plus one
    // if there are 5 messages, the valid offsets are [0,1,2,3,4] and endOffset is 4+1=5
    Long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    Long beginOffset = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
    if (offset < beginOffset || offset >= endOffset) {
        System.out.println("offset is illegal");
        return null;
    } else {
        consumer.seek(tp, offset);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
        if (records.isEmpty()) {
            return null;
        } else {
            return records.iterator().next();
        }
    }
}
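The boundary check above hinges on the half-open interval convention: beginningOffsets returns the first valid offset, endOffsets returns the last valid offset plus one, so the valid range is [beginOffset, endOffset). A tiny standalone helper (hypothetical, not a Kafka API) makes the semantics explicit:

```java
public class OffsetRange {
    // Valid offsets form the half-open interval [beginOffset, endOffset):
    // with 5 messages at offsets 0..4, beginOffset is 0 and endOffset is 5.
    static boolean isValid(long beginOffset, long endOffset, long offset) {
        return offset >= beginOffset && offset < endOffset;
    }

    public static void main(String[] args) {
        System.out.println(isValid(0, 5, 4)); // true: the last message
        System.out.println(isValid(0, 5, 5)); // false: endOffset itself holds no message
        System.out.println(isValid(2, 5, 1)); // false: below beginOffset (e.g. removed by retention)
    }
}
```

Note that beginOffset is not always 0: log retention and compaction can delete the head of a partition, which is why the lower bound must be checked too.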
The truth? consumer.endOffsets()
I ran a test that executed the Guess 2 code repeatedly on 4 partitions of 2 topics, 10,000 iterations, varying the timeout, hoping to quantify the relationship between the timeout value and seekAndPoll failures. It turned out that even with a timeout of only 10 ms, poll had a very high success rate, and at timeout=50 ms the success rate reached 100%, whereas previously a timeout of 1000-2000 ms was needed for that. After checking repeatedly, I finally found that these two lines were responsible:
Long beginOffset = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
Long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
Stepping into the endOffsets method, it roughly does things like fetchOffsetsByTimes and awaitMetadataUpdate underneath. But consumer.endOffsets() is not foolproof either: with timeout=5 ms, the poll success rate was 97%.
The test code is at the end of this article.
Conclusion
Write the code as in Guess 2 above, with the timeout set to 2000 ms or more. As long as the offset passed to seek passes the check, there must be a message at that offset and poll will return immediately, so even a very large timeout does no harm.
consumer.beginningOffsets() and consumer.endOffsets() serve both to validate the offset and, as a side effect, to make a small poll timeout sufficient (even though that was not their purpose).
Note that when beginOffset and endOffset are known to be stable across multiple seeks, there is no need to query them via the API every time; cache them and reuse them. In that case, fetching the message in a single poll depends entirely on the timeout, so do not set it too small: use at least 100 ms.
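The caching idea can be sketched as follows, in plain Java with hypothetical names: loadRange stands in for the beginningOffsets/endOffsets round trips, and a String key stands in for a TopicPartition. The range is fetched once per partition and reused across seeks:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class OffsetRangeCache {
    // Cached [beginOffset, endOffset) per partition key, e.g. "topic-0".
    private final Map<String, long[]> cache = new HashMap<>();
    private final Function<String, long[]> loadRange; // stands in for beginningOffsets/endOffsets

    OffsetRangeCache(Function<String, long[]> loadRange) {
        this.loadRange = loadRange;
    }

    // Query the broker only on the first request for each partition.
    boolean isValid(String partitionKey, long offset) {
        long[] range = cache.computeIfAbsent(partitionKey, loadRange);
        return offset >= range[0] && offset < range[1];
    }

    public static void main(String[] args) {
        int[] brokerCalls = {0};
        OffsetRangeCache c = new OffsetRangeCache(key -> {
            brokerCalls[0]++;          // pretend this is a round trip to the broker
            return new long[]{0, 5};   // beginOffset=0, endOffset=5
        });
        System.out.println(c.isValid("topic-0", 4)); // true
        System.out.println(c.isValid("topic-0", 7)); // false
        System.out.println(brokerCalls[0]);          // 1: the second check hit the cache
    }
}
```

The trade-off: endOffset grows as producers append and beginOffset advances as retention deletes old segments, so a cached range is only safe while the partition is known to be static, or when a slightly stale bound is acceptable.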
Test Code
kafka-test/SeekTest.java at main · whuwangyong/kafka-test (github.com)