ConsumerConfig.scala
儲存Consumer的配置
按照我的理解,0.10的Kafka沒有專門的SimpleConsumer,仍然是沿用0.8版本的。
1.從poll開始
消費的規則如下:
- 一個partition只能被同一個ConsumersGroup的一個線程所消費.
- 線程數小於partition數,某些線程會消費多個partition.
- 線程數等於partition數,一個線程正好消費一個線程.
- 當添加消費者線程時,會觸發rebalance,partition的分配發送變化.
- 同一個partition的offset保證消費有序,不同的partition消費不保證順序.
Consumers編程的用法:
private final KafkaConsumer<Long, String> consumer; // 與Kafka進行通信的consumer
...
consumer = new KafkaConsumer<Long, String>(props);
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Long, String> records = consumer.poll(512);
...
consumer,是一個純粹的單線程程序,后面所講的所有機制(包括coordinator,rebalance, heartbeat等),都是在這個單線程的poll函數里面完成的。也因此,在consumer的代碼內部,沒有鎖的出現。
1.1包括的組件
從KafkaConsumer的構造函數可以看出,KafkaConsumer有以下幾個核心部件:
- Metadata: 存儲Topic/Partion與broker的映射關系
- NetworkClient:網絡層 A network client for asynchronous request/response network i/o.
- ConsumerNetworkClient: Higher level consumer access to the network layer //對NetworkClient的封裝,非線程安全
- ConsumerCoordinator:只是client端的類,只是和服務端的GroupCoordinator通信的介質。(broker端的Coordinator 負責reblance、Offset提交、心跳)
- SubscriptionState: consumer的Topic、Partition的offset狀態維護
- Fetcher: manage the fetching process with the brokers. //獲取消息
后面會分組件講解Consumers的工作流程
1.2 Consumer消費者的工作過程:
- 在consumer啟動時或者coordinator節點故障轉移時,consumer發送ConsumerMetadataRequest給任意一個brokers。在ConsumerMetadataResponse中,它接收對應的Consumer Group所屬的Coordinator的位置信息。
- Consumer連接Coordinator節點,並發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化平衡。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前Consumer Group的新的generation編號。這個時候Consumer Group管理已經完成,Consumer就可以開始fetch數據,並為它擁有的partitions提交offsets。
- 如果HeartbeatResponse沒有錯誤返回,Consumer會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。
2 設計
2.0 MetaData
見Producer里面的分析。
補充一下,KafkaConsumer、KafkaProducer都是在構造函數中獲取metadata信息,通過調用metadata.update
方法來獲取信息。
2.1 coordinator 為什么,做什么
1.去zookeeper依賴 -- 為什么
- 在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進行協同,這與后面要講的rebalance有關。(
ConsumerConnector、KafkaStream、ConsumerIterator
) --package kafka.consumer
- 0.9之后新的consumer不依賴與Zookeeper,一個consumerGroup內的consumer由Coordinator管理.(
KafkaConsumer
) --package org.apache.kafka.clients.consumer
為什么?后面講
提問:為什么在一個group內部,1個parition只能被1個consumer擁有?
2.coordinator協議/partition分配問題
給定一個topic,有4個partition: p0, p1, p2, p3, 一個group有3個consumer: c0, c1, c2。
- 那么,如果按
RangeAssignor
策略,分配結果是:
c0: p0, c1: p1, c2: p2, p3 - 如果按
RoundRobinAssignor
策略:
c0: p1, p3, c1: p1, c2: p2 - partition.assignment.strategy=RangeAssignor,默認值
(到底是哪種分配狀態呢)
那這整個分配過程是如何進行的呢?見下圖所示:
3步分配過程
1. 步驟1:對於每1個consumer group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。因此,第1步就是找到這個coordinator。(1個consumer group對應一個coordinattor)
GroupCoordinatorRequest: GCR,由ConsumerNetworkClient發送請求去尋找coordinator。
2. 步驟2:找到coordinator之后,發送JoinGroup請求
consumer在這里會被划分leader、follower(無責任的說:選擇第一個consumer)
- leader作用:perform the leader synchronization and send back the assignment for the group(負責發送partition分配的結果)
- follower作用:send follower's sync group with an empty assignment
3. 步驟3:JoinGroup返回之后,發送SyncGroup,得到自己所分配到的partition
SyncGroupRequest
- consumer leader發送 SyncGroupRequest給Coordinator,Coordinator回給它null
- follower發送 null的 SyncGroupRequest 給Coordinator,Coordinator回給它partition分配的結果。
注意,在上面3步中,有一個關鍵點:
- partition的分配策略和分配結果其實是由client決定的,而不是由coordinator決定的。什么意思呢?在第2步,所有consumer都往coordinator發送JoinGroup消息之后,coordinator會指定其中一個consumer作為leader,其他consumer作為follower。
- 然后由這個leader進行partition分配。
- 然后在第3步,leader通過SyncGroup消息,把分配結果發給coordinator,其他consumer也發送SyncGroup消息,獲得這個分配結果。
接下來就到Fetcher
拉取數據了
2.2 Fetcher
四個步驟
0. 步驟0:獲取consumer的offset
- 步驟1:生成FetchRequest,並放入發送隊列
- 步驟2:網絡poll
- 步驟3:獲取結果
1.獲取consumer的offset
當consumer初次啟動的時候,面臨的一個首要問題就是:從offset為多少的位置開始消費。
poll之前,給集群發送請求,讓集群告知客戶端,當前該TopicPartition的offset是多少。通過SubscriptionState
來實現, 通過ConsumerCoordinator
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
核心是:向Coordinator發了一個OffsetFetchRequest,並且是同步調用,直到獲取到初始的offset,再開始接下來的poll.(也就是說Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)
consumer的每個TopicPartition都有了初始的offset,接下來就可以進行不斷循環取消息了,這也就是Fetch的過程:
2.生成FetchRequest,並放入發送隊列 -- fetcher.initFetches(cluster)
核心就是生成FetchRequest: 假設一個consumer訂閱了3個topic: t0, t1, t2,為其分配的partition分別是: t0: p0; t1: p1, p2; t2: p2
即總共4個TopicPartition,即t0p0, t0p1, t1p1, t2p2。這4個TopicPartition可能分布在2台機器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2
則會分別針對每台機器生成一個FetchRequest,即Map<Node, FetchRequest>
。所以會有一個方法把所有屬於同一個Node的TopicPartition放在一起,生成一個FetchRequest。
3.網絡poll
調用ConsumerNetworkClient.poll
發送網絡請求。向服務器發 送響應請求和獲取服務器的響應。(默認值:executeDelayedTasks=true)
4.獲取結果 -- fetcher.fetchedRecords()
獲取Broker返回的Response,里面包含了List<ConsumerRecord> records
2.3 offset確認機制
- 是否自動消費確認:由參數
auto.xxx.commit=true
控制 - 手動消費:用於自定義Consumers的消費控制
下面從自動消費確認來分析,Offset自動確認是由ConsumerCoordinator
的AutoCommitTask
來實現的。
其調用在ConsumerNetworkClient
的 DelayedTaskQueue delayedTasks
里面,然后被周期性的調用。 周期性的發送確認消息,類似HeartBeat,其實現機制也就是前面所講的DelayedQueue + DelayedTask
.
確認一次:offset的提交
poll
函數中的注釋:
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
- 可以這樣理解:第二次poll調用的時候,提交上一次poll的offset和心跳發送。
- 先提交offset,再去拉取record。那么這次Offset其實是上一次poll的Record的offset。
- 因此,當你把按照下面的邏輯寫程序的時候,可能會導致Consumer與Coordinator的心跳超時。
while(true) {
consumer.poll();
do process message // 假如這個耗時過長,那么這個consumer就無法發送心跳給coordinator,導致它錯誤認為這個consumer失去聯系了,引起不必要的rebalance。槽糕的情況下,會丟重復消費數據。
}
因此,有必要把offset的提交單獨拿出來做一個線程。
到這里,就把整個Consumer的流程走完了。