Reposted from: http://blog.csdn.net/chunlongyu/article/details/52791874
The single-threaded consumer
As discussed earlier, KafkaProducer is thread-safe, and internally it has a Sender running on a background thread that keeps pulling messages off a queue and sending them.
The consumer, by contrast, is a purely single-threaded program: every mechanism described below, including the coordinator, rebalance, and heartbeat, is carried out inside the single-threaded poll() call. As a consequence, there are no locks anywhere in the consumer code.
```java
// client thread
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    ...
}
```
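To make "everything happens inside poll()" concrete, here is a minimal sketch of a complete single-threaded consumption loop (the broker address, group id, and topic name are placeholders):

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumerLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // group management mode: subscribing by topic triggers the coordinator machinery described below
        consumer.subscribe(Arrays.asList("my-topic"));
        try {
            while (true) {
                // poll() drives everything: coordinator discovery, rebalance, heartbeat, fetching
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}
```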
What is the coordinator?
Removing the ZooKeeper dependency
In the client API before 0.9, the consumer depended on ZooKeeper, because all consumers in the same consumer group must coordinate with each other to perform the rebalance described below.
However, ZooKeeper's "herd" and "split brain" problems could leave different consumers in the same group owning the same partition, which in turn garbles consumption. For this reason, 0.9 no longer uses ZooKeeper; the Kafka cluster itself synchronizes the consumers. Quoting the original Kafka design document:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
The current version of the high level consumer suffers from herd and split brain problems, where multiple consumers in a group run a distributed algorithm to agree on the same partition ownership decision. Due to different view of the zookeeper data, they run into conflicts that makes the rebalancing attempt fail. But there is no way for a consumer to verify if a rebalancing operation completed successfully on the entire group. This also leads to some potential bugs in the rebalancing logic, for example, https://issues.apache.org/jira/browse/KAFKA-242
Why can a partition be owned by only one consumer within a group?
As we know, consumers belonging to different consumer groups may consume the same partition, which is how the Pub/Sub model is achieved.
Within a single group, however, multiple consumers are not allowed to consume the same partition. Effectively, for one topic and one group, the number of partitions bounds the number of useful consumers: partition count >= consumer count.
For example, for a topic with 4 partitions, a group can have at most 4 working consumers. If you add more, the extra consumers will simply not be assigned any partition.
Why impose this restriction? The reasons are discussed in detail in the following post:
http://stackoverflow.com/questions/25896109/in-apache-kafka-why-cant-there-be-more-consumer-instances-than-partitions
In short: first, allowing it would make it impossible to preserve the ordering of messages within a partition; second, the Kafka brokers keep exactly one offset per topic, per partition, per consumer group, i.e. the mapping is (topic, partition, consumer_group_id) -> offset. If multiple consumers consumed the same partition in parallel, confirming (committing) that offset would break.
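A hedged sketch of what this offset bookkeeping looks like through the public API: a commit is always addressed to a (topic, partition) pair and is stored by the broker under this consumer's group.id, which is exactly the (topic, partition, consumer_group_id) -> offset mapping above (the topic name, partition number, and offset value below are placeholders):

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetKeyExample {
    static void commitExample(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("my-topic", 0);
        // commit "next offset to consume" = 43 for partition 0 of my-topic;
        // the broker stores it under the group.id this consumer was created with
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(43L)));
    }
}
```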
With this premise in place, let's analyze how partitions are assigned.
The coordinator protocol / the partition assignment problem
Problem statement:
Given a topic with 4 partitions p0, p1, p2, p3 and a group with 3 consumers c0, c1, c2, the range assignment strategy produces:
c0: p0, p1; c1: p2; c2: p3
and the round-robin assignment strategy produces:
c0: p0, p3; c1: p1; c2: p2
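The following is a simplified sketch of the idea behind the two strategies; it is not the actual RangeAssignor / RoundRobinAssignor code, and the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Range: give each consumer a contiguous chunk; the first (partitions % consumers)
    // consumers receive one extra partition.
    static Map<String, List<Integer>> range(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = partitions / consumers.size();
        int extra = partitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) owned.add(next++);
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    // Round robin: deal the partitions out to the consumers one by one.
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<Integer>());
        for (int p = 0; p < partitions; p++)
            result.get(consumers.get(p % consumers.size())).add(p);
        return result;
    }
}
```

For consumers [c0, c1, c2] and 4 partitions, range(...) yields {c0=[0, 1], c1=[2], c2=[3]} and roundRobin(...) yields {c0=[0, 3], c1=[1], c2=[2]}, matching the results above.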
So how does this whole assignment process actually work? As shown in the figure below:
[Figure: the 3-step assignment process]
Step 1: for every consumer group, the Kafka cluster selects one broker to act as that group's coordinator. So the first step is to find this coordinator.
```java
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.ensureCoordinatorKnown(); // the very first line of poll(): find the coordinator; if none is found, it blocks here
    ...
}

public void ensureCoordinatorKnown() {
    while (coordinatorUnknown()) {
        RequestFuture<Void> future = sendGroupMetadataRequest();
        client.poll(future);

        if (future.failed()) {
            if (future.isRetriable())
                client.awaitMetadataUpdate();
            else
                throw future.exception();
        }
    }
}

private RequestFuture<Void> sendGroupMetadataRequest() {
    Node node = this.client.leastLoadedNode();
    if (node == null) {
        return RequestFuture.noBrokersAvailable();
    } else {
        GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
        // send a request to the least-loaded node in the cluster, asking who the coordinator for this group id is
        return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                    @Override
                    public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
                        handleGroupMetadataResponse(response, future);
                    }
                });
    }
}
```
Step 2: once the coordinator has been found, send a JoinGroup request.
```java
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.ensureCoordinatorKnown();        // step 1: find the coordinator

    if (subscriptions.partitionsAutoAssigned())
        coordinator.ensurePartitionAssignment(); // steps 2 + 3: JoinGroup + SyncGroup
    ...
}

public void ensureActiveGroup() {
    if (!needRejoin())
        return;

    if (needsJoinPrepare) {
        onJoinPrepare(generation, memberId);
        needsJoinPrepare = false;
    }

    while (needRejoin()) {
        ensureCoordinatorKnown();

        if (client.pendingRequestCount(this.coordinator) > 0) {
            client.awaitPendingRequests(this.coordinator);
            continue;
        }

        RequestFuture<ByteBuffer> future = performGroupJoin();
        client.poll(future);

        if (future.succeeded()) {
            onJoinComplete(generation, memberId, protocol, future.value());
            needsJoinPrepare = true;
            heartbeatTask.reset();
        } else {
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}
```
Step 3: after the JoinGroup response comes back, send SyncGroup and obtain the partitions assigned to this consumer.
```java
private RequestFuture<ByteBuffer> performGroupJoin() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.debug("(Re-)joining group {}", groupId);
    JoinGroupRequest request = new JoinGroupRequest(
            groupId,
            this.sessionTimeoutMs,
            this.memberId,
            protocolType(),
            metadata());

    // create the request for the coordinator
    log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
    return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
            .compose(new JoinGroupResponseHandler());
}

private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public JoinGroupResponse parse(ClientResponse response) {
        return new JoinGroupResponse(response.responseBody());
    }

    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        // process the response
        short errorCode = joinResponse.errorCode();

        if (errorCode == Errors.NONE.code()) {
            log.debug("Joined group: {}", joinResponse.toStruct());
            AbstractCoordinator.this.memberId = joinResponse.memberId();
            AbstractCoordinator.this.generation = joinResponse.generationId();
            AbstractCoordinator.this.rejoinNeeded = false;
            AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
            sensors.joinLatency.record(response.requestLatencyMs());
            if (joinResponse.isLeader()) {
                onJoinLeader(joinResponse).chain(future); // key point: SyncGroup is sent immediately after the JoinGroup response comes back
            } else {
                onJoinFollower().chain(future);
            }
        } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
            ...
        }
    }
}
```
Note the key point in the 3 steps above: the partition assignment strategy and the assignment result are decided by the client, not by the coordinator. What does that mean? In step 2, after all consumers have sent their JoinGroup requests to the coordinator, the coordinator designates one of them as the leader and the others as followers.
The leader then computes the partition assignment. In step 3, the leader sends the assignment result to the coordinator in its SyncGroup request; the other consumers also send SyncGroup requests and receive the assignment result in the response.
Why elect a leader among the consumers to do the assignment, instead of letting the coordinator assign directly? Kafka's official documentation analyzes this in detail. One important reason is flexibility: if the servers did the assignment, every new assignment strategy would require redeploying the broker cluster, which is very costly for a cluster already running in production; with client-side assignment, the brokers never need to be redeployed.
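This shows up directly in the client configuration: the assignment strategy is a consumer-side setting, partition.assignment.strategy. A sketch (the broker address and group id are placeholders):

```java
import java.util.Properties;

public class AssignorConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        // The assignment strategy is a *client-side* setting; switching it does not
        // require redeploying the broker cluster. The default is the range assignor.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }
}
```

A custom strategy can be plugged in the same way by naming a custom assignor class here, again without touching the brokers.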
The rebalance mechanism
A rebalance means that, under certain conditions, the partitions are re-assigned among the consumers of a group. Which conditions trigger a rebalance?
Condition 1: a new consumer joins the group
Condition 2: an existing consumer dies
Condition 3: the coordinator dies and the cluster elects a new coordinator
Condition 4: new partitions are added to the topic
Condition 5: a consumer calls unsubscribe() and cancels its subscription to the topic
When the consumers detect that a rebalance is needed, they all go through the flow above again, i.e. step 2 + step 3: JoinGroup + SyncGroup.
But here is the question: when a consumer dies, or a new consumer joins, how do the other consumers find out that they need to rebalance? The answer is the heartbeat, described below.
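From application code, these rebalances can be observed through the public ConsumerRebalanceListener callback; a minimal sketch ("my-topic" is a placeholder):

```java
import java.util.Arrays;
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerSketch {
    static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // called before JoinGroup + SyncGroup: the old assignment is being taken away
                System.out.println("revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // called after SyncGroup: the new assignment for this consumer
                System.out.println("assigned: " + partitions);
            }
        });
    }
}
```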
How the heartbeat is implemented
Every consumer periodically sends a heartbeat to the coordinator. As soon as the coordinator replies with a particular error code, ILLEGAL_GENERATION, the consumer knows that its previous generation of the group is no longer valid (the group was disbanded) and that it must go through JoinGroup + SyncGroup again.
How is this periodic sending implemented? The obvious idea would be a background thread that sends heartbeats on a timer, but maintaining a background thread clearly increases implementation complexity, and as noted above the consumer is a single-threaded program. Instead, it is implemented with a DelayedQueue.
DelayedQueue and HeartbeatTask
The basic idea is to put the heartbeat request (a HeartbeatTask) into a DelayedQueue; each pass through the while-loop inside poll() then takes due tasks out of the DelayedQueue and sends them (a task can only be taken off the queue once its time has come).
```java
private class HeartbeatTask implements DelayedTask {

    // key flag: is there a HeartbeatRequest in flight whose response has not come back yet?
    private boolean requestInFlight = false;

    // reset() is essentially the "send" entry point
    public void reset() {
        long now = time.milliseconds();
        heartbeat.resetSessionTimeout(now);
        client.unschedule(this);

        if (!requestInFlight)
            client.schedule(this, now); // if no request is in flight, put the HeartbeatTask into the DelayedQueue
    }

    @Override
    public void run(final long now) {
        if (generation < 0 || needRejoin() || coordinatorUnknown()) {
            return;
        }

        if (heartbeat.sessionTimeoutExpired(now)) {
            coordinatorDead();
            return;
        }

        if (!heartbeat.shouldHeartbeat(now)) {
            client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
        } else {
            heartbeat.sentHeartbeat(now);
            requestInFlight = true;

            RequestFuture<Void> future = sendHeartbeatRequest();
            // whether the response succeeds or fails, the next heartbeat is scheduled back
            // into the DelayedQueue, forming the periodic send loop
            future.addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    requestInFlight = false;
                    long now = time.milliseconds();
                    heartbeat.receiveHeartbeat(now);
                    long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                    client.schedule(HeartbeatTask.this, nextHeartbeatTime); // put into the DelayedQueue
                }

                @Override
                public void onFailure(RuntimeException e) {
                    requestInFlight = false;
                    client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                }
            });
        }
    }
}
```
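To illustrate the scheduling pattern itself, here is a generic sketch of "periodic sends without a background thread". This is not Kafka's code: the client uses its own internal delayed-task queue driven from poll(), not java.util.concurrent.DelayQueue, and the names below are made up:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedHeartbeatSketch {
    static class HeartbeatTask implements Delayed {
        final long fireAtMs;
        HeartbeatTask(long fireAtMs) { this.fireAtMs = fireAtMs; }
        public long getDelay(TimeUnit unit) {
            return unit.convert(fireAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<HeartbeatTask> queue = new DelayQueue<>();
        long intervalMs = 3000;
        queue.put(new HeartbeatTask(System.currentTimeMillis() + intervalMs));

        while (true) {                        // stands in for the consumer's poll loop
            HeartbeatTask due = queue.poll(); // non-blocking: returns null until a task is due
            if (due != null) {
                System.out.println("send heartbeat");   // stands in for sendHeartbeatRequest()
                // re-schedule the next heartbeat, so the periodic send continues without a thread
                queue.put(new HeartbeatTask(System.currentTimeMillis() + intervalMs));
            }
            Thread.sleep(100);                // stands in for the rest of the poll() work
        }
    }
}
```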
Detecting the rebalance
One remark first: in my opinion the network layer here is somewhat redundantly wrapped: sendHeartbeatRequest has both a callback mechanism (the HeartbeatCompletionHandler) and a Listener attached to its Future (the code above).
The rebalance detection happens in the heartbeat's completion handler. As the code below shows, each of the following response error codes is handled specially, and most of them end up triggering a rebalance (or marking the coordinator dead):
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* REBALANCE_IN_PROGRESS (27)
* GROUP_AUTHORIZATION_FAILED (30)
```java
public RequestFuture<Void> sendHeartbeatRequest() {
    HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
    return client.send(coordinator, ApiKeys.HEARTBEAT, req)
            .compose(new HeartbeatCompletionHandler());
}

private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
    @Override
    public HeartbeatResponse parse(ClientResponse response) {
        return new HeartbeatResponse(response.responseBody());
    }

    @Override
    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        sensors.heartbeatLatency.record(response.requestLatencyMs());
        short errorCode = heartbeatResponse.errorCode();
        if (errorCode == Errors.NONE.code()) {
            log.debug("Received successful heartbeat response.");
            future.complete(null);
        } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
                || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
            log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
            coordinatorDead();
            future.raise(Errors.forCode(errorCode));
        } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
            log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.REBALANCE_IN_PROGRESS);
        } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
            log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.ILLEGAL_GENERATION);
        } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
            log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
            memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
                    + Errors.forCode(errorCode).exception().getMessage()));
        }
    }
}
```
Key point: the "trigger" here simply means setting rejoinNeeded to true. On the next pass of the poll loop, the client sees that rejoinNeeded is true and goes through step 2 + step 3 above again.
failover
In this whole scheme, a consumer may die, and the coordinator may also die, so each side needs to detect whether the other is still alive.
The detection method is again the heartbeat described above: when the consumer's heartbeats time out, or when the coordinator has not received a heartbeat for a long time, each assumes the other is dead.
Of course, this can produce false positives. For example, if a consumer processes messages very slowly (it is single-threaded, after all), the next heartbeat may be delayed for a long time. The coordinator then assumes the consumer has died, actively closes the connection, and triggers a rebalance.
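The size of this detection window is governed by consumer-side configuration; a sketch with illustrative values (the broker address and group id are placeholders):

```java
import java.util.Properties;

public class TimeoutConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        // How long the coordinator waits without a heartbeat before declaring
        // this consumer dead and triggering a rebalance.
        props.put("session.timeout.ms", "30000");
        // Target interval between heartbeats; must be well below session.timeout.ms.
        props.put("heartbeat.interval.ms", "3000");
        return props;
    }
}
```

Because the 0.9 consumer sends heartbeats from inside poll(), message processing that keeps the thread away from poll() for longer than session.timeout.ms leads to exactly this kind of spurious rebalance.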
When the consumer thinks the coordinator has died
It starts again from step 1 above: rediscover the coordinator, then JoinGroup + SyncGroup.
When the coordinator thinks a consumer has died
It starts from step 2 above: it notifies all remaining consumers to perform JoinGroup + SyncGroup.