org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { client.maybeTriggerWakeup(); long startMs = time.milliseconds(); // 這里面觸發自動提交 coordinator.poll(startMs, timeout); // Lookup positions of assigned partitions boolean hasAllFetchPositions = updateFetchPositions(); // 對拉取到的數據,更新 position Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 發送拉取數據請求 fetcher.sendFetches(); long nowMs = time.milliseconds(); long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs)); long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs); // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure if (!hasAllFetchPositions && pollTimeout > retryBackoffMs) pollTimeout = retryBackoffMs; client.poll(pollTimeout, nowMs, new PollCondition() { @Override public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); } }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.needRejoin()) return Collections.emptyMap(); return fetcher.fetchedRecords(); }
結論就是:consumer 拉取到消息后,會更新保存的位點信息,下次拉取消息前,若自動提交的時間到了,就會把位點信息提交到 broker。