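The Kafka consumer is a plain single-threaded program, so it is easier to understand than the producer. In the client version analyzed here, even heartbeats and auto-commits run on the calling thread (see section 4). The key to reading the consumer code is understanding callbacks, because the consumer uses a great many of them. See the article on callback functions in Kafka.
1 Overall flow
Let's look at the consumer's overall flow, starting from the entry point KafkaConsumer#pollOnce(..):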
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.ensureCoordinatorReady(); // send coordinator lookup requests until the coordinator is known
    if (subscriptions.partitionsAutoAssigned())
        coordinator.ensurePartitionAssignment(); // send joinGroup and syncGroup until this consumer's partition assignment is received; also starts the heartbeat
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions()); // fetch committed offsets and positions so we know where to start fetching
    long now = time.milliseconds();
    client.executeDelayedTasks(now); // run delayed tasks that are due (heartbeat, auto-commit)
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // read from the local buffer; no request is sent
    if (!records.isEmpty()) // if anything is buffered, return it right away
        return records;
    fetcher.sendFetches(); // queue fetch requests
    client.poll(timeout, now); // perform the actual network I/O
    return fetcher.fetchedRecords(); // read from the local buffer again; no request is sent
}
2 Rebalance: joinGroup and syncGroup
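The consumer has to ask the coordinator which partitions of which topics it is responsible for. This takes two requests:
- joinGroup: asks to join the consumer group. Once the coordinator has determined that all members have sent joinGroup, it responds to each client. The response includes the memberId, the generation, and whether this consumer is the leader.
- syncGroup: if the consumer is the leader, it computes the partition assignment locally and attaches it to the request, telling the coordinator "this is how I assigned the partitions". Note that partition assignment is done on the consumer side. A non-leader consumer just sends a plain request with no assignment. Leader or not, the coordinator responds with the list of partitions that consumer should consume.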
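Because the leader computes the assignment, the coordinator only relays it. Below is a minimal sketch of what a leader might compute for a single topic, in the spirit of the range strategy used by Kafka's RangeAssignor. It sorts the member ids and gives each member a contiguous range of partitions. The first `numPartitions % members` members get one extra partition. The class and method names are hypothetical, and this is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: a simplified single-topic range assignment that a group
// leader could compute locally before sending syncGroup.
final class RangeAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (memberIds.isEmpty()) {
            return result; // nobody to assign to
        }
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members); // deterministic order regardless of join order
        int perMember = numPartitions / members.size();
        int extra = numPartitions % members.size(); // the first `extra` members get one more
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            result.put(members.get(i), partitions);
        }
        return result;
    }
}
```

For example, members `c2` and `c1` with 5 partitions yield `c1 -> [0, 1, 2]` and `c2 -> [3, 4]`. The leader would then send this map in its syncGroup request.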
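The main logic of joinGroup and syncGroup lives in AbstractCoordinator#ensureActiveGroup(..). Before sending join and sync, the consumer commits its offsets once (when auto-commit is enabled) to reduce duplicate consumption caused by the rebalance.
The sync request is sent from inside the join request's callback, namely AbstractCoordinator's inner class JoinGroupResponseHandler. When the join response comes back and is handled, the handler sends a sync request.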
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public JoinGroupResponse parse(ClientResponse response) {
        return new JoinGroupResponse(response.responseBody());
    }
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(joinResponse.errorCode());
        if (error == Errors.NONE) {
            log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
            AbstractCoordinator.this.memberId = joinResponse.memberId(); // memberId from the response
            AbstractCoordinator.this.generation = joinResponse.generationId(); // generation id from the response
            AbstractCoordinator.this.rejoinNeeded = false;
            AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
            sensors.joinLatency.record(response.requestLatencyMs());
            // send the sync request (the leader attaches its assignment)
            if (joinResponse.isLeader()) {
                onJoinLeader(joinResponse).chain(future);
            } else {
                onJoinFollower().chain(future);
            }
        }
        // error branches omitted
    }
}
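The handler never blocks. It issues syncGroup from inside the join callback and chains the sync result into the outer future, so the caller sees one future that completes only after both round trips. The following single-threaded sketch illustrates that callback chaining. `MiniFuture` and `JoinSyncSketch` are hypothetical stand-ins, not Kafka's `RequestFuture`. Completing a pending future plays the role of a response arriving during `client.poll(..)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded stand-in for a request future:
// listeners run when complete() is called (i.e. when a "response" arrives).
final class MiniFuture<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T value;
    private boolean done;

    void complete(T v) {
        value = v;
        done = true;
        for (Consumer<T> l : listeners) {
            l.accept(v);
        }
    }

    void onSuccess(Consumer<T> listener) {
        if (done) listener.accept(value); else listeners.add(listener);
    }

    // Forward this future's result into another future, like chain(future) above.
    void chain(MiniFuture<T> target) {
        onSuccess(target::complete);
    }

    boolean isDone() { return done; }
    T value() { return value; }
}

// Hypothetical driver: the sync "request" is only created inside the join callback.
final class JoinSyncSketch {
    MiniFuture<String> pendingJoin; // completed when the join response arrives
    MiniFuture<String> pendingSync; // created in the join callback
    String memberId;

    MiniFuture<String> joinThenSync() {
        MiniFuture<String> result = new MiniFuture<>();
        pendingJoin = new MiniFuture<>();
        pendingJoin.onSuccess(id -> {
            memberId = id;                    // like reading memberId from the join response
            pendingSync = new MiniFuture<>(); // "send" syncGroup
            pendingSync.chain(result);        // the sync outcome becomes the overall outcome
        });
        return result;
    }
}
```

Until both the join and the sync futures are completed, `result.isDone()` stays false. This mirrors how `ensureActiveGroup(..)` keeps polling until the chained future finishes.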
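Note that one Kafka group can consume multiple topics, and the unit of rebalancing is the group (its group id), not the topic. Suppose two topics, TopicA and TopicB, are each consumed by a group named test. These are really one and the same group. If the number of consumers of TopicA in test changes and triggers a rebalance, the consumers of TopicB in test are rebalanced as well. See: http://www.cnblogs.com/dongxiao-yang/p/5417956.html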
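3 Heartbeat
The heartbeat is started once joinGroup (together with the syncGroup chained from it) has completed. HeartbeatTask implements DelayedTask and periodically sends heartbeats to the coordinator. An ILLEGAL_GENERATION response means the coordinator has already started a new rebalance (a new generation), so the consumer has to send join and sync again. REBALANCE_IN_PROGRESS and UNKNOWN_MEMBER_ID likewise set rejoinNeeded. See HeartbeatCompletionHandler: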
private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
    @Override
    public HeartbeatResponse parse(ClientResponse response) {
        return new HeartbeatResponse(response.responseBody());
    }
    @Override
    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        sensors.heartbeatLatency.record(response.requestLatencyMs());
        Errors error = Errors.forCode(heartbeatResponse.errorCode());
        if (error == Errors.NONE) {
            log.debug("Received successful heartbeat response for group {}", groupId);
            future.complete(null);
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
            log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
                    groupId, coordinator);
            coordinatorDead();
            future.raise(error);
        } else if (error == Errors.REBALANCE_IN_PROGRESS) {
            log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.REBALANCE_IN_PROGRESS);
        } else if (error == Errors.ILLEGAL_GENERATION) { // the coordinator is on a newer generation; the client must rejoin
            log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
            AbstractCoordinator.this.rejoinNeeded = true; // with rejoinNeeded set, the next poll sends join and sync again
            future.raise(Errors.ILLEGAL_GENERATION);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
            memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
        }
    }
}
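The branches above boil down to a small decision table. The sketch below condenses it so the three outcomes are easier to see: rediscover the coordinator, rejoin the group, or fail. The enum and class are hypothetical, and error names are passed as plain strings matching the `Errors` constants shown above.

```java
// Hypothetical summary of HeartbeatCompletionHandler's branches (not Kafka code).
enum HeartbeatAction { OK, REDISCOVER_COORDINATOR, REJOIN, RESET_MEMBER_AND_REJOIN, FAIL }

final class HeartbeatErrorSketch {
    static HeartbeatAction actionFor(String error) {
        switch (error) {
            case "NONE":
                return HeartbeatAction.OK;
            case "GROUP_COORDINATOR_NOT_AVAILABLE":
            case "NOT_COORDINATOR_FOR_GROUP":
                return HeartbeatAction.REDISCOVER_COORDINATOR; // coordinatorDead(); found again on the next poll
            case "REBALANCE_IN_PROGRESS":
            case "ILLEGAL_GENERATION":
                return HeartbeatAction.REJOIN; // rejoinNeeded = true
            case "UNKNOWN_MEMBER_ID":
                return HeartbeatAction.RESET_MEMBER_AND_REJOIN; // memberId reset, then rejoin
            default:
                return HeartbeatAction.FAIL; // e.g. GROUP_AUTHORIZATION_FAILED or unexpected errors
        }
    }
}
```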
4 DelayedTask
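DelayedTask is an interface with a single run method, and only two classes implement it: AutoCommitTask and HeartbeatTask. Both send requests on a schedule. So how does a single-threaded consumer run scheduled work? The consumer puts AutoCommitTask and HeartbeatTask into ConsumerNetworkClient's DelayedTaskQueue. DelayedTaskQueue wraps a PriorityQueue: each DelayedTask is wrapped in an Entry, and entries are ordered by their scheduled time. On every poll, the client looks at the first (earliest) entry and uses its remaining time to decide how long poll may block.
ConsumerNetworkClient#schedule adds a DelayedTask to ConsumerNetworkClient's DelayedTaskQueue: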
public void schedule(DelayedTask task, long at) {
    delayedTasks.add(task, at); // DelayedTaskQueue#add
}
DelayedTaskQueue#add
public class DelayedTaskQueue {
    private PriorityQueue<Entry> tasks; // priority queue ordered by scheduled time
    public DelayedTaskQueue() {
        tasks = new PriorityQueue<Entry>();
    }
    /**
     * Schedule a task for execution in the future.
     *
     * @param task the task to execute
     * @param at the time at which to execute the task
     */
    public void add(DelayedTask task, long at) {
        tasks.add(new Entry(task, at));
    }
    // ...
}
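The two ideas in this section are ordering tasks by due time and letting the earliest due time bound the blocking time of poll. Both can be seen in the small, self-contained sketch below. `SimpleDelayedTask`, `SimpleDelayedTaskQueue`, `nextTimeout` and `poll` are hypothetical names for illustration, not Kafka's classes. The sketch assumes `now` is a millisecond clock value supplied by the caller.

```java
import java.util.PriorityQueue;

// Hypothetical task type: run(now) is invoked once the task's due time has passed.
interface SimpleDelayedTask {
    void run(long now);
}

// Hypothetical re-creation of the idea: entries ordered by due time.
final class SimpleDelayedTaskQueue {
    private static final class Entry implements Comparable<Entry> {
        final SimpleDelayedTask task;
        final long at;
        Entry(SimpleDelayedTask task, long at) { this.task = task; this.at = at; }
        @Override public int compareTo(Entry o) { return Long.compare(at, o.at); }
    }

    private final PriorityQueue<Entry> tasks = new PriorityQueue<>();

    void add(SimpleDelayedTask task, long at) {
        tasks.add(new Entry(task, at));
    }

    // How long network poll may block: the caller's timeout, cut short if a task is due sooner.
    long nextTimeout(long now, long maxTimeout) {
        Entry first = tasks.peek();
        if (first == null) return maxTimeout;
        return Math.max(0, Math.min(maxTimeout, first.at - now));
    }

    // Run every task whose due time has passed, earliest first.
    // (A task that re-adds itself must use a due time later than `now`.)
    void poll(long now) {
        while (!tasks.isEmpty() && tasks.peek().at <= now) {
            tasks.poll().task.run(now);
        }
    }
}
```

With a heartbeat due at 3000 ms and a commit due at 5000 ms, `nextTimeout(0, 10000)` returns 3000. So the single thread wakes up in time for the heartbeat even though the caller allowed a 10 s wait.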
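To keep running, AutoCommitTask and HeartbeatTask re-add themselves to the DelayedTaskQueue from their callbacks, each time specifying when they should run next. That is how they repeat indefinitely. Taking the heartbeat as an example: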
private class HeartbeatTask implements DelayedTask {
    private boolean requestInFlight = false;
    public void reset() {
        // start or restart the heartbeat task to be executed at the next chance
        long now = time.milliseconds();
        heartbeat.resetSessionTimeout(now);
        client.unschedule(this);
        if (!requestInFlight)
            client.schedule(this, now);
    }
    @Override
    public void run(final long now) {
        if (generation < 0 || needRejoin() || coordinatorUnknown()) {
            // no need to send a heartbeat if we're not using auto-assignment or if we are
            // awaiting a rebalance
            return;
        }
        if (heartbeat.sessionTimeoutExpired(now)) {
            // we haven't received a successful heartbeat in one session interval
            // so mark the coordinator dead
            coordinatorDead();
            return;
        }
        if (!heartbeat.shouldHeartbeat(now)) {
            // we don't need to heartbeat now, so reschedule for when we do
            client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
        } else {
            heartbeat.sentHeartbeat(now);
            requestInFlight = true;
            RequestFuture<Void> future = sendHeartbeatRequest();
            future.addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    requestInFlight = false;
                    long now = time.milliseconds();
                    heartbeat.receiveHeartbeat(now);
                    long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                    // re-add itself from the callback: this is what makes the task periodic
                    client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                }
                @Override
                public void onFailure(RuntimeException e) {
                    requestInFlight = false;
                    // retry after the backoff
                    client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                }
            });
        }
    }
}
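The self-rescheduling pattern can be simulated without any networking. In this sketch a single thread drives a queue of due times. After each "send", the task puts its next due time back into the queue: `intervalMs` later on success, `retryBackoffMs` later on failure, as in the two listener methods above. `ReschedulingHeartbeat` and its parameters are hypothetical. The `sendSucceeds` predicate stands in for the response outcome. For simplicity the callback runs in the same `poll` call, whereas the real response would arrive in a later poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongPredicate;

// Hypothetical simulation of a task that keeps itself alive by rescheduling from its callback.
final class ReschedulingHeartbeat {
    private final long intervalMs;
    private final long retryBackoffMs;
    private final LongPredicate sendSucceeds;               // stands in for the response outcome
    private final PriorityQueue<Long> dueTimes = new PriorityQueue<>();
    final List<Long> sentAt = new ArrayList<>();             // times at which a heartbeat was "sent"

    ReschedulingHeartbeat(long intervalMs, long retryBackoffMs, LongPredicate sendSucceeds) {
        this.intervalMs = intervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.sendSucceeds = sendSucceeds;
    }

    void start(long now) {
        dueTimes.add(now); // like reset(): schedule at the next chance
    }

    // One iteration of the single-threaded loop: run whatever is due at `now`.
    void poll(long now) {
        while (!dueTimes.isEmpty() && dueTimes.peek() <= now) {
            dueTimes.poll();
            sentAt.add(now);
            if (sendSucceeds.test(now)) {
                dueTimes.add(now + intervalMs);     // like onSuccess: schedule the next heartbeat
            } else {
                dueTimes.add(now + retryBackoffMs); // like onFailure: retry after the backoff
            }
        }
    }
}
```

Driving it every 50 ms with a 3000 ms interval and 100 ms backoff, where only the send at 3000 fails, produces sends at 0, 3000, 3100 and 6100.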
5 updateFetchPositions
updateFetchPositions updates the committed offsets and fetch positions. The client's consumption state is kept in SubscriptionState, whose main fields are:
public class SubscriptionState {
    private Pattern subscribedPattern;
    // topics this consumer subscribed to
    private final Set<String> subscription;
    private final Set<String> groupSubscription;
    private final Set<TopicPartition> userAssignment;
    // per-partition consumption state
    private final Map<TopicPartition, TopicPartitionState> assignment;
    private boolean needsPartitionAssignment;
    private boolean needsFetchCommittedOffsets;
    private final OffsetResetStrategy defaultResetStrategy;
    private ConsumerRebalanceListener listener;
    // ... (omitted)
}
private static class TopicPartitionState {
    private Long position; // fetch position; sent in fetch requests to the partition leader
    private OffsetAndMetadata committed; // last committed offset
    private boolean paused; // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
}
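The consumption state ends up in TopicPartitionState, whose two important fields are committed and position. Note that both denote the next offset to consume. For example, if offsets 0-10 have all been committed, the committed value fetched from the coordinator is 11, not 10. Likewise, once consumption has reached offset 15, position is 16. For more, see the article on how the consumer commits offsets.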
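The "next offset" convention is the same one applications follow when committing manually. The KafkaConsumer Javadoc commits `lastOffset + 1` via `new OffsetAndMetadata(lastOffset + 1)`. The tiny sketch below only does the arithmetic, with no Kafka dependency. `OffsetSemanticsSketch` is a hypothetical name.

```java
import java.util.List;

// Hypothetical helper: the value to commit (and the next fetch position)
// is one past the highest offset already processed.
final class OffsetSemanticsSketch {
    static long nextOffsetToCommit(List<Long> processedOffsets) {
        long last = -1;
        for (long o : processedOffsets) {
            last = Math.max(last, o);
        }
        if (last < 0) {
            throw new IllegalArgumentException("nothing processed yet");
        }
        return last + 1;
    }
}
```

Processing offsets 0-10 gives 11, and processing up to 15 gives 16, matching the committed and position values described above.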
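6 Several important parameters
- fetch.min.bytes: the minimum amount of data the broker should return for one fetch request (not per partition). The consumer pulls messages from brokers in batches, and the broker waits until at least fetch.min.bytes of data is available before responding. Default: 1.
- fetch.max.wait.ms: the longest the broker waits for a fetch; used together with fetch.min.bytes. If fetch.min.bytes of data has not accumulated after fetch.max.wait.ms, the broker responds anyway. Default: 500 ms.
- max.partition.fetch.bytes: the maximum number of bytes returned per partition. Default: 1048576, i.e. 1 MiB.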
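How these three settings interact can be modelled as two small functions. The broker answers once enough bytes are available or the wait has run out, and each partition contributes at most max.partition.fetch.bytes. This is a deliberately simplified, hypothetical model of the behaviour described in the list above. It ignores message boundaries, errors and the real broker's delayed-fetch machinery, and `FetchResponseTimingSketch` is not a Kafka class.

```java
import java.util.Map;

// Hypothetical, simplified model of how the fetch settings interact.
final class FetchResponseTimingSketch {
    // Respond once fetch.min.bytes is reached or fetch.max.wait.ms has elapsed.
    static boolean shouldRespond(long availableBytes, long waitedMs, int minBytes, int maxWaitMs) {
        return availableBytes >= minBytes || waitedMs >= maxWaitMs;
    }

    // Bytes a response could carry: each partition capped at max.partition.fetch.bytes.
    static long responseBytes(Map<String, Long> availablePerPartition, int maxPartitionBytes) {
        long total = 0;
        for (long bytes : availablePerPartition.values()) {
            total += Math.min(bytes, maxPartitionBytes);
        }
        return total;
    }
}
```

With the defaults (min 1 byte, max wait 500 ms), an empty partition makes the broker hold the fetch for up to 500 ms, while a single available byte lets it answer immediately.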
All three parameters above are carried in the fetch request, as in Fetcher#createFetchRequests(..):
private Map<Node, FetchRequest> createFetchRequests() {
    // create the fetch info
    Cluster cluster = metadata.fetch();
    Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
    for (TopicPartition partition : fetchablePartitions()) {
        Node node = cluster.leaderFor(partition);
        if (node == null) {
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) {
            // if there is a leader and no in-flight requests, issue a new fetch
            Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new HashMap<>();
                fetchable.put(node, fetch);
            }
            long position = this.subscriptions.position(partition);
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); // fetchSize is max.partition.fetch.bytes
            log.trace("Added fetch request for partition {} at offset {}", partition, position);
        }
    }
    // create the fetches
    Map<Node, FetchRequest> requests = new HashMap<>();
    for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
        Node node = entry.getKey();
        // maxWaitMs is fetch.max.wait.ms, minBytes is fetch.min.bytes
        FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
        requests.put(node, fetch);
    }
    return requests;
}
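Structurally, createFetchRequests(..) groups the fetchable partitions by their leader broker, producing one request per node. It skips partitions with no known leader and nodes that still have a request in flight. The sketch below reproduces just that grouping with plain strings for partitions and nodes. `FetchGroupingSketch` and its parameters are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of the grouping step in createFetchRequests().
final class FetchGroupingSketch {
    // Returns node -> (partition -> fetch position).
    static Map<String, Map<String, Long>> group(Map<String, String> leaderOf,
                                                Map<String, Long> positionOf,
                                                Set<String> nodesWithInFlight) {
        Map<String, Map<String, Long>> byNode = new HashMap<>();
        for (Map.Entry<String, Long> e : positionOf.entrySet()) {
            String partition = e.getKey();
            String node = leaderOf.get(partition);
            if (node == null) {
                continue; // no known leader (the real code calls metadata.requestUpdate())
            }
            if (nodesWithInFlight.contains(node)) {
                continue; // at most one outstanding fetch per node
            }
            byNode.computeIfAbsent(node, n -> new HashMap<>()).put(partition, e.getValue());
        }
        return byNode;
    }
}
```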
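- max.poll.records: the maximum number of records returned by one poll. Unlike the three parameters above, it is not put into the fetch request. Fetched records are buffered locally, and this parameter controls how many of the buffered records are returned.
All messages the Fetcher pulls are placed in records, a List that holds the buffered batches of all partitions. max.poll.records limits how many records are taken from this list per call, counted across all partitions combined.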
Fetcher#fetchedRecords(..)
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    if (this.subscriptions.partitionAssignmentNeeded()) {
        return Collections.emptyMap();
    } else {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        throwIfOffsetOutOfRange();
        throwIfUnauthorizedTopics();
        throwIfRecordTooLarge();
        int maxRecords = maxPollRecords;
        Iterator<PartitionRecords<K, V>> iterator = records.iterator();
        while (iterator.hasNext() && maxRecords > 0) {
            PartitionRecords<K, V> part = iterator.next();
            maxRecords -= append(drained, part, maxRecords); // maxRecords starts at max.poll.records and shrinks as records are appended
            if (part.isConsumed())
                iterator.remove();
        }
        return drained;
    }
}
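The draining loop can be illustrated with plain offsets. Buffered per-partition batches are taken in order until `maxPollRecords` records have been handed out in total. Fully consumed batches leave the buffer, and a partly consumed batch stays for the next call. This is a hypothetical model (`DrainSketch`, `PartitionBatch`), not the Fetcher's code. The `buffered` list must be mutable because drained batches are removed from it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of fetchedRecords(): the max.poll.records cap applies across all partitions.
final class DrainSketch {
    static final class PartitionBatch {
        final String partition;
        final Deque<Long> offsets;
        PartitionBatch(String partition, List<Long> offsets) {
            this.partition = partition;
            this.offsets = new ArrayDeque<>(offsets);
        }
    }

    static Map<String, List<Long>> drain(List<PartitionBatch> buffered, int maxPollRecords) {
        Map<String, List<Long>> drained = new LinkedHashMap<>();
        int remaining = maxPollRecords;
        Iterator<PartitionBatch> it = buffered.iterator();
        while (it.hasNext() && remaining > 0) {
            PartitionBatch batch = it.next();
            List<Long> out = drained.computeIfAbsent(batch.partition, p -> new ArrayList<>());
            while (remaining > 0 && !batch.offsets.isEmpty()) {
                out.add(batch.offsets.pollFirst());
                remaining--;
            }
            if (batch.offsets.isEmpty()) {
                it.remove(); // like part.isConsumed(): drop fully consumed batches
            }
        }
        return drained;
    }
}
```

With batches `t-0: [0, 1, 2]` and `t-1: [10, 11]` and a cap of 4, the first call returns all of `t-0` plus offset 10. Offset 11 stays buffered for the next call.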
- In addition, calls to the consumer API must specify a timeout. If no messages arrive within the timeout, poll returns an empty set of records.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000); // timeout in milliseconds
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.partition() + " " + record.offset());
    }
}