1 Data structures
A consumer's consumption state is held in the SubscriptionState class. Its most important field is assignment, which maps each partition the consumer reads from to that partition's state.
public class SubscriptionState {
    /* the pattern user has requested */
    private Pattern subscribedPattern;
    /* the list of topics the user has requested */
    private final Set<String> subscription;
    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
    private final Set<String> groupSubscription;
    /* the list of partitions the user has requested */
    private final Set<TopicPartition> userAssignment;
    /* the list of partitions currently assigned */
    // key field: the partitions this consumer reads and the state of each one
    private final Map<TopicPartition, TopicPartitionState> assignment;
    // ...
}
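Now look at TopicPartitionState, which records how far the consumer has read in a partition. Note that position is the offset of the next record to consume, not the offset of the last record already consumed. When fetching, position determines the offset of the first record to request.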
private static class TopicPartitionState {
    private Long position; // offset of the next record to consume
    private OffsetAndMetadata committed; // last committed offset
    private boolean paused; // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy; // strategy to use when the position is reset
    // ...
}

public class OffsetAndMetadata implements Serializable {
    private final long offset;
    private final String metadata;
}
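To make the meaning of position concrete, here is a minimal sketch that uses only the JDK. PositionSketch, PartitionState and poll are hypothetical names invented for this illustration and are not Kafka classes; the sketch only assumes, as described above, that a fetch starts at position and that position then moves past the records returned.

```java
import java.util.ArrayList;
import java.util.List;

public class PositionSketch {
    // Hypothetical, simplified stand-in for TopicPartitionState (not the Kafka class).
    static final class PartitionState {
        Long position;  // offset of the next record to fetch; null until known
        Long committed; // last offset acknowledged by the coordinator; null if none

        boolean hasValidPosition() {
            return position != null;
        }
    }

    // Returns the offsets of up to maxRecords records starting at the current
    // position and advances position past them. logEndOffset is one past the
    // offset of the last record in the (imaginary) log.
    static List<Long> poll(PartitionState state, long logEndOffset, int maxRecords) {
        List<Long> fetched = new ArrayList<>();
        long next = state.position;
        while (next < logEndOffset && fetched.size() < maxRecords) {
            fetched.add(next);
            next++;
        }
        state.position = next;
        return fetched;
    }

    public static void main(String[] args) {
        PartitionState state = new PartitionState();
        state.position = 5L;
        List<Long> records = poll(state, 10L, 3);
        System.out.println("fetched offsets: " + records);        // [5, 6, 7]
        System.out.println("next position:   " + state.position); // 8
    }
}
```

After the call the last record handed out has offset 7, while position is 8: the first record returned is the one at position, and position always points one past the last record returned.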
2 Committing offsets
Taking KafkaConsumer#commitSync as the example, let's see how the client commits offsets.
KafkaConsumer#commitSync
public void commitSync() {
    acquire();
    try {
        // call SubscriptionState#allConsumed to get the consumed positions, then commit them
        commitSync(subscriptions.allConsumed());
    } finally {
        release();
    }
}

public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    acquire();
    try {
        coordinator.commitOffsetsSync(offsets);
    } finally {
        release();
    }
}
2.1 Getting the consumed positions
SubscriptionState#allConsumed is where the consumed positions come from. As the code below shows, the offset that gets committed is simply TopicPartitionState#position.
public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
        TopicPartitionState state = entry.getValue();
        if (state.hasValidPosition())
            // key point: position is wrapped in an OffsetAndMetadata,
            // so what gets committed is TopicPartitionState#position
            allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
    }
    return allConsumed;
}
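The same filtering logic can be tried out in isolation with a JDK-only sketch. AllConsumedSketch is a hypothetical class written for this illustration; as a simplification it keys partitions by a plain "topic-partition" string and uses a null Long to stand for "no valid position yet", neither of which is how Kafka represents them.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class AllConsumedSketch {
    // Hypothetical simplification of SubscriptionState#allConsumed:
    // copy every partition that has a known position into the map to commit.
    static Map<String, Long> allConsumed(Map<String, Long> positions) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> entry : positions.entrySet()) {
            if (entry.getValue() != null) // plays the role of hasValidPosition()
                toCommit.put(entry.getKey(), entry.getValue());
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("orders-0", 8L);   // records up to offset 7 were already returned
        positions.put("orders-1", null); // assigned, but position not resolved yet
        System.out.println(allConsumed(positions)); // {orders-0=8}
    }
}
```

The value committed for orders-0 is 8, the next offset to read, not 7, the offset of the last record returned. Partitions without a valid position are simply left out of the commit.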
2.2 Sending over the network
Once the consumed offsets have been collected, they are ultimately sent to the coordinator by ConsumerCoordinator#sendOffsetCommitRequest.
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (coordinatorUnknown()) // the coordinator must be known before committing
        return RequestFuture.coordinatorNotAvailable();
    if (offsets.isEmpty())
        return RequestFuture.voidSuccess();
    // create the offset commit request
    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        OffsetAndMetadata offsetAndMetadata = entry.getValue();
        // request data is keyed by TopicPartition and built from the offsetAndMetadata
        offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
    }
    OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
            this.generation,
            this.memberId,
            OffsetCommitRequest.DEFAULT_RETENTION_TIME,
            offsetData);
    log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
    // send to the coordinator
    return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
            .compose(new OffsetCommitResponseHandler(offsets));
}
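2.3 Handling the response
The last line of the code above shows that the response is handled in OffsetCommitResponseHandler. If the commit succeeds, TopicPartitionState#committed is updated to the offset that was sent, i.e. the value TopicPartitionState#position had when the request was built.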
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.offsets = offsets;
    }

    @Override
    public OffsetCommitResponse parse(ClientResponse response) {
        return new OffsetCommitResponse(response.responseBody());
    }

    @Override
    public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
        sensors.commitLatency.record(response.requestLatencyMs());
        Set<String> unauthorizedTopics = new HashSet<>();
        for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
            TopicPartition tp = entry.getKey();
            // this.offsets is the argument passed to sendOffsetCommitRequest; this is the key point
            OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
            long offset = offsetAndMetadata.offset();
            Errors error = Errors.forCode(entry.getValue());
            if (error == Errors.NONE) {
                if (subscriptions.isAssigned(tp))
                    // set TopicPartitionState#committed to the position captured when the request was sent
                    subscriptions.committed(tp, offsetAndMetadata);
            }
            // ...
        }
    }
}
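Why it matters that the handler keeps the offsets from the request can be shown with a small JDK-only model. CommitResponseSketch, PartitionState and CommitHandler are hypothetical names for this illustration. The scenario in main, where position advances while the commit is still in flight, is an assumption made for the example (it could arise with an asynchronous commit, not with the blocking commitSync path traced above). Error code 0 stands for "no error" in the sketch, and unlike the original handler the sketch takes a defensive copy of the map.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitResponseSketch {
    // Hypothetical, simplified stand-in for TopicPartitionState.
    static final class PartitionState {
        long position;  // next offset to consume
        Long committed; // null until the first successful commit
    }

    // Hypothetical stand-in for OffsetCommitResponseHandler:
    // it remembers which offsets were sent in the request.
    static final class CommitHandler {
        private final Map<String, Long> sentOffsets;

        CommitHandler(Map<String, Long> sentOffsets) {
            this.sentOffsets = new HashMap<>(sentOffsets); // copy: a choice of this sketch
        }

        // errorCodes maps partition -> error code; 0 means success in this sketch.
        void handle(Map<String, Short> errorCodes, Map<String, PartitionState> assignment) {
            for (Map.Entry<String, Short> entry : errorCodes.entrySet()) {
                PartitionState state = assignment.get(entry.getKey());
                if (entry.getValue() == 0 && state != null) // succeeded and still assigned
                    state.committed = sentOffsets.get(entry.getKey());
            }
        }
    }

    public static void main(String[] args) {
        PartitionState state = new PartitionState();
        state.position = 8L;
        Map<String, PartitionState> assignment = new HashMap<>();
        assignment.put("orders-0", state);

        Map<String, Long> sent = new HashMap<>();
        sent.put("orders-0", state.position); // what the commit request carries
        CommitHandler handler = new CommitHandler(sent);

        state.position = 11L; // assumed: more records were consumed before the response arrived

        Map<String, Short> response = new HashMap<>();
        response.put("orders-0", (short) 0);
        handler.handle(response, assignment);

        System.out.println("committed = " + state.committed); // 8
        System.out.println("position  = " + state.position);  // 11
    }
}
```

Because the handler reads the offsets it was constructed with, committed ends up at 8, the value the coordinator actually acknowledged, rather than at whatever position happens to be when the response arrives.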
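3 Summary
- The offset of the next record to consume is TopicPartitionState#position
- Committing offsets means sending TopicPartitionState#position to the coordinator
- After a successful commit, TopicPartitionState#committed is updated to the TopicPartitionState#position that was sent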
