kafka consumer 指定 offset,進行消息回溯


kafka 消息回溯

指定 offset 的 api

KafkaConsumer#seek
KafkaConsumer#seekToBeginning
KafkaConsumer#seekToEnd

對應

assignedState(tp).seek(offset);

assignedState(partition).reset(offsetResetStrategy);

assignedState(partition).reset(offsetResetStrategy);

首先檢查當前消費者是否分配到分區,然后發送請求

// org.apache.kafka.clients.consumer.internals.SubscriptionState#assignedState    
private TopicPartitionState assignedState(TopicPartition tp) {
    TopicPartitionState state = this.assignment.stateValue(tp);
    if (state == null)
        throw new IllegalStateException("No current assignment for partition " + tp);
    return state;
}

KafkaConsumer#seek 和 KafkaConsumer#offsetsForTimes 和結合使用

所以,kafka 的消息回溯,需要給消費者發送指令,讓消費者調用 seek 或 seekToBeginning 或 seekToEnd。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM