原文:kafka consumer.seek 之后立即 poll 可能拉不到消息

问题 有个需求,需要频繁seek到指定partition的指定offset,然后poll,且只poll一次,目的是为了快速将指定offset的消息拉取出来。 通常的poll写法是,将poll逻辑放在死循环里,第一次拉不到,第二次继续。如果offset上有消息,就一定能消费到: 但我使用的是consumer.assign 方法,而不是subscribe 。因为要灵活指定分区,用subscribe的话 ...

2022-02-17 20:48 0 2245 推荐指数:

查看详情

Kafka consumer消息取及偏移的管理

消费者消息并处理主要有4个步骤: 获取消费者所取分区的偏移位置OffsetFetchRequest(新的消息是从偏移位置开始的) 创建FetchReqeust,生成Map<Node, FetchRequest>,以消费者所消息的节点为key来分组,所消费 ...

Tue Oct 23 01:35:00 CST 2018 0 3168
Kafka consumer poll(long)与poll(Duration)的区别

最近在StackOverflow碰到的一个问题,即在consumer.poll之后assignment()返回为空的问题,如下面这段代码所示: 有意思的是,如果是consumer.poll(0);则assignment不为空。之前我以为poll(long)被标记 ...

Fri Apr 26 20:03:00 CST 2019 10 12038
kafkaconsumer.poll(Long)和consumer.poll(Duration.ofMillis(2000)) 的区别

项目中用到了kafka,没用Streaming,只是用了个简单的kafka连接 最初的使用的是consumer.poll(10) 这样取得数据, 发现这样得取数据得方式当连接不上kafka时或者连接不正确,或者broker失败,总而言之就是连接不上kafka,会使得程序一直在运行停不下来 ...

Wed May 26 17:36:00 CST 2021 0 5776
kafka--- consumer 消费消息

1、 consumer API kafka 提供了两套 consumer API: 其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。 1.1 ...

Mon Apr 02 23:24:00 CST 2018 0 2169
kafka consumer 指定 offset,进行消息回溯

kafka 消息回溯 指定 offset 的 api 对应 首先检查当前消费者是否分配到分区,然后发送请求 KafkaConsumer#seek 和 KafkaConsumer#offsetsForTimes 和结合使用 所以,kafka消息 ...

Thu Sep 20 02:52:00 CST 2018 0 5166
Kafka consumer处理大消息数据问题

案例分析 处理kafka consumer的程序的时候,发现如下错误: 如上log可以看出,问题就是有一个较大的消息数据在codeTopic的partition 3上,然后consumer未能消费,提示我可以减小broker允许进入的消息数据的大小,或者增大consumer程序消费数据 ...

Thu Jan 12 21:04:00 CST 2017 0 15524
 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM