Kafka consumer: automatic offset commit


The relevant entry point is org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    client.maybeTriggerWakeup();

    long startMs = time.milliseconds();
    // coordinator.poll() is where the automatic offset commit is triggered
    // (see the sketch after this method)
    coordinator.poll(startMs, timeout);

    // Lookup positions of assigned partitions
    boolean hasAllFetchPositions = updateFetchPositions();

    // return any records fetched earlier; this also advances the saved positions
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // send fetch requests for new data
    fetcher.sendFetches();

    long nowMs = time.milliseconds();
    long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
    long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure
    if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
        pollTimeout = retryBackoffMs;

    client.poll(pollTimeout, nowMs, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}
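
For reference, the auto-commit trigger inside coordinator.poll() looks roughly like the sketch below. This is a paraphrase of the ConsumerCoordinator logic in the same client version, not verbatim source; field and method names (nextAutoCommitDeadline, doAutoCommitOffsetsAsync, and so on) may differ slightly between versions.

// Paraphrased sketch (not verbatim source): how ConsumerCoordinator.poll()
// decides whether to fire the automatic offset commit.
public void poll(long now, long remainingMs) {
    invokeCompletedOffsetCommitCallbacks();

    // ... group membership and heartbeat handling omitted ...

    // fire an async commit if auto-commit is enabled and the interval has elapsed
    maybeAutoCommitOffsetsAsync(now);
}

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            // no coordinator known yet; back off and retry on a later poll
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            // auto.commit.interval.ms has elapsed: schedule the next deadline
            // and commit the currently consumed positions asynchronously
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}

Because this check runs at the top of every pollOnce(), the offsets it commits are the positions saved while handling the records returned by the previous poll.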

The takeaway: after the consumer fetches messages it updates its locally saved positions, and before the next fetch, if the auto-commit interval has elapsed, those saved positions are committed back to the broker.
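
As a usage-side illustration (assuming a local broker at localhost:9092 and a topic named demo-topic, both placeholders), a minimal consumer with auto-commit enabled looks like this; enable.auto.commit and auto.commit.interval.ms are the standard client configuration keys that control the behavior described above:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "demo-group");                 // placeholder group id
        props.put("enable.auto.commit", "true");             // enable auto-commit (default is true)
        props.put("auto.commit.interval.ms", "5000");        // commit at most once every 5 s
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));  // placeholder topic
            while (true) {
                // each poll() first runs coordinator.poll(), which may asynchronously
                // commit the positions recorded for the previous poll's records once
                // the auto-commit interval has elapsed
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
            }
        }
    }
}

Note that if record processing is handed off to another thread, the auto-commit can run ahead of the actual processing; manual commits via commitSync()/commitAsync() give tighter control in that case.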

