kafka 消息回溯
指定 offset 的 api
KafkaConsumer#seek
KafkaConsumer#seekToBeginning
KafkaConsumer#seekToEnd
對應
assignedState(tp).seek(offset);
assignedState(partition).reset(offsetResetStrategy);
assignedState(partition).reset(offsetResetStrategy);
首先檢查當前消費者是否分配到分區,然后發送請求
// org.apache.kafka.clients.consumer.internals.SubscriptionState#assignedState private TopicPartitionState assignedState(TopicPartition tp) { TopicPartitionState state = this.assignment.stateValue(tp); if (state == null) throw new IllegalStateException("No current assignment for partition " + tp); return state; }
KafkaConsumer#seek 和 KafkaConsumer#offsetsForTimes 和結合使用
所以,kafka 的消息回溯,需要給消費者發送指令,讓消費者調用 seek 或 seekToBeginning 或 seekToEnd。