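The Kafka Consumer API is the client-side interface. It encapsulates message fetching, heartbeats, consumer rebalancing, and so on. This analysis is based on the Java client, kafka-clients-0.10.0.1.
KafkaConsumer.pollOnce is the entry point of polling. It performs one round of polling, which covers all of the consumer-related logic. The flow is as follows: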
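Expanding the individual steps further gives the following diagram:
In the diagram above, the red box marks what pollOnce does during a single poll, and the expanded activities are shown to its right. pollOnce is the key method of the consumer: all of the related logic is driven from it, including fetching messages, heartbeats, consumer rebalancing, and automatic offset commits and position updates. Each step is analyzed below.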
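1: Locate the coordinator: ensureCoordinatorReady, request type GroupCoordinatorRequest
The group a consumer belongs to (groups are distinguished by group id) needs to find its coordinator among the brokers. The user configures an initial broker list locally (bootstrap.servers), and the client picks the least-loaded node from it. It builds a GroupCoordinatorRequest, places it in the unsent queue of ConsumerNetworkClient, and then blocks in ConsumerNetworkClient.poll(future) until the future is done (isDone).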
private RequestFuture<Void> sendGroupCoordinatorRequest() {
    Node node = this.client.leastLoadedNode(); // pick the least-loaded node
    ......
        GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
        return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                    @Override
                    public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
                        handleGroupMetadataResponse(response, future);
                    }
                });
    }
}

private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
    ......
    client.tryConnect(coordinator); // connect to the coordinator
    // start sending heartbeats only if we have a valid generation
    if (generation > 0)
        heartbeatTask.reset(); // generation > 0 means we are reconnecting to the coordinator, so reset the heartbeat delayed task
    future.complete(null);
    ......
}
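The "enqueue, then block in poll(future)" pattern described above can be sketched roughly as follows. This is a minimal single-threaded model, not the Kafka implementation: SimpleFuture, SimpleClient, and lookupCoordinator are hypothetical stand-ins for RequestFuture, ConsumerNetworkClient, and the coordinator lookup, and the fake "broker" answers as soon as a request leaves the unsent queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-ins for RequestFuture and ConsumerNetworkClient; not the Kafka classes.
class BlockingPollSketch {

    /** A tiny single-threaded future: completed by the network loop, observed by the caller. */
    static class SimpleFuture<T> {
        private boolean done;
        private T value;

        boolean isDone() { return done; }
        T value() { return value; }

        void complete(T v) {
            value = v;
            done = true;
        }
    }

    /** Hypothetical client: send() only enqueues; the work happens inside poll(). */
    static class SimpleClient {
        private final Queue<Runnable> unsent = new ArrayDeque<>();

        SimpleFuture<String> send(String request) {
            SimpleFuture<String> future = new SimpleFuture<>();
            // Assumption: the broker answers as soon as the request is actually transmitted.
            unsent.add(() -> future.complete("response-to-" + request));
            return future;
        }

        /** One pass of the network loop: transmit at most one queued request. */
        void pollOnce() {
            Runnable next = unsent.poll();
            if (next != null) next.run();
        }

        /** Loops until the future is done. The future must have come from this client's send(). */
        void poll(SimpleFuture<?> future) {
            while (!future.isDone()) pollOnce();
        }
    }

    static String lookupCoordinator(SimpleClient client, String groupId) {
        SimpleFuture<String> future = client.send("GroupCoordinatorRequest(" + groupId + ")");
        client.poll(future); // nothing is sent until we poll
        return future.value();
    }

    public static void main(String[] args) {
        System.out.println(lookupCoordinator(new SimpleClient(), "demo-group"));
    }
}
```

The point of the sketch is that send() never touches the network by itself; the request only makes progress while the caller is inside poll, which is why the consumer logic all hangs off the polling loop.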
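Before Kafka 0.9, consumer groups were maintained through ZooKeeper. Because of the "herd effect" and "split brain" problems, the design was reworked: in the new version, one broker in the cluster is chosen as the coordinator of a group, and it handles synchronization among the consumers in that group, such as rebalance, failover, partition assignment, and offset commit.
See the original design documents for the Kafka consumer redesign: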
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
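2: Join the group, assign partitions, synchronize group state, and balance load: ensurePartitionAssignment, request types JoinGroupRequest and SyncGroupRequest
Once the coordinator is known, ensurePartitionAssignment is called, which in turn calls ensureActiveGroup. The main job of that method is Join Group and Sync Group. Before the JoinGroup request is sent, if there are still outstanding requests to the coordinator node (in the unsent and in-flight queues), the client blocks until all of them have completed. sendJoinGroupRequest then builds a JoinGroupRequest and puts it in the unsent queue, passing along a callback class that handles the response.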
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group {}", groupId);
    JoinGroupRequest request = new JoinGroupRequest(
            groupId,
            this.sessionTimeoutMs,
            this.memberId,
            protocolType(),
            metadata());

    log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
    return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
            .compose(new JoinGroupResponseHandler());
}
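The subsequent client.poll(future) call blocks until the coordinator returns a result, at which point the callback JoinGroupResponseHandler.handle runs. If the error code in the response is Errors.NONE, the consumer has joined the group successfully. If the response says this consumer is the leader, processing continues in onJoinLeader: the leader computes the partition assignment and asks the coordinator to distribute it to the followers. If the consumer is a follower, it sends its sync request in onJoinFollower.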
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        ......
        if (error == Errors.NONE) {
            ......
            if (joinResponse.isLeader()) {
                onJoinLeader(joinResponse).chain(future);
            } else {
                onJoinFollower().chain(future);
            }
        } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            ......
        ......
    }
}

private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest request = new SyncGroupRequest(groupId, ...); // sync-group request from a follower
    return sendSyncGroupRequest(request);
}

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = performAssignment(...); // the leader assigns partitions
        SyncGroupRequest request = new SyncGroupRequest(...); // the leader syncs the group
        return sendSyncGroupRequest(request);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
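To make the two-phase exchange easier to follow, here is a toy, in-memory model of it. It is only a sketch under stated assumptions: ToyCoordinator, rebalance, and this performAssignment are hypothetical names, the first member to join is treated as the leader, and the "assignment" is just one made-up partition name per member. Real requests, generations, and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of the JoinGroup/SyncGroup exchange; not Kafka's coordinator.
class JoinSyncSketch {

    static class ToyCoordinator {
        private final List<String> members = new ArrayList<>();
        private Map<String, String> assignments = Collections.emptyMap();

        /** JoinGroup: register the member. */
        void join(String memberId) { members.add(memberId); }

        /** Assumption for this sketch: the first member to join is the leader. */
        boolean isLeader(String memberId) { return members.get(0).equals(memberId); }

        List<String> memberList() { return new ArrayList<>(members); }

        /** SyncGroup: only the leader's request carries a non-empty assignment map. */
        void sync(String memberId, Map<String, String> groupAssignment) {
            if (isLeader(memberId)) assignments = groupAssignment;
        }

        /** What the SyncGroup response would hand back to one member. */
        String assignmentFor(String memberId) { return assignments.get(memberId); }
    }

    /** Hypothetical stand-in for performAssignment: member i gets the partition "t-i". */
    static Map<String, String> performAssignment(List<String> members) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < members.size(); i++) result.put(members.get(i), "t-" + i);
        return result;
    }

    static Map<String, String> rebalance(List<String> memberIds) {
        ToyCoordinator coordinator = new ToyCoordinator();
        for (String m : memberIds) coordinator.join(m); // phase 1: everyone joins
        for (String m : memberIds) {                    // phase 2: everyone syncs
            if (coordinator.isLeader(m)) {
                // corresponds to onJoinLeader: compute and upload the group assignment
                coordinator.sync(m, performAssignment(coordinator.memberList()));
            } else {
                // corresponds to onJoinFollower: send an empty assignment
                coordinator.sync(m, Collections.<String, String>emptyMap());
            }
        }
        Map<String, String> view = new LinkedHashMap<>();
        for (String m : memberIds) view.put(m, coordinator.assignmentFor(m));
        return view;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        ids.add("a");
        ids.add("b");
        ids.add("c");
        System.out.println(rebalance(ids));
    }
}
```

Only the leader computes anything; followers simply wait for the coordinator to relay the leader's decision, which is what keeps the assignment logic on the client side.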
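In onJoinLeader, performAssignment is called to assign the TopicPartitions that each group member will consume, according to the group protocol in effect (for example range or roundrobin; the candidate strategies are configured on the consumers through partition.assignment.strategy, not on the broker). The leader then sends a SyncGroupRequest carrying the assignment to the coordinator. The other group members are followers; they also send a SyncGroupRequest and receive their own assignment from the coordinator. After JoinGroup and SyncGroup have completed, onJoinComplete updates the partition assignment state.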
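As an illustration of what a range-style assignment produces, the following sketch splits the partitions of a single topic into contiguous ranges over the sorted consumers, with the first consumers taking one extra partition when the division is uneven. It is a simplified model with hypothetical names (RangeAssignSketch, assign); it handles one topic and plain integer partition ids, and is not the library's assignor class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style assignment for ONE topic; not the kafka-clients assignor.
class RangeAssignSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) return result;

        // Sort so that every member would compute the same ranges.
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);

        int perConsumer = numPartitions / sorted.size(); // base share
        int withExtra = numPartitions % sorted.size();   // first N consumers get one more

        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + length; p++) parts.add(p);
            result.put(sorted.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 consumers: one consumer gets 3, the others get 2.
        System.out.println(assign(Arrays.asList("c2", "c0", "c1"), 7));
    }
}
```

With 7 partitions and consumers c0, c1, c2, the sketch gives c0 partitions 0 to 2, c1 partitions 3 and 4, and c2 partitions 5 and 6.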
3: Update fetch positions: updateFetchPositions
4: Run delayed tasks: executeDelayedTasks
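Delayed tasks include AutoCommitTask and HeartbeatTask. A delayed task runs once per period: the period of the auto-commit task is auto.commit.interval.ms, and the period of the heartbeat task is heartbeat.interval.ms. Delayed tasks are kept in a delay queue, DelayedTaskQueue. When the configured period has elapsed, the task is executed, for example to commit offsets or to send a heartbeat.
Both the auto-commit task and the heartbeat task implement the delayed-task interface (DelayedTask) and its run method.
On every poll, the tasks in the delay queue that are due have their run method invoked to do the actual work.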
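The mechanism described above can be sketched as a priority queue ordered by due time, where each periodic task re-adds itself when it runs. The class names here (DelayedTaskSketch, TaskQueue, PeriodicTask) and the simulated clock are assumptions made for illustration, and the intervals of 3000 ms and 5000 ms are example values only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Minimal model of a delay queue driven by poll(); not the kafka-clients classes.
class DelayedTaskSketch {

    interface Task { void run(long now); }

    static class Entry implements Comparable<Entry> {
        final Task task;
        final long at; // absolute time (ms) when the task becomes due

        Entry(Task task, long at) { this.task = task; this.at = at; }

        @Override
        public int compareTo(Entry other) { return Long.compare(at, other.at); }
    }

    static class TaskQueue {
        private final PriorityQueue<Entry> queue = new PriorityQueue<>();

        void add(Task task, long at) { queue.add(new Entry(task, at)); }

        /** Runs every task whose due time is at or before 'now'. */
        void poll(long now) {
            while (!queue.isEmpty() && queue.peek().at <= now) {
                queue.poll().task.run(now);
            }
        }
    }

    /** A task that records its execution and schedules its own next run. */
    static class PeriodicTask implements Task {
        private final String name;
        private final long intervalMs;
        private final TaskQueue queue;
        private final List<String> log;

        PeriodicTask(String name, long intervalMs, TaskQueue queue, List<String> log) {
            this.name = name;
            this.intervalMs = intervalMs;
            this.queue = queue;
            this.log = log;
        }

        @Override
        public void run(long now) {
            log.add(name + "@" + now);
            queue.add(this, now + intervalMs); // reschedule for the next period
        }
    }

    /** Simulates polling once per second for ten seconds of fake time. */
    static List<String> simulate() {
        TaskQueue queue = new TaskQueue();
        List<String> log = new ArrayList<>();
        queue.add(new PeriodicTask("heartbeat", 3000, queue, log), 3000);
        queue.add(new PeriodicTask("autocommit", 5000, queue, log), 5000);
        for (long now = 0; now <= 10000; now += 1000) queue.poll(now);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Because tasks only run inside poll, a consumer that stops polling also stops sending heartbeats and committing offsets in this model.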
- Auto-commit task
When a KafkaConsumer is instantiated, it creates the consumer coordinator object:
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        this.coordinator = new ConsumerCoordinator(this.client, // create the consumer coordinator
                .......
    }
}
The consumer coordinator, ConsumerCoordinator, has an auto-commit task as a member:
public final class ConsumerCoordinator extends AbstractCoordinator {
    private final AutoCommitTask autoCommitTask; // the auto-commit task object
}
While the consumer coordinator is being constructed, if auto-commit is enabled in the configuration, the auto-commit task is initialized and scheduled:
public ConsumerCoordinator(ConsumerNetworkClient client, .......) {
    .......
    if (autoCommitEnabled) { // auto-commit is enabled: create the auto-commit task
        this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
        this.autoCommitTask.reschedule(); // add the task to the delay queue with its next execution time
    } else {
        this.autoCommitTask = null;
    }
    ......
}

public class ConsumerNetworkClient implements Closeable {
    // delay queue holding the auto-commit task and the heartbeat task
    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();

    public void schedule(DelayedTask task, long at) {
        delayedTasks.add(task, at);
    }
}
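For reference, the settings that drive these two tasks are ordinary consumer configuration keys. The sketch below only builds a java.util.Properties object so that it stays runnable without the Kafka jar; the helper name consumerProps, the broker address, the group id, and the numeric values are all placeholders chosen for illustration.

```java
import java.util.Properties;

// Builds consumer settings related to auto-commit and heartbeats; values are examples only.
class AutoCommitConfigSketch {

    static Properties consumerProps(boolean autoCommit) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "demo-group");              // placeholder group id
        // Decides whether the coordinator creates an auto-commit task at all.
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        // Period of the auto-commit task (only used when auto-commit is enabled).
        props.setProperty("auto.commit.interval.ms", "5000");
        // Period of the heartbeat task; should be well below session.timeout.ms.
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        consumerProps(true).list(System.out);
    }
}
```

Such a Properties object would normally be passed to the KafkaConsumer constructor together with key and value deserializers.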
- Heartbeat task
After the consumer has successfully joined the group, the heartbeat task is set up:
public void ensureActiveGroup() {
    .......
    while (needRejoin()) {
        ensureCoordinatorReady();
        ......
        RequestFuture<ByteBuffer> future = sendJoinGroupRequest(); // request to join the group
        future.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                onJoinComplete(generation, memberId, protocol, value);
                needsJoinPrepare = true;
                heartbeatTask.reset(); // joined the group successfully: schedule the heartbeat task
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
            }
        });
        ......
    }
}
5: Fetching and consuming messages