1.HighLevelApi
High Level Api是多線程的應用程序,以Topic的Partition數量為中心。消費的規則如下:
- 一個partition只能被同一個ConsumersGroup的一個線程所消費.
- 線程數小於partition數,某些線程會消費多個partition.
- 線程數等於partition數,一個線程正好消費一個線程.
- 當添加消費者線程時,會觸發rebalance,partition的分配發送變化.
- 同一個partition的offset保證消費有序,不同的partition消費不保證順序.
關於與ZK的幾個參數意思解釋
- zookeeper.connect: ZK連接。
- group.id: Consumer消費ID。
- zookeeper.session.timeout.ms: kafka節點與ZK會話的超時時間。
- zookeeper.sync.time.ms: zk的follower與leader的同步時間間隔。
- auto.commit.interval.ms: Consumer offset自動提交給Zookeeper的時間。
Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.(由於記錄Offset是基於時間的,所以當Consumer發生錯誤的時候,有可能會收到重復的消息。)
消費者的代碼
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threads));
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(threads);
for (KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new KafkaConsumerWorker(stream));
}
2.ZookeeperConsumerConnector
一個Consumer會創建一個ZookeeperConsumerConnector,代表一個消費者進程.
- fetcher: 消費者獲取數據, 使用ConsumerFetcherManager fetcher線程抓取數據
- zkUtils: 消費者要和ZK通信, 除了注冊自己,還有其他信息也會寫到ZK中
- topicThreadIdAndQueues: 消費者會指定自己消費哪些topic,並指定線程數, 所以topicThreadId都對應一個隊列
- messageStreamCreated: 消費者會創建消息流, 每個隊列都對應一個消息流
- offsetsChannel: offset可以存儲在ZK或者kafka中,如果存在kafka里,像其他請求一樣,需要和Broker通信
- 還有其他幾個Listener監聽器,分別用於topicPartition的更新,負載均衡,消費者重新負載等
當Broker掛掉的時候,在這個Broker上的所有Partition都丟失了,而Partition是給消費者服務的.
所以Broker掛掉后在做遷移的時候,會將其上的Partition轉移到其他Broker上,因此消費者要消費的Partition也跟着變化.
2.1 init
在創建ZookeeperConsumerConnector時,有幾個初始化方法需要事先執行.
- 消費者要和ZK通信,所以connectZk會確保連接上ZooKeeper
- 消費者要消費數據,需要有抓取線程,所有的抓取線程交給ConsumerFetcherManager統一管理
- 由消費者客戶端自己保存offset,而消費者會消費多個topic的多個partition.
- 多個partition的offset管理類OffsetManager是一個GroupCoordinator
- 定時提交線程會使用OffsetManager建立的通道定時提交offset到zk或者kafka.
2.2 createMessageStreams
ConsumerConnector創建消息流,需要指定解碼器,因為要將日志反序列化(生產者寫消息時對消息序列化到日志文件).
在kafka的運行過程中,會有其他的線程將數據放入partition對應的queue中. 而queue是用於KafkaStream的.
一旦數據添加到queue后,KafkaStream的阻塞隊列就有數據了,消費者就可以從隊列中消費消息.
createMessageStreams
: 返回KafkaStream, 每個Topic都對應了多個KafkaStream. 數量和topicCount中的count一樣.
例子解釋
假設消費者C1聲明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.
topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5個線程,queuesAndStreams也有5個元素.
consumerThreadIdsPerTopicMap = {
topic1: [C1_1, C1_2],
topic2: [C1_1, C1_2, C1_3]
}
topicThreadIds.values = [
[C1_1, C1_2],
[C1_1, C1_2, C1_3]
]
threadIdSet循環[C1_1, C1_2]時, 生成兩個queue->stream pair.
threadIdSet循環[C1_1, C1_2, C1_3]時, 生成三個queue->stream pair.
queuesAndStreams = [
(LinkedBlockingQueue_1,KafkaStream_1), //topic1:C1_1
(LinkedBlockingQueue_2,KafkaStream_2), //topic1:C1_2
(LinkedBlockingQueue_3,KafkaStream_3), //topic2:C1_1
(LinkedBlockingQueue_4,KafkaStream_4), //topic2:C1_2
(LinkedBlockingQueue_5,KafkaStream_5), //topic2:C1_3
]
- 客戶端關注的是我的每個線程都對應了一個隊列,每個隊列都是一個消息流就可以了.
- 客戶端的每個線程實際上是針對Partition級別的,一個線程對應一個或多個partition。
2.3 registerConsumerInZK
消費者需要向ZK注冊一個臨時節點,路徑為:/consumers/[group_id]/ids/[consumer_id],內容為訂閱的topic.
問題:什么時候這個節點會被刪除掉呢? Consumer進程掛掉時,或者Session失效時刪除臨時節點. 重連時會重新創建.
由於是臨時節點,一旦創建節點的這個進程掛掉了,臨時節點就會自動被刪除掉. 這是由zk機制決定的,不是由消費者完成的.
2.4 reinitializeConsumer listener
當前Consumer在ZK注冊之后,需要重新初始化Consumer.對於全新的消費者,注冊多個監聽器,在zk的對應節點的注冊事件發生時,會回調監聽器的方法.
- 將topic對應的消費者線程id及對應的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放數據的queue
- 注冊sessionExpirationListener,監聽狀態變化事件.在session失效重新創建session時調用
- 向/consumers/[group_id]/ids注冊Child變更事件的loadBalancerListener,當消費組下的消費者發生變化時調用
- 向/brokers/topics/[topic]注冊Data變更事件的topicPartitionChangeListener,在topic數據發生變化時調用
- 顯式調用loadBalancerListener.syncedRebalance(), 會調用reblance方法進行consumer的初始化工作
private def reinitializeConsumer[K,V](topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
val dirs = new ZKGroupDirs(config.groupId)
// ② listener to consumer and partition changes
if (loadBalancerListener == null) {
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString,
topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
}
// ① create listener for session expired event if not exist yet
if (sessionExpirationListener == null) sessionExpirationListener =
new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
// ③ create listener for topic partition change event if not exist yet
if (topicPartitionChangeListener == null)
topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
// listener to consumer and partition changes
zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
// register on broker partition path changes.
topicStreamsMap.foreach { topicAndStreams =>
zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
}
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
}
ZKRebalancerListener傳入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它們都會使用ZKRebalancerListener完成自己的工作.
2.5 ZKSessionExpireListener
當Session失效時,新的會話建立時,立即進行rebalance操作.
2.6 ZKTopicPartitionChangeListener
當topic的數據變化時,通過觸發的方式啟動rebalance操作.
2.7 ZKRebalancerListener watcher
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
var doRebalance = false
while (!isShuttingDown.get) {
lock.lock()
try {
// 如果isWatcherTriggered=false,則不會觸發syncedRebalance. 等待1秒后,繼續判斷
if (!isWatcherTriggered)
cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
} finally {
// 不管isWatcherTriggered值是多少,在每次循環時,都會執行. 如果isWatcherTriggered=true,則會執行syncedRebalance
doRebalance = isWatcherTriggered
// 重新設置isWatcherTriggered=false, 因為其他線程觸發一次后就失效了,想要再次觸發,必須再次設置isWatcherTriggered=true
isWatcherTriggered = false
lock.unlock()
}
if (doRebalance) syncedRebalance // 只有每次rebalanceEventTriggered時,才會調用一次syncedRebalance
}
}
}
watcherExecutorThread.start()
// 觸發rebalance開始進行, 修改isWatcherTriggered標志位,觸發cond條件運行
def rebalanceEventTriggered() {
inLock(lock) {
isWatcherTriggered = true
cond.signalAll()
}
}
3.ZKRebalancerListener rebalance
因為消費者加入/退出時,消費組的成員會發生變化,而消費組中的所有存活消費者負責消費可用的partitions.
可用的partitions或者消費組中的消費者成員一旦發生變化,都要重新分配partition給存活的消費者.下面是一個示例.
當然分配partition的工作絕不僅僅是這么簡單的,還要處理與之相關的線程,並重建必要的數據:
- 關閉數據抓取線程,獲取之前為topic設置的存放數據的queue並清空該queue
- 釋放partition的ownership,刪除partition和consumer的對應關系
- 為各個partition重新分配threadid
獲取partition最新的offset並重新初始化新的PartitionTopicInfo(queue存放數據,兩個offset為partition最新的offset) - 重新將partition對應的新的consumer信息寫入zookeeper
- 重新創建partition的fetcher線程
private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString,
zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
val brokers = zkUtils.getAllBrokersInCluster()
if (brokers.size == 0) {
zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
true
} else {
// ① 停止fetcher線程防止數據重復.如果當前調整失敗了,被釋放的partitions可能被其他消費者擁有.
// 而沒有先停止fetcher的話,原先的消費者仍然會和新的擁有者共同消費同一份數據.
closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
// ② 釋放topicRegistry中topic-partition的owner
releasePartitionOwnership(topicRegistry)
// ③ 為partition重新分配消費者....
// ④ 為partition添加consumer owner
if(reflectPartitionOwnershipDecision(partitionAssignment)) {
allTopicsOwnedPartitionsCount = partitionAssignment.size
topicRegistry = currentTopicRegistry
// ⑤ 創建拉取線程
updateFetcher(cluster)
true
}
}
}
rebalance操作涉及了以下內容:
- PartitionOwnership: Partition的所有者(ownership)的刪除和重建
- AssignmentContext: 分配信息上下文
- PartitionAssignor: 為Partition分配Consumer的算法
- PartitionAssignment: Partition分配之后的上下文
- PartitionTopicInfo: Partition的最終信息
- Fetcher: 完成了rebalance,消費者就可以重新開始抓取數據
3.1 核心:PartitionAssignor
將可用的partitions以及消費者線程排序, 將partitions處於線程數,表示每個線程(不是消費者數量)平均可以分到幾個partition.
如果除不盡,剩余的會分給前面幾個消費者線程. 比如有兩個消費者,每個都是兩個線程,一共有5個可用的partitions: (p0-p4).
每個消費者線程(一共四個線程)可以獲取到至少一共partition(5/4=1),剩余一個(5%4=1)partition分給第一個線程.
最后的分配結果為: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
關閉Fetcher時要注意:
- 先提交offset,然后才停止消費者. 因為在停止消費者的時候當前的數據塊中還會有點殘留數據.
- 因為這時候還沒有釋放partiton的ownership(即partition還歸當前consumer所有),強制提交offset,
這樣擁有這個partition的下一個消費者線程(rebalance后),就可以使用已經提交的offset了,確保不中斷. - 因為fetcher線程已經關閉了(stopConnections),這是消費者能得到的最后一個數據塊,以后不會有了,直到平衡結束,fetcher重新開始
-
topicThreadIdAndQueues來自於topicThreadIds,所以它的topic應該都在relevantTopicThreadIdsMap的topics中.
為什么還要過濾呢? 注釋中說到在本次平衡之后,只需要清理可能不再屬於這個消費者的隊列(部分的topicPartition抓取隊列). -
問題:新創建的ZKRebalancerListener中kafkaMessageAndMetadataStreams(即這里的messageStreams)為空的Map.
如何清空里面的數據? 實際上KafkaStream只是一個迭代器,在運行過程中會有數據放入到這個流中,這樣流就有數據了.
4.ConsumerFetcherManager
Fetcher線程要抓取數據關心的是PartitionTopicInfo,首先要找出Partition Leader(因為只向Leader Partition發起抓取請求).
初始時假設所有topicInfos(PartitionTopicInfo)都找不到Leader,即同時加入partitionMap和noLeaderPartitionSet.
在LeaderFinderThread線程中如果找到Leader,則從noLeaderPartitionSet中移除.
ConsumerFetcherManager管理了當前Consumer的所有Fetcher線程.
5.小結
high level的Consumer Rebalance的控制策略是由每一個Consumer通過在Zookeeper上注冊Watch完成的。
每個Consumer被創建時會觸發Consumer Group的Rebalance,具體的啟動流程是:
- (High Level)Consumer啟動時將其ID注冊到其Consumer Group下 (registerConsumerInZK)
- 在/consumers/[group_id]/ids上和/brokers/ids上分別注冊Watch (reinitializeConsumer->Listener)
- 強制自己在其Consumer Group內啟動Rebalance流程 (ZKRebalancerListener.rebalance)
在這種策略下,每一個Consumer或者Broker的增加或者減少都會觸發Consumer Rebalance。
因為每個Consumer只負責調整自己所消費的Partition,為了保證整個Consumer Group的一致性,
當一個Consumer觸發了Rebalance時,該Consumer Group內的其它所有其它Consumer也應該同時觸發Rebalance。
該方式有如下缺陷:
- Herd effect(羊群效應): 任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance
- Split Brain(腦裂): 每個Consumer分別單獨通過Zookeeper判斷哪些Broker和Consumer 宕機了,
那么不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正確的Reblance嘗試。 - 調整結果不可控: 所有的Consumer都並不知道其它Consumer的Rebalance是否成功,這可能會導致Kafka工作在一個不正確的狀態。
根據Kafka官方文檔,Kafka作者正在考慮在還未發布的0.9.x版本中使用中心協調器(coordinator)。大體思想是選舉出一個broker作為coordinator,由它watch Zookeeper,從而判斷是否有partition或者consumer的增減,然后生成rebalance命令,並檢查是否這些rebalance在所有相關的consumer中被執行成功,如果不成功則重試,若成功則認為此次rebalance成功.