顧名思義,就是kafka的consumer api包。
一、ConsumerConfig.scala
Kafka consumer的配置類,除了一些默認值常量及驗證參數的方法之外,就是consumer的配置參數了,比如group.id, consumer.id等,詳細列表見官網。
二、ConsumerIterator.scala
KafkaStream的迭代器類,當stream底層的阻塞隊列為空時該iterator會置於阻塞狀態。這個迭代器還提供了一個shutdownCommand對象可作為一個標識位被加入到隊列中從而觸發關閉操作。
既然是迭代器,最重要的next方法一定是要提供的。下面我們依次分析下其定義的方法:
1. next:獲取下一個元素。具體邏輯就是用父類的next方法獲取下一個MessageAndMetadata,然后再更新一下consumer的度量元統計信息
2. makeNext:核心方法,具體邏輯如下:
- 獲取當前的迭代器,如果是空,就獲取一個。具體做法就是根據超時配置以不同的方式從獲取底層的channel中讀取一個數據塊
- 如果該數據塊是關閉命令,直接返回
- 否則,獲取當前的topic信息。如果要請求的位移值比當前已消耗的位移大,那么consumer就有可能會丟失數據。
- 之后獲取一個iterator,並調用next方法獲取下一個元素,並構造新的MessageAndMetadata實例返回
三、KafkaStream.scala
定義了一個Kafka consumer stream。每個stream都支持迭代遍歷其MessageAndMetadata元素。內部維護了一個迭代器ConsumerIterator。KafkaStream定義的方法如下:
1. iterator:返回內部維護的迭代器
2. clear:在consumer重分布時清除被迭代的隊列。主要是為了減少consumer接收到重復消息
四、ConsumerConnector.scala
consumer的主接口。定義了一個trait和一個object。ConsumerConnector trait定義了一些抽象方法:
1. createMessageStreams:為每個topic創建一組KafkaStream
2. createMessageStreams (支持指定KeyDeCoder和ValueDecoder)
3. createMessageStreamsByFilter:也是為給定的所有topic創建一組KafkaStream,只不過這個方法允許傳遞一個filter,允許黑白名單過濾
4. commitOffsets:向連接此consumer connector的所有broker分區執行提交位移操作
5. shutdown:關閉connector
而Consumer object定義了兩個方法:
1. create:創建一個ConsumerConnector
2. createJavaConsumerConnector:創建一個java client使用的consumer connector
五、FetchedDataChunk.scala
表示一段獲取到的數據塊,封裝了一組保存在一個字節緩沖區的消息,分區topic信息以及獲取到的位移值
六、PartitionAssignor.scala
為一個consumer group中的consumer做分區分配的。PartitionAssignor trait定義了assign方法,返回分區到consumer線程的映射記錄。其中被分配的線程必須要屬於給定分區上下文(AssignmentContext)中的某個consumer。
說到分配上下文類——AssignmentContext,它需要接收一個consumer group、一個consumer id以及一個zkClient,並在內部維護了一個map記錄topic對應的consumer線程集合(主要由TopicCount類中的方法提供)。其定義的方法還包括:
1. partitionsForTopic:返回topic對應的分區集合
2. consumersForTopic:返回topic對應的consumers線程
3. consumers:返回consumers id的集合
PartitionAssignor object定義了一個工廠方法用於創建不同策略的分區分配器,目前Kafka支持兩種再平衡策略(也就是分區分配策略):round robin和range。值得注意的是,這里所說的分區策略其實是指指如何將分區分配給消費組內的不同consumer實例。
假設我們有一個topic:T1,T1有10個分區,分別是[P0, P9],然后我們有2個consumer,C1和C2。C1有一個線程,C2有兩個線程。
下面我們來看看默認的range策略是如何分配分區的:
1. Range策略
對於每一個topic,range策略會首先按照數字順序排序所有可用的分區,並按照字典順序列出所有的consumer線程。結合我們上面的例子,分區順序是0,1,2,3,4,5,6,7,8,9,而consumer線程的順序是c1-0, c2-0, c2-1。然后使用分區數除以線程數以確定每個線程至少獲取的分區數。在我們的例子中,10/3不能整除,余數為1,因此c1-0會被額外多分配一個分區。最后的分區分配如下:
c1-0 獲得分區 0 1 2 3
c2-0 獲得分區 4 5 6
c2-1 獲得分區 7 8 9
如果該topic是11個分區,那么分區分配如下:
c1-0 獲取分區 0 1 2 3
c2-0 獲取分區 4 5 6 7
c2-1 獲取分區 8 9 10
2. roundrobin策略——輪詢策略
如果是輪詢策略,我們上面假設的例子就不適用了,因為該策略要求訂閱某個topic的所有consumer都必須有相同數目的線程數,因此我們修改上面的例子,假設每個consumer都有2個線程。round robin策略與range的一個主要的區別就是在再分配之前你是沒法預測分配結果的——因為它會使用哈希求模的方式隨機化排序順序。
如果要采用roundrobin策略必須要先滿足兩個條件:
- 訂閱topic的consumer必須有相同數目的線程數
- consumer group內每個consumer實例都必須有相同的被訂閱topic集合
當這兩個條件滿足后,kafka會將topic-partition對根據hashcode進行隨機排序以防某個topic的所有分區都被分配給一個consumer。之后所有的topic-partition對按照輪詢的方式分配給可用的consumer線程。以我們改進過的例子來說,假設排序之后的topic-分區是這樣的:
T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6和T1-9,而consumer線程是c1-0, c1-1, c2-0, c2-1.那么最后的分區結果如下:
T1-5 去 c1-0
T1-3 去 c1-1
T1-0 去 c2-0
T1-8 去 c2-1
此時所有的consumer線程已經分配過了,但還有尚未分配的分區,這時候就從頭再次分配線程:
T1-2 去 c1-0
T1-1 去 c1-1
T1-4 去 c2-0
T1-7 去 c2-1
再次從頭開始,
T1-6 去 c1-0
T1-9 去 c1-1
此時所有的分區都已經分配過了,每個consumer線程能夠分配到幾乎相同數目的分區——這就是round robin的方式。
七、TopicCount.scala
該scala定義了很多類,我們一一分析:
1. ConsumerThreadId:封裝了consumer id和線程id。因為擴展了Ordered接口,因此支持按照字典順序排序。主要為分區策略使用。
2. TopicCount trait:提供topic分組統計的主接口,定義了三個方法:
- getConsumerThreadIdsPerTopic——返回topic及其Consumer線程id集合的映射
- getTopicCountMap——返回topic對應consumer stream數的映射
- pattern:目前有三種pattern:static、white_list和black_list。通過對黑白名單的支持,允許consumer訂閱多個topic
- makeThreadId:consumer thread的命名規則是[consumer id]-thread id
- makeConsumerThreadIdsPerTopic:為給定的一組topic創建出一組ConsumerThreadId來
- constructTopicCount:根據給定的consumer group和consumer id創建一個TopicCount。具體邏輯如下:
- 讀取/consumers/[group_id]/ids/[consumer_id]節點下的數據(JSON)
- 解析這個JSON串,提取出各個字段的值
- 如果pattern是static類型,創建一個StaticTopicCount返回;否則創建一個WildcardTopicCount返回
constructTopicCount還有另外兩個重載方法,分別創建StaticTopicCount和WildcardTopicCount
4. StaticTopicCount類:實現了TopicCount接口。其pattern類型為static
5. WildcardTopicCount類:實現了TopicCount接口。根據給定的TopicFilter來判斷pattern是white_list還是black_list
八、TopicFilter.scala
TopicFilter抽象類,用於解析topic的正則表達式,並提供一個isTopicAllowed方法用於過濾topic。它有兩個子類:Whitelist和Blacklist分別實現白名單過濾和黑名單過濾。
九、PartitionTopicInfo.scala
封裝了topic的分區信息,包括這個分區的數據塊隊列,已消費的位移、已獲取的位移以及獲取大小等信息。另外提供了一些setter和getter方法可以獲取並設置這些信息
十、ZookeeperConsumerConnector.scala
該類主要負責處理consumer與zookeeper之間的交互。
與consumer相關的zookeeper目錄結構:
1. consumer id注冊節點: /consumers/[group_id]/ids/[consumer_id] 每個consumer在consumer group內有個唯一的id號。它會將該id號以臨時節點的方式注冊到zookeeper的對應目錄中,並把它訂閱的所有topic都封裝到subscription子JSON元素中。因為是臨時節點,consumer一結束zookeeper就會刪除該節點。值得注意的是,consumer id的命名沒有采用順序節點的方式,而是從配置中選定的——主要是因為順序生成節點不利於錯誤恢復
2. broker節點注冊:/brokers/ids/[brokerId]. 每個broker節點都會被分配一個邏輯節點號,從0開始。broker啟動時會將其自身注冊到zookeeper中——即在/brokers/ids下創建一個以邏輯節點號命名的子節點。這個znode的值是一個JSON串包含以下信息:
- version:版本號,固定為1
- host:broker的IP地址或主機名
- port:broker端口
- jmx:若啟用了jmx,就是jmx的端口號,否則為-1
- timestamp:broker創建時的時間戳
3. 分區注冊信息: /consumers/[group_id]/owners/[topic]/[partitionId]。
4. consumer位移信息:/consumers/[group_id]/offsets/[topic]/[partitionId] -> 位移
這個scala定義了一組伴生對象,其中object中就只有一個變量shutdownCommand用於標識關閉標識。當在隊列中看到這個標識的時候就需要結束迭代過程。而ZookeeperConsumerConnector類是這個文件中的核心。它實現了ConsumerConnector trait,因此也就要實現該trait定義的那些抽象方法。
下面先分析一下該類定義的一些重要字段:
1. isShuttingDown:用於標識該connector的狀態是否正處理關閉狀態
2. fetcher:ConsumerFetcher管理器,用於管理fetcher線程
3. zkClient:用於連接zookeeper的客戶端
4. topicRegistry:保存topic下的分區信息
5. checkpointedZkOffsets:保存topic分區對應的位移
6. topicThreadIdAndQueues:保存topic與其消費者線程對應的阻塞隊列
7. scheduler:調度器每過auto.commit.interval.ms時間就向zookeeper提交consumer位移
8. messageStreamCreated:標識KafkaStream是否已經創建
9. sessionExpirationListener/topicPartitionChangeListener/loadBalancerListener:三個zk監控器,分別由三個嵌套類實現,后面會提及
10. offsetsChannel:用於發送OffsetFetchRequst的通道
11. wildcardTopicWatcher:ZookeeperTopicEventWatcher類實現的topic事件監聽類
12. consumerIdString:定義了如何命名consumer id的規則。如果沒有指定consumer.id了,就設置為consumer group_主機名-時間戳-(uuid的一部分)
在構造函數中,該類會首先連接zookeeper,然后創建Fetcher管理器並會以阻塞的方式確認連上副本管理器,最后如果開啟了自動提交(auto.commit.enable),那么使用調度器創建一個定時任務。
下面重點說說它提供的一些方法:
1. connectZk:連接zookeeper.connect中指定的zookeeper,就是創建zkClient
2. createFetcher:創建ConsumerFetcherManager
3. ensureOffsetManagerConnected:該方法會一直阻塞知道確認找到可用的副本管理器,其底層的IO通道也已創建。該方法只是針對使用kafka來保存consumer位移的情況——即設置offsets.storage=kafka
4. shutdown:關閉該connector,主要涉及到關閉wildcardTopicWatcher、調度器、fetcher管理器、清除所有隊列、提交位移以及關閉zookeeper客戶端和位移通道等
5. registerConsumerInZK:在zookeeper中注冊給定的consumer——即在zookeeper的/consumers/[groupId]/ids下創建一個臨時節點
6. sendShutdownToAllQueues:清除topicThreadIdAndQueues中的隊列並向所有隊列發送關閉命令
7. autocommit:自動提交位移,主要由方法commitOffsets實現
8. commitOffsetToZooKeeper:向zookeeper提交位移,就是更新指定節點的數據並將offset保存在checkpointedZKOffsets緩存中
9. commitOffsets:提交位移。在具體分析代碼之前,先來分析下屬性offsets.commit.retries——重試位移的次數。它只對關閉connector時候的位移提交有效,而不計算自動提交線程發起的提交。它也不考慮在提交前的查詢位移。比如一個consumer元數據請求基於某種原因失敗了,它會被重試但並不計入這個統計之中。commitOffsets貌似參數含義寫反了,它現在的參數名是isAutoCommit,但實際實際調用過程中,如果是自動提交反而需要指定false。
具體邏輯如下:
- 根據是否為自動提交來設定重試次數——如果是為1次即不重試;否則為offsets.commit.retries + 1
- 從topicRegistry中構建要提交的位移集合
- 如果該集合是空自然也不需要提交什么,否則判斷一下使用何種存儲來保存consumer位移
- 如果是zookeeper保存(默認情況),遍歷待提交位移集合,為每一個topic分區去zookeeper的對應節點下更新位移
- 如果是kafka來保存位移,
- 首先要創建OffsetCommitRequest請求
- 然后確保能夠連上副本管理器
- 發送OffsetCommitRequest請求並得到對應的response
- 找出response中包含的錯誤碼,如果有錯誤標記為提交位移失敗
10. fetchOffsetFromZooKeeper:從Zookeeper中獲取給定分區的位移
11. fetchOffsets:獲取一組分區的consumer位移,如果是保存在zookeeper中直接調用fetchOffsetFromZooKeeper獲取,否則具體邏輯如下:
- 創建OffsetFetchRequest
- 確保連入副本管理器並發送OffsetFetchRequest請求,獲取對應的response
- 如果leader發生了變更或位移緩存正在加載中的話則返回的response是空——以便后面重試
- 查看是否啟用了雙路位移提交(dual.commit.enable)——比如一個consumer group正在從遷移zookeeper中的位移到kafka中,如果沒有的話直接返回response,否則就從zookeeper和kafka中選取大的那個返回給response
該類還有一些很重要的方法,但我們先看一下該scala文件中嵌套定義的4個類:
1. ZKSessionExpireListener —— 監聽zookeeper會話過期的監聽器。因為事先了IZKStateListener接口,因此也必須實現handleStateChanged和handleNewSession兩個方法。
- handleStateChanged:什么都不用做,因為zookeeper客戶端會重連
- handleNewSession:zookeeper會話過期后調用該方法來創建新的會話。也就是重建臨時節點,重新注冊consumer。主要邏輯就是
- 首先清空topicRegistry分區信息緩存
- 在zookeeper中重新注冊consumer (registerConsumerInZK)
- 在consumer上重新發起負載均衡操作——通過負載均衡監聽器的syncRebalance方法。另外由於在負載均衡過程中會重新注冊子節點變更和狀態變更的監聽器,因此handleNewSession方法中就不在重訂閱它們了。
2. ZKTopicPartitionChangeListener:也是一個監聽器,用於監聽zookeeper節點數據的變更。兩個方法:
- handleDataChange: topic數據發生變更時調用該方法,應對的方法就是調用relabalanceEventTriggered通知所有監聽執行線程繼續執行
- handleDataDeleted:拋出警告表明topic數據被意外地刪除了
3. ZKRebalancerListener:監聽zookeeper子節點變更的監聽器,用於觸發consumer的負載均衡。在類的內部它會創建一個監控執行線程用於監控給定的consumer,一旦監控到要觸發rebalance就調用syncedRebalance開始執行rebalance。因為是zookeeper的子節點監聽類,它還必須實現handleChildChange,用於觸發rebalacen事件。下面一一分析其定義的方法:
- rebalanceEventTriggered —— 設置isWatcherTriggered為true並喚醒監控線程開始執行rebalance操作
- deletePartitionOwnershipFromZK —— 從zookeeper中刪除給定topic分對應的分區znode: /consumers/[groupId]/owners/[topic]/[partition],就是刪除這個consumer的注冊信息
- releasePartitionOwnership —— 通過循環調用deletePartitionOwnershipFromZK方法, 取消給定所有topic的所有分區的consumer注冊信息。並刪除對應的統計信息以及清空對應的計數器
- resetState —— 清空該consumer connector上注冊的所有topic信息
- clearFetcherQueues —— 清空fetcher相關的所有隊列以及當前正在consumer線程中遍歷的數據塊(data chunk)
- closeFetchersForQueues —— 停止所有fetcher線程並清空所有隊列避免數據重復。在清空fetcher之前先要停掉leader發現線程。之后如果啟用了自動提交位移還是需要提交位移以防止consumer從當前數據塊中再返回消息。由於分區注冊信息還在zookeeper中沒有被釋放,本次提交位移能夠保證現在提交的位移會被下一個擁有當前數據塊分區的consumer線程所使用。因為fetcher總是要關閉的並且這是consumer遍歷的最后一個數據塊,迭代器就不會再返回任何新的消息了直到rebalance成功完成且fetcher重啟之后獲取更多的數據塊
- closeFetchers —— 清空consumer"可能"不再消費的topic分區的fetcher隊列
- updateFetcher —— 更新fetcher的分區
- reflectPartitionOwnershipDecision —— 判斷consumer是否是給定topic分區的owner,即在zookeeper上創建/consumers/[groupId]/owners/[topic]/[partition],如果能創建就是owner
- addPartitionTopicInfo —— 將給定的topic分區信息加入到這個connector的緩存中
- reinitializeConsumer —— 重新初始化consumer,主要就是創建各種監聽器,更新各種緩存等
- rebalance —— 根據可用broker重新分配consumer-topic分區的對應記錄
- syncedRebalance —— 重新再平衡分配consumer-topic分區的對應記錄
4. WildcardStreamsHandler類:用於做topic的通配符過濾之用
十一、ConsumerFetcherManager.scala
consumer fetcher的管理類,其定義的startConnections和stopConnections方法會被反復地調用。該類主要定義了一個嵌套類:
LeaderFinderThread —— 顧名思義,就是leader發現者線程,當leader可用時,將fetcher添加到對應的broker上
LeaderFinderThread —— 顧名思義,就是leader發現者線程,當leader可用時,將fetcher添加到對應的broker上
十二、ConsumerFetcherThread.scala
consumer獲取線程,三個方法:
1. processPartitionData:處理獲取到的數據,主要就是將消息集合入隊列等待處理
2. handleOffsetOutOfRange:處理一個分區的位移越界的情況,主要根據auto.offset.reset屬性設定的值來指定
3. handlePartitionsWithErrors:處理沒有leader需要leader選舉的分區
十三、ConsumerTopicStats.scala
consumer的統計信息類,就不詳細說了
十四、FetchRequestAndResponseStats.scala
統計一個給定的consumer客戶端提交給所有broker的所有FetchRequest請求統計信息以及對應的response統計信息
十五、TopicEventHandler.scala
一個處理topic事件的trait,只定義了一個方法:handleTopicEvent
十六、ZookeeperTopicEventWatcher.scala
監控/brokers/topics節點下各個topic子節點的變更
十七、SimpleConsumer.scala
kafka消息的consumer。它會維護一個BlockingChannel用於收發請求/響應,因此也提供了connect和disconnect方法用於開啟和關閉底層的blockingchannel。該類的定義核心方法還包括:
1. send,也就是發送TopicMetadataRequest和ConsumerMetadataRequest
2. getOffsetsBefore:獲取給定時間之前的一組有效位移
3. commitOffsets:提交一個topic的位移。請求中如果版本是0,提交位移給zookeeper,否則提交位移給Kafka
4. fetchOffsets:獲取一個topic的位移。版本0從zookeeper中獲取,否則從kafka中獲取
5. earliestOrLatestOffset:為給定的topic分區獲取最早或最新的位移
6. fetch:從FetchRequest中獲取一個topic的一組消息