原文:kafka consumer.seek 之后立即 poll 可能拉不到消息

問題 有個需求,需要頻繁seek到指定partition的指定offset,然后poll,且只poll一次,目的是為了快速將指定offset的消息拉取出來。 通常的poll寫法是,將poll邏輯放在死循環里,第一次拉不到,第二次繼續。如果offset上有消息,就一定能消費到: 但我使用的是consumer.assign 方法,而不是subscribe 。因為要靈活指定分區,用subscribe的話 ...

2022-02-17 20:48 0 2245 推薦指數:

查看詳情

Kafka consumer消息取及偏移的管理

消費者消息並處理主要有4個步驟: 獲取消費者所取分區的偏移位置OffsetFetchRequest(新的消息是從偏移位置開始的) 創建FetchReqeust,生成Map<Node, FetchRequest>,以消費者所消息的節點為key來分組,所消費 ...

Tue Oct 23 01:35:00 CST 2018 0 3168
Kafka consumer poll(long)與poll(Duration)的區別

最近在StackOverflow碰到的一個問題,即在consumer.poll之后assignment()返回為空的問題,如下面這段代碼所示: 有意思的是,如果是consumer.poll(0);則assignment不為空。之前我以為poll(long)被標記 ...

Fri Apr 26 20:03:00 CST 2019 10 12038
kafkaconsumer.poll(Long)和consumer.poll(Duration.ofMillis(2000)) 的區別

項目中用到了kafka,沒用Streaming,只是用了個簡單的kafka連接 最初的使用的是consumer.poll(10) 這樣取得數據, 發現這樣得取數據得方式當連接不上kafka時或者連接不正確,或者broker失敗,總而言之就是連接不上kafka,會使得程序一直在運行停不下來 ...

Wed May 26 17:36:00 CST 2021 0 5776
kafka--- consumer 消費消息

1、 consumer API kafka 提供了兩套 consumer API: 其中 high-level consumer API 提供了一個從 kafka 消費數據的高層抽象,而 SimpleConsumer API 則需要開發人員更多地關注細節。 1.1 ...

Mon Apr 02 23:24:00 CST 2018 0 2169
kafka consumer 指定 offset,進行消息回溯

kafka 消息回溯 指定 offset 的 api 對應 首先檢查當前消費者是否分配到分區,然后發送請求 KafkaConsumer#seek 和 KafkaConsumer#offsetsForTimes 和結合使用 所以,kafka消息 ...

Thu Sep 20 02:52:00 CST 2018 0 5166
Kafka consumer處理大消息數據問題

案例分析 處理kafka consumer的程序的時候,發現如下錯誤: 如上log可以看出,問題就是有一個較大的消息數據在codeTopic的partition 3上,然后consumer未能消費,提示我可以減小broker允許進入的消息數據的大小,或者增大consumer程序消費數據 ...

Thu Jan 12 21:04:00 CST 2017 0 15524
 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM