Kafka consumer代碼研究及核心邏輯分析


Kafka Consumer API是客戶端的接口,封裝了消息的接收,心跳的檢測,consumer的rebalance等,此分析的代碼基於kafka-clients-0.10.0.1 java版本

KafkaConsumer.pollOnce 是輪詢的入口,完成一次輪詢動作,包括consumer相關的所有邏輯,其邏輯過程如下:

 

 

進一步,將相關的過程展開,如下圖所示:

 

上圖中紅色線框表示pollOnce在一次輪詢中的活動過程,其右邊是相應展開的活動過程,在pollOnce是consumer的關鍵方法,所有相關的邏輯都在這方法中實現,包括消息的拉取,心跳檢測,consumer的再平衡,偏移的自動提交及更新操作等,下面逐個分析

1:獲取coordinator:ensureCoordinatorReady,請求為GroupCoordinatorRequest

對於consumer所在的group(不同的group以groupid區分),需要從所有的broker中找到一個coordinator,用戶本地初始配置一個缺省的broker列表,從中找到一個最近最少負載的節點,構造請求GroupCoordinatorRequest后,放到ConsumerNetworkClient的unsent隊列中,然后阻塞調用ConsumerNetworkClient的poll(future)方法,直到future isDone

private RequestFuture<Void> sendGroupCoordinatorRequest() {

    Node node = this.client.leastLoadedNode(); //找到最少負載節點     ......
    GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
    return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
        .compose(new RequestFutureAdapter<ClientResponse, Void>() {
        @Override
         public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
           handleGroupMetadataResponse(response, future);
         }
     });
   }
}

private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {   ......
   client.tryConnect(coordinator); //連接coordinator // start sending heartbeats only if we have a valid generation
   if (generation > 0)
     heartbeatTask.reset(); //如果generation >0,說明是重新連接coordinator后,則設置心跳延遲任務
   future.complete(null);
   ......
}

 

在kafka 0.9 以前,consumer group是依賴ZK來維護的,但由於有“herd”及“split brain”問題,后重新設計,在新的版本中由broker集群中選擇一個節點作為coordinator,以解決group中各個consumer的同步,如Rebalance,Failover,Partition Assignment,Offset Commit

參考Kafka consumer 設計重構原文:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

2:加入group,分配partition並同步group 狀態及負載均衡ensurePartitionAssignment,請求為JoinGroupRequest 及SyncGroupRequest

獲取coordinator后,調用ensurePartitionAssignment,在內部又繼續調用ensureActiveGroup方法,這個方法的主要功能就是Join Group 及Sync Group。在向coordinator准備發送JoinGroup請求前,如果在coordinator節點上還有未發出的請求(unsent及inflight隊列),則需要阻塞等所有請求完成后再繼續,sendJoinGroupRequest構造好JoinGroupRequest並放到unsent隊列中,其中傳入了回調類,用於處理響應

 

 private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())
            return RequestFuture.coordinatorNotAvailable();

        // send a join group request to the coordinator
        log.info("(Re-)joining group {}", groupId);
        JoinGroupRequest request = new JoinGroupRequest(
                groupId,
                this.sessionTimeoutMs,
                this.memberId,
                protocolType(),
                metadata());

        log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

 

在后面的client.poll(future)中阻塞調用,直到coordinator返回結果,回調處理函數JoinGroupResponseHandler.handle,如果返回結果錯誤碼為Errors.NONE,則表明成功加入到group中,如果返回結果表示consumer是leader,則需要在onJoinLeader中繼續,由leader分配分區信息,並告訴coordinator同步給其它的follow。而如果是follow,則在onJoinFollower中發送同步消息

private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {

   @Override
   public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
       ......if (error == Errors.NONE) {
           ......
           if (joinResponse.isLeader()) {
               onJoinLeader(joinResponse).chain(future);
            } else {
               onJoinFollower().chain(future);
            }
         } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
         ......
...... } }
private RequestFuture<ByteBuffer> onJoinFollower() { SyncGroupRequest request = new SyncGroupRequest(groupId, ...); //同步組請求
return sendSyncGroupRequest(request); } private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) { try { // perform the leader synchronization and send back the assignment for the group Map<String, ByteBuffer> groupAssignment = performAssignment(...); //leader分配分區 SyncGroupRequest request = new SyncGroupRequest(...); //leader同步組 return sendSyncGroupRequest(request); } catch (RuntimeException e) { return RequestFuture.failure(e); } }

 

在onJoinLeader中,調用performAssignment方法,根據broker配置的group protocol(如range,roundrobin)來分配group member所消費的TopicPartition,然后發送同步請求SyncGroupRequest到coordinator,而其它的group member則為follow,也同理發送請求,從coordinator獲取所對應的分配狀態,在完成JoinGroup和SyncGroup后,在onJoinComplete更新partition分配狀態

3:更新拉取偏移:updateFetchPositions

參見 Kafka consumer消息的拉取及偏移的管理

 

4:執行延遲任務:executeDelayedTasks

延遲任務包括AutoCommitTask和HeartbeatTask,延遲任務是每隔一個周期執行的任務,自動提交任務的周期是auto.commit.interval.ms,心跳任務的周期是 heartbeat.interval.ms,延遲任務保存在延遲隊列中的DelayedTaskQueue,在到達指定周期后,執行延遲任務,比如提交偏移或心跳檢測

自動提交任務和心跳任務實現了延遲任務接口,並實現了任務運行方法run

 

延遲隊列中的task,會在每次poll時調用其中的run方法,執行具體任務

  • 自動提交任務

在KafkaConsumer實例化時,會創建消費者協調器對象,

private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        try {
            this.coordinator = new ConsumerCoordinator(this.client, //創建消費者協調器
            .......
         }
}

在消費者協調器ConsumerCoordinator中,有一個自動提交任務成員

public final class ConsumerCoordinator extends AbstractCoordinator {
    private final AutoCommitTask autoCommitTask; //自動提交任務對象
}

而在消費者協調器對象的創建過程中,如果默認配置為自動提交,則初始化自動提交任務並設置一個提交任務

   public ConsumerCoordinator(ConsumerNetworkClient client, .......) {
       .......

        if (autoCommitEnabled) { //如果配置為自動提交任務,則初始化自動提交任務對象 this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
            this.autoCommitTask.reschedule(); //在延遲隊列中添加任務,設定延遲執行時間
        } else {
            this.autoCommitTask = null;
        }
        ......
    }
public class ConsumerNetworkClient implements Closeable {
    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue(); //延遲隊列,保存了自動提交任務項及心跳任務項 
    public void schedule(DelayedTask task, long at) {
        delayedTasks.add(task, at);
    }
}

 

  • 心跳檢測任務

在消費者JoinGroup成功后,會開始設置心跳任務

public void ensureActiveGroup() {
        .......
        while (needRejoin()) {
            ensureCoordinatorReady();
            ......
            RequestFuture<ByteBuffer> future = sendJoinGroupRequest(); //申請加入Group
            future.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset(); //加入Group成功,設置心跳任務
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                }
            });
            ......
        }
    }

 

5:消息的拉取及消費

參見 Kafka consumer消息的拉取及偏移的管理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM