kafka consumer代碼梳理


kafka consumer是一個單純的單線程程序,因此相對於producer會更好理解些。閱讀consumer代碼的關鍵是理解回調,因為consumer中使用了大量的回調函數。參看kafka中的回調函數

1 整體流程

從KafkaConsumer#pollOnce(..)入口來看consumer的整體流程

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.ensureCoordinatorReady(); // 發送獲取coordinator請求,直到獲取到coordinator

        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment(); // 發送joinGroup和syncGroup,直到獲取到consumer被分配的parttion信息;並啟動心跳

        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions()); // 拉取offset信息和commited信息,以便拉取數據的時候直到從哪開始拉取

        long now = time.milliseconds();

        client.executeDelayedTasks(now);

        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // 從本地數據結構中讀取,並不是發送請求

        if (!records.isEmpty()) // 如果獲取到就直接返回
            return records;

        fetcher.sendFetches(); // 發送拉取數據請求
        client.poll(timeout, now); // 真正的發送
        return fetcher.fetchedRecords(); // 從本地數據結構中讀取,並不是發送請求
    }

2 Reblance joinGroup和syncGroup

consumer需要向coordinator發送請求,來知道自己負責消費哪些topic的哪些partiton。這個過程可以分為兩個請求:

  1. joinGroup。joinGroup請求加入消費組,一旦coordinator確定了所有成員都發送了joinGroup,就會返回給客戶端response,response中包括memberid、generation、consumer是否是leader等信息。
  2. syncGroup。如果consumer是leader的話,他會在本地將已經分配好的partiton信息附加到request中,告訴coordinator,我是這樣分配的。這里需要注意consumer分區的分配是放在consumer端的。如果是普通的非leader consumer,那么就是簡單的請求。無論是leader還是普通的消費者, coordinator都會返回consumer需要消費的parttion列表。

joinGroup和syncGroup的主要邏輯在AbstractCoordinator#ensureActiveGroup(..),在發送join和sync之前會提交一把offset,這樣做是為了防止reblance造成的重復消費。

發送sync請求是在join請求的回調函數中,即AbstractCoordinator#JoinGroupResponseHandler(..),也就是說當join請求返回后,調用response的時候會發送一次sync請求。

private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {

        @Override
        public JoinGroupResponse parse(ClientResponse response) {
            return new JoinGroupResponse(response.responseBody());
        }

        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                AbstractCoordinator.this.memberId = joinResponse.memberId(); // 讀取response中的memberid
                AbstractCoordinator.this.generation = joinResponse.generationId(); // generationId
                AbstractCoordinator.this.rejoinNeeded = false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                sensors.joinLatency.record(response.requestLatencyMs());

                // 發送sync請求
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
                // 省略其他
            } 
        }
    }

需要注意的是,kafka一個group可以消費多個topic,假設如果有兩個topic:TopicA和TopicB,他們分別都有一個消費組名字都叫test,如果TopicA的test內消費者數量變化引起reblance,會造成TopicB的test也會reblance的。可以看下這里:http://www.cnblogs.com/dongxiao-yang/p/5417956.html

3 heartBeat

在發送完joinGroup后會啟動heartBeat。HeartbeatTask實現了DelayedTask。heatbeat定時向coordinator發送心跳信息,如果返回ILLEGAL_GENERATION,說明coordinator已經重新進行了reblance,這個時候consuemr就需要再次發送join和sync請求。如下HeartbeatCompletionHandler

private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        @Override
        public HeartbeatResponse parse(ClientResponse response) {
            return new HeartbeatResponse(response.responseBody());
        }

        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            sensors.heartbeatLatency.record(response.requestLatencyMs());
            Errors error = Errors.forCode(heartbeatResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("Received successful heartbeat response for group {}", groupId);
                future.complete(null);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
                        groupId, coordinator);
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) { // 服務端已經是新一代了,客戶端需要reblance。
                log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
                AbstractCoordinator.this.rejoinNeeded = true; // rejoinNeeded置為true,下次拉取的時候會重新發送join和sync請求
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

4 DelayedTask

DelayedTask是一個接口,只有一個run方法,實現了DelayedTask的類只有兩個:AutoCommitTask和HeartbeatTask。兩個都是定時請求的任務。那么consumer單線程是如何實現定時提交的呢?原來Consumer會將AutoCommitTask和HeartbeatTask放在ConsumerNetworkClient#DelayedTaskQueue中,DelayedTaskQueue中包含一個PriorityQueue,會將DelayedTask封裝成Entry並根據時間優先級排序。每次poll的時候都會從DelayedTaskQueue中獲取第一個,根據第一個Entry剩余時間來確定poll阻塞時間。

ConsumerNetworkClient調用schedule將DelayedTaskQueue放到ConsumerNetworkClient#DelayedTaskQueue中

public void schedule(DelayedTask task, long at) {
        delayedTasks.add(task, at); // DelayedTaskQueue#add
} 

DelayedTaskQueue#add

public class DelayedTaskQueue {

    private PriorityQueue<Entry> tasks; // 優先級隊列

    public DelayedTaskQueue() {
        tasks = new PriorityQueue<Entry>();
    }

    /**
     * Schedule a task for execution in the future.
     *
     * @param task the task to execute
     * @param at the time at which to
     */
    public void add(DelayedTask task, long at) {
        tasks.add(new Entry(task, at));
    }
    // ...
}

AutoCommitTask和HeartbeatTask為了能夠一直執行,會在回調函數中將自己重新加入到DelayedTaskQueue中,並指定下次執行的時間。這樣就可以不停的執行了。以heartbeat為例

private class HeartbeatTask implements DelayedTask {

        private boolean requestInFlight = false;

        public void reset() {
            // start or restart the heartbeat task to be executed at the next chance
            long now = time.milliseconds();
            heartbeat.resetSessionTimeout(now);
            client.unschedule(this);

            if (!requestInFlight)
                client.schedule(this, now);
        }

        @Override
        public void run(final long now) {
            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
                // no need to send the heartbeat we're not using auto-assignment or if we are
                // awaiting a rebalance
                return;
            }

            if (heartbeat.sessionTimeoutExpired(now)) {
                // we haven't received a successful heartbeat in one session interval
                // so mark the coordinator dead
                coordinatorDead();
                return;
            }

            if (!heartbeat.shouldHeartbeat(now)) {
                // we don't need to heartbeat now, so reschedule for when we do
                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
            } else {
                heartbeat.sentHeartbeat(now);
                requestInFlight = true;

                RequestFuture<Void> future = sendHeartbeatRequest();
                future.addListener(new RequestFutureListener<Void>() {
                    @Override
                    public void onSuccess(Void value) {
                        requestInFlight = false;
                        long now = time.milliseconds();
                        heartbeat.receiveHeartbeat(now);
                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);

                        // 回調中再次加入,實現了循環定時執行
                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        requestInFlight = false;
                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                    }
                });
            }
        }
    }

5 updateFetchPositions

updateFetchPositions 用於更新commited和offset信息。客戶端的消費狀態是保存在SubscriptionState中的。SubscriptionState有一下主要屬性

public class SubscriptionState {
    private Pattern subscribedPattern;
    // 消費者訂閱的topic
    private final Set<String> subscription;
    private final Set<String> groupSubscription;
    private final Set<TopicPartition> userAssignment;
    // 消費狀態
    private final Map<TopicPartition, TopicPartitionState> assignment;
    private boolean needsPartitionAssignment;
    private boolean needsFetchCommittedOffsets;
    private final OffsetResetStrategy defaultResetStrategy;
    private ConsumerRebalanceListener listener;
    // ...省略
}

private static class TopicPartitionState {
        private Long position; // 消費位置,從coordinator拉取的時候會帶上該字段
        private OffsetAndMetadata committed;  // 已經提交的offset
        private boolean paused;  // whether this partition has been paused by the user
        private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
}

消費狀態信息最終被保存在TopicPartitionState中,topicPartitionState中有兩個重要的屬性:committed和position。需要注意的是commited和position其實表示下一次需要消費的位置,比如0-10的offsetc都已經提交了,那么從coordinator拉取到的committed是11而不是10;position也是一樣的,如果已經消費到15,那么position的值是16。更多可見consumer提交offset原理

6 幾個重要的參數

  1. fetch.min.bytes 一個parttion拉取的最小字節數。consumer是批量從broker拉取消息的,fetch.min.bytes表示最小拉取多少字節才返回。默認值是1
  2. fetch.max.wait.ms 拉取數據的時候最長等待時間,與fetch.min.bytes配合使用。等待fetch.max.wait.ms時間后,還沒有得到fetch.min.bytes大小的數據則返回。默認值500.
  3. max.partition.fetch.bytes 一個partiton最多拉取字節數。默認值1048576,即1M。

以上參數都是放到request中。如下Fetcher#createFetchRequests(..)

private Map<Node, FetchRequest> createFetchRequests() {
        // create the fetch info
        Cluster cluster = metadata.fetch();
        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
        for (TopicPartition partition : fetchablePartitions()) {
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(node) == 0) {
                // if there is a leader and no in-flight requests, issue a new fetch
                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                if (fetch == null) {
                    fetch = new HashMap<>();
                    fetchable.put(node, fetch);
                }

                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); // fetchSize即max.partition.fetch.bytes
                log.trace("Added fetch request for partition {} at offset {}", partition, position);
            }
        }

        // create the fetches
        Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            Node node = entry.getKey();
            // maxWaitMs即fetch.max.wait.ms,minBytes即fetch.min.byte
            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
            requests.put(node, fetch);
        }
        return requests;
    }
  1. max.poll.records 返回的最大record數。與以上三個參數不同,該參數不會放到fetch request中,拉取的records會放在本地變量中,該參數表示將本地變量中多少records返回。

Fetcher拉取的所有消息都會被放到放到records中,record是一個List,存放了所有partiton的record,max.poll.records參數就用來配置每次從list中返回多少條record的,注意是所有partiton的。

Fetcher#fetchedRecords(..)

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            return Collections.emptyMap();
        } else {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
            throwIfOffsetOutOfRange();
            throwIfUnauthorizedTopics();
            throwIfRecordTooLarge();

            int maxRecords = maxPollRecords;
            Iterator<PartitionRecords<K, V>> iterator = records.iterator();
            while (iterator.hasNext() && maxRecords > 0) {
                PartitionRecords<K, V> part = iterator.next();
                maxRecords -= append(drained, part, maxRecords); // maxRecords就是max.poll.records
                if (part.isConsumed())
                    iterator.remove();
            }
            return drained;
        }
    }
  1. 另外在調用consumer api的時候需要制定timeout時間,如果超過timeout仍然沒有消息則返回空的records。
while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000); // timeout時間
//            System.out.println("begin for 2");
            for (ConsumerRecord<String, String> record : records) {
//                System.out.println("hello");
                System.out.println(record.partition() + " " + record.offset());
            }
        }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM