Kafka是什么?
Kafka是一個分布式,有分區的,有副本的日志服務系統,由LinkedIn公司開發,並於2011年開源。從本質上來說,Kafka擁有一套可擴展的發布/訂閱消息隊列架構,並組成了一套分布式的日志系統,這套系統的創建,是為任何一家大公司搭建一套可處理實時數據的統一平台。
和許多其他消息隊列系統相比(RabbitMQ,ActiveMQ,Redis),Kafka有一些主要的區別:
- 如上面提到的,Kafka底層是一個多副本的日志系統
- Kafka並不使用AMQP或其他已經存在的通信協議,而是使用他自己特有的基於TCP的二進制格式的協議
- Kafka很快,即便只使用很小的集群
- Kafka有強大的機制保證消息語義的順序和持久化
不考慮1.0之前的版本(當前版本是0.9.0.1),kafka已經是產品化了,且被大量著名的企業在使用,包括LinkedIn, Yahoo, Netflix, 和Datadog。
整體架構
在深入kafka之前,我們有必要了解一下kafka的整體架構,每一個版本都有以下這些組件組成,如圖所示:

生產者(producer)把消息發布到topic上,然后消費者(consumer)把消息再從topic上消費出來,生產者和消費者采取了一種推/拉(push/pull)的模式,生產者講消息推到topic上,而消費者把消息從topic上拉下來。Broker作為kafka集群的節點,扮演着一個中間協調者的角色,它存儲着生產者發布上去的消息,並讓消費者按照自己的速率來拉取。這意味着,broker是無狀態的,他們不用跟蹤消費的狀態,然后會根據配置的保留策略(retention policy)來刪除消息。
消息本身是由最原始的二進制字節數組(bytes)組成的,包含了topic和partition的信息。kafka將消息以topic為單位區分,消費者只訂閱他們需要的topic。kafka中的消息是按照時間戳排序且不可變,對消費者而言,只有讀這一種操作。

topic被切分成多個分組(paritition),而partition又被分配到多個broker上面,這樣,topic就可以按照broker進行數據分片。如果分區數越大,那對於同一個topic而言,就可以同時支持多個消費者的消費。
當第一次設置kafka的時候,需要同時關注兩點:給每一個topic分配足夠的parition和將parition平均第分配到各個broker上。如果在第一次部署kafka的時候就這樣做,可以減輕由於數據量增長帶來的痛苦。如果想了解更多關於設置合理的topic數和parition數,可以閱讀kafka作者Jun Rao的這篇優秀的文章。
Kafka具有副本的功能,不同的broker上保存了每個parition的不同副本,具體存在幾台broker上,是由配置的副本因子所決定的。盡管有大量副本的存在,但kafka只會在最初把數據寫入partition的leader(一個leader多個follower),leader是隨機的在ISR(in-sync replicas)池(所有處於同步狀態的partition副本)中選舉出來的。另外,消費者只會讀取partition leader,這樣follower副本將作為備份存在,以保證kafka的高可用性,從而防某個broker掛掉。
還有很重要的一點,沒有任何一個kafka版本是完全脫離zookeeper的,zookeeper就像膠水一樣把kafka的所有組件粘連在一起,他的職責是:
- 選舉controller(其中某一台kafka broker,用來管理所有的partition leader)
- 記錄集群的成員
- topic配置信息
- 磁盤分配(0.9+)
- 安全認證管理 (0.9+)
- 消費組成員管理(從0.9+以后刪除)
主要度量指標
一個功能健全的kafka集群可以處理相當大的數據量,由於消息系統是很多大型應用的基石,因此broker集群在性能上的缺陷,都會引起整個應用棧的各種問題。
kafka的度量指標主要有以下三類:
- kafka服務器(broker)指標
- 生產者指標
- 消費者指標
另外,由於kafka的狀態靠zookeeper來維護,對於zookeeper性能的監控也成為了整個kafka監控計划中一個必不可少的組成部分。了解更多請看后續的系列文章。
下面分別介紹上面提到的這三個方面的度量指標。
Broker度量指標
kafka的服務端度量指標是為了監控broker,也是整個消息系統的核心。因為所有消息都通過kafka broker傳遞,然后被消費,所以對於broker集群上出現的問題的監控和告警就尤為重要。broker性能指標有以下三類
- kafka本身的指標
- 主機層面的指標
- JVM垃圾回收指標
kafka本身的指標
| Name | MBean Name | Description | Metric Type |
|---|---|---|---|
| UnderReplicatedPartitions | kafka.server:type=ReplicaManager, name=UnderReplicatedPartitions | Number of unreplicated partitions | Resource: Availability |
| IsrShrinksPerSec IsrExpandsPerSec | kafka.server:type=ReplicaManager, name=IsrShrinksPerSec kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | Rate at which the pool of in-sync replicas (ISRs) shrinks/expands | Resource: Availability |
| ActiveControllerCount | kafka.controller:type=KafkaController, name=ActiveControllerCount | Number of active controllers in cluster | Resource: Error |
| OfflinePartitionsCount | kafka.controller:type=KafkaController, name=OfflinePartitionsCount | Number of offline partitions | Resource: Availability |
| LeaderElectionRateAndTimeMs | kafka.controller:type=ControllerStats, name=LeaderElectionRateAndTimeMs | Leader election rate and latency | Other |
| UncleanLeaderElectionsPerSec | kafka.controller:type=ControllerStats, name=UncleanLeaderElectionsPerSec | Number of "unclean" elections per second | Resource: Error |
| TotalTimeMs | kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce-FetchConsumer-FetchFollower} | Total time (in ms) to serve the specified request (Produce/Fetch) | Work: Performance |
| PurgatorySize | kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize kafka.server:type=FetchRequestPurgatory,name=PurgatorySize | Number of requests waiting in producer purgatory ; Number of requests waiting in fetch purgatory | Other |
| BytesInPerSec BytesOutPerSec | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec | Aggregate incoming/outgoing byte rate | Work: Throughput |
UnderReplicatedPartitions: 在一個運行健康的集群中,處於同步狀態的副本數(ISR)應該與總副本數(簡稱AR:Assigned Repllicas)完全相等,如果分區的副本遠遠落后於leader,那這個follower將被ISR池刪除,隨之而來的是IsrShrinksPerSec(可理解為isr的縮水情況,后面會講)的增加。由於kafka的高可用性必須通過副本來滿足,所有有必要重點關注這個指標,讓它長期處於大於0的狀態。
IsrShrinksPerSec/IsrExpandsPerSec: 任意一個分區的處於同步狀態的副本數(ISR)應該保持穩定,只有一種例外,就是當你擴展broker節點或者刪除某個partition的時候。為了保證高可用性,健康的kafka集群必須要保證最小ISR數,以防在某個partiton的leader掛掉時它的follower可以接管。一個副本從ISR池中移走有以下一些原因:follower的offset遠遠落后於leader(改變replica.lag.max.messages 配置項),或者某個follower已經與leader失去聯系了某一段時間(改變replica.socket.timeout.ms 配置項),不管是什么原因,如果IsrShrinksPerSec(ISR縮水) 增加了,但並沒有隨之而來的IsrExpandsPerSec(ISR擴展)的增加,就將引起重視並人工介入,kafka官方文檔提供了大量關於broker的用戶可配置參數。
ActiveControllerCount: kafka集群中第一個啟動的節點自動成為了controller,有且只能有一個這樣的節點。controller的職責是維護partitio leader的列表,和協調leader的變更(當遇到某個partiton leader不可用時)。如果有必要更換controller,一個新的controller將會被zookeeper從broker池中隨機的選取出來,通常來說,這個值(ActiveControllerCount)不可能大於1,但是當遇到這個值等於0且持續了一小段時間(<1秒)的時候,必須發出明確的告警。
OfflinePartitionsCount (只有controller有): 這個指標報告了沒有活躍leader的partition數,由於所有的讀寫操作都只在partition leader上進行,因此非0的這個值需要被告警出來,從而防止服務中斷。任何沒有活躍leader的partition都會徹底不可用,且該parition上的消費者和生產者都將被阻塞,直到leader變成可用。
LeaderElectionRateAndTimeMs: 當parition leader掛了以后,新leader的選舉就被觸發。當partition leader與zookeeper失去連接以后,它就被人為是“死了”,不像zookeeper zab,kafka沒有專門對leader選舉采用majority-consensus算法。是kafka的broker集群所有的機器列表,是由每一個parition的ISR所包含的機器這個子集,加起來的並集組成的,怎么說,假設一共有3個parition,第一個parition的ISR包含broker1、2、3,第二個parition包含broker2、3、4,第三個parition包含broker3、4、5,那么這三個parition的ISR所在broker節點加起來的並集就是整個kafka集群的所有broker全集1、2、3、4、5。當副本可以被leader捕獲到的時候,我們就人為它處於同步狀態(in-sync),這意味着任何在ISR池中的節點,都有可能被選舉為leader。
LeaderElectionRateAndTimeMs 報告了兩點:leader選舉的頻率(每秒鍾多少次)和集群中無leader狀態的時長(以毫秒為單位),盡管不像UncleanLeaderElectionsPerSec這個指標那么嚴重,但你也需要時長關注它,就像上文提到的,leader選舉是在與當前leader通信失敗時才會觸發的,所以這種情況可以理解為存在一個掛掉的broker。

UncleanLeaderElectionsPerSec: 這個指標如果存在的話很糟糕,這說明kafka集群在尋找partition leader節點上出現了故障,通常,如果某個作為partition leader的broker掛了以后,一個新的leader會被從ISR集合中選舉出來,不干凈的leader選舉(Unclean leader elections )是一種特殊的情況,這種情況是副本池中沒有存活的副本。基於每個topic必須擁有一個leader,而如果首領是從處於不同步狀態的副本中選舉出來的話,意味着那些與之前的leader沒有被同步的消息,將會永久性丟失。事實上,不干凈的leader選舉將犧牲持久性(consistency)來保證可用性(availability)。所以,我們必須明確地得到這個指標的告警,從而告知數據的丟失。
TotalTimeMs: The TotalTimeMs metric family measures the total time taken to service a request (be it a produce, fetch-consumer, or fetch-follower request):
這個指標族(很多地方都涉及到它)衡量了各種服務請求的時間(包括produce,fetch-consumer,fetch-follower)
-
produce:從producer發起請求發送數據
-
fetch-consumer: 從consumer發起請求獲取數據
-
fetch-follower:follower節點向leader節點發起請求,同步數據
TotalTimeMs 這個指標是由4個其他指標的總和構成的:
-
queue:處於請求隊列中的等待時間
-
local:leader節點處理的時間
-
remote:等待follower節點響應的時間(只有當
requests.required.acks=-1時) -
response:發送響應的時間
通常情況下,這個指標值是比較穩定的,只有很小的波動。當你看到不規則的數據波動,你必須檢查每一個queue,local,remote和response的值,從而定位處造成延遲的原因到底處於哪個segment。

PurgatorySize: 請求煉獄(request purgatory)作為一個臨時存放的區域,使得生產(produce)和消費(fetch)的請求在那里等待直到被需要的時候。每個類型的請求都有各自的參數配置,從而決定是否(將消息)添加到煉獄中:
-
fetch:當
fetch.wait.max.ms定義的時間已到,還沒有足夠的數據來填充(congsumer的fetch.min.bytes)請求的時候,獲取消息的請求就會被扔到煉獄中。 -
produce:當
request.required.acks=-1,所有的生產請求都會被暫時放到煉獄中,直到partition leader收到follower的確認消息。
關注煉獄的大小有助於判斷導致延遲的原因是什么,比如說,導致fetch時間的增加,很顯然可以認為是由於煉獄中fetch的請求增加了。
BytesInPerSec/BytesOutPerSec: 通常,磁盤的吞吐量往往是決定kafka性能的瓶頸,但也不是說網絡就不會成為瓶頸。根據你實際的使用場景,硬件和配置,網絡將很快會成為消息傳輸過程中最慢的一個環節,尤其是當你的消息跨數據中心傳輸的時候。跟蹤節點之間的網絡吞吐量,可以幫助你找到潛在的瓶頸在哪里,而且可以幫助決策是否需要把端到端的消息做壓縮處理。
主機層面的broker性能指標
| Name | Description | Metric Type |
|---|---|---|
| Page cache reads ratio | Ratio of reads from page cache vs reads from disk | Resource: Saturation |
| Disk usage | Disk space currently consumed vs available | Resource: Utilization |
| CPU usage | CPU use | Resource: Utilization |
| Network bytes sent/received | Network traffic in/out | Resource: Utilization |
Page cache read ratio: kafka在設計最初的時候,通過內核中的頁緩存,來達到溝通可靠性(基於磁盤)和高效性(基於內存)之間的橋梁。page cache read ratio(可理解為頁緩存讀取率),和數據庫中的cache-hit ratio(緩存命中率)比較相似,如果這個值比較大,則等價於更快的讀取速度,從而有更好的性能。如果發現頁緩存讀取率<80%,則說明需要增加broker了。

Disk usage: 由於kafka將所有數據持久化到磁盤上,很有必要監控一下kafka的剩余磁盤空間。當磁盤占滿時,kafka會失敗,所以,隨着時間的推移,跟蹤磁盤的增長率是很有必要的。一旦你了解了磁盤的增長速率,你就可以在磁盤將要占滿之前選擇一個合適的時間通知管理員。

CPU usage: 盡管kafka主要的瓶頸通常是內存,但並不妨礙觀察一下cpu的使用率。雖然即便在使用gzip壓縮的場景下,cpu都不太可能對性能產生影響,但是,如果發現cpu使用率突然增高,那肯定要引起重視了。
Network bytes sent/received: 如果你只是在監控kafka的網絡in/out指標,那你只會了解到跟kafka相關的信息。如果要全面了解主機的網絡使用情況,你必須監控主機層面的網絡吞吐量,尤其是當你的kafka主機還承載了其他與網絡有關的服務。高網絡使用率是性能下降的一種表現,此時需要聯系TCP重傳和丟包錯誤,來決定性能的問題是否是網絡相關的。
JVM垃圾回收指標
由於kafka是由scala編寫的,且運行在java虛擬機上,需要依賴java的垃圾回收機制來釋放內存,如果kafka集群越活躍,則垃圾回收的頻率也就越高。

只要對java有些了解的人都應該知道垃圾回收會產生很大的性能開銷,(垃圾回收造成的)暫停對kafka最大的影響就是會造成大量廢棄的zookeeper session(因為session超時了)。
垃圾回收的類型是基於回收的是年輕代(新的對象)還是老年代(長期存活的對象),初學者可以看這篇關於垃圾回收的文章:
當發現垃圾回收造成了過度的暫停,你可以考慮升級JDK版本或者垃圾回收器(或者增加zookeeper.session.timeout.ms來防止time out)。另外,可以調節java runtime參數來最小化垃圾回收。LInkedin的工程師寫了這么一篇深入介紹優化垃圾回收的文章,供參考。當然,也可以直接參考kafka官方文檔中給出的推薦配置。
| Name | MBean Name | Description | Type |
|---|---|---|---|
| ParNew count | java.lang:type=GarbageCollector,name=ParNew | Number of young-generation collections | Other |
| ParNew time | java.lang:type=GarbageCollector,name=ParNew | Elapsed time of young-generation collections, in milliseconds | Other |
| ConcurrentMarkSweep count | java.lang:type=GarbageCollector,name=ConcurrentMarkSweep | Number of old-generation collections | Other |
| ConcurrentMarkSweep time | java.lang:type=GarbageCollector,name=ConcurrentMarkSweep | Elapsed time of old-generation collections, in milliseconds | Other |
ParNew:可以理解成年輕代,這部分的垃圾回收會相當頻繁,ParNew是一個stop-the-world的垃圾回收,意味着所有應用線程都將被暫停,知道垃圾回收完成,所以ParNew延遲的任何增加都會對kafka的性能造成嚴重影響。

ConcurrentMarkSweep (CMS) :這種垃圾回收清理了堆上的老年代不用的內存,CMS是一個短暫暫停的垃圾回收算法,盡管會造成應用線程的短暫停頓,但這只是間歇性的,如果CMS需要幾秒鍾才能完成,或者發生的頻次增加,那么集群就沒有足夠的內存來滿足基本功能。
kafka生產者指標
kafka的生產者是專門把消息推送到broker的topic上供別人消費的,如果producer失敗了,那consumer也將無法消費到新的消息,下面是生產者的幾個有用的重要監控指標,保證數據流的穩定性。

| Name | v0.8.2.x MBean Name | v0.9.0.x MBean Name | Description | Metric Type |
|---|---|---|---|---|
| Response rate | N/A | kafka.producer:type=producer-metrics,client-id=([-.w]+) | Average number of responses received per second | Work: Throughput |
| Request rate | kafka.producer:type=ProducerRequestMetrics, name=ProducerRequestRateAndTimeMs,clientId=([-.w]+) | kafka.producer:type=producer-metrics,client-id=([-.w]+) | Average number of requests sent per second | Work: Throughput |
| Request latency avg | kafka.producer:type=ProducerRequestMetrics, name=ProducerRequestRateAndTimeMs,clientId=([-.w]+) | kafka.producer:type=producer-metrics,client-id=([-.w]+) | Average request latency (in ms) | Work: Throughput |
| Outgoing byte rate | kafka.producer:type=ProducerTopicMetrics, name=BytesPerSec,clientId=([-.w]+) | kafka.producer:type=producer-metrics,client-id=([-.w]+) | Average number of outgoing/incoming bytes per second | Work: Throughput |
| IO wait time ns avg | N/A | kafka.producer:type=producer-metrics,client-id=([-.w]+) | Average length of time the I/O thread spent waiting for a socket (in ns) | Work: Throughput |
對生產者來說,響應速率表示從broker上得到響應的速率,當broker接收到producer的數據時會給出響應,根據配置,“接收到”包含三層意思:
- 消息已接收到,但並未確認(
request.required.acks == 0) - leader已經把數據寫入磁盤(
request.required.acks == 1) - leader節點已經從其他follower節點上接收到了數據已寫入磁盤的確認消息(
request.required.acks == -1)
這看上去很完美,但是對消費者而言,只有當上述的這些確認步驟都准確無誤以后,才能讀取到producer生產的數據。
如果你發現響應速率很低,那是因為在這個過程中需要牽扯太多因素,一個很簡單的辦法就是檢查broker上配置的request.required.acks參數,根據你的使用場景來選擇合適的值,到底是更看中可用性(availability)還是持久性(consistency),前者強調消費者是否能盡快讀取到可用的消息,而后者強調消息是否准確無誤地持久化寫入某個topic的某個partition的所有副本的磁盤中。

Request rate:請求的速率是指數據從producer發送到broker的速率,很顯然,請求的速率變化是否健康,也是由使用的場景所決定的。關注速率走勢的上和下,對於保證服務的可用性非常關鍵,如果不開啟速率限制(rate-limiting)(0.9+版本才有),那么當流量高峰來臨時,broker就將變得很慢,因為他要忙於處理大量涌入的數據。
Request latency average: 平均請求延遲,這是用來衡量從producer調用KafkaProducer.send()方法到接收到broker響應的時長。“接收到”包含很多層意思,可參考response rate那一塊。
有多種途徑可以減少延遲,主要的途徑是producer的linger.ms 配置項,這個配置項告訴producer,在累積夠一個消息批次之前,需要等待多久才能發送。默認地,producer只要接收到上一次發送的確認消息后,就立即發送新的消息,但並非所有場景都適用,為了累積消息而等待一點時間會提高吞吐量。
由於延遲和吞吐量有着必然的聯系,就很有必要關注batch.size這個producer配置項,從而達到更完美的吞吐量。並不是只要配置一個合適的值就可以一勞永逸了,要視情況決定如何選擇一個更優的批大小。要記住,你所配置的批大小是一個上限值,意思是說,如果數據滿了,就立即發送,但如果沒滿的話,最多只等linger.ms 毫秒,小的批量將會導致更多次數的網絡通信,然后降低吞吐量,反之亦然。

Outgoing byte rate: 在kafka的broker中,肯定需要監控producer的網絡吞吐量,隨着時間的變化觀察網絡上的數據傳輸量是很有必要的,從而決定是否有必要調整網絡設備。另外,也很有必要知道producer是否以一個恆定的速率發送數據,從而讓consumer獲取到。監控producer的網絡傳輸情況,除了可以決定是否需要調整網絡設備,也可以了解producer的生產效率,以及定位傳輸延遲的原因。
IO wait time: Producer通常做了這么一些事:等待數據和發送數據。當producer產生了超越他發送能力的數據量,那結果就是只能等待網絡資源。當如果producer沒有發送速度限制,或者盡可能增加帶寬,就很難說這(網絡延遲)是個瓶頸了。因為磁盤的讀寫速率往往是最耗時的一個環節,所以對producer而言,最好檢查一下I/O等待的時間。請記住,I/O等待表示當CPU停下來等待I/O的時間,如果你發現了過分的等待時間,這說明producer無法足夠快地獲取他需要的數據,如果你還在使用傳統的機械磁盤作為存儲,那請考慮采用SSD。
Kafka消費者指標

0.8.2.2版本
在0.8.2.2版本中,消費者指標分成兩類:簡單消費者指標和高階消費者指標。
所有簡單消費者指標都被高階消費者采納,但反過來並非如此。這兩者之間最主要的區別就是開發者對於消費者的掌控程度不同。
簡單消費者,事實上就是那些被明確地告知連接哪個broker,哪個partition。簡單消費者也可以自行管理offset和進行parition leader的選舉。盡管為了保證消費者可以真正運行起來,需要做很多工作,但簡單消費者也是相對更靈活的。
高階消費者(也被稱為消費者組)忽略了大量實施過程中的細節,那些細節包括offset的位置,broker leader,和zookeeper管理的分區可用性,消費者組只做他最擅長的事,就是消費數據。然而,其實簡單消費者更強大,高階消費者更靈活。
0.9.0.0+版本
kafka0.9.0.0版本包括了很多新特性,包括了對consumer api的很多大調整。在0.9+以上版本中,專門定義了一類消費者指標,可以通過調用新api得到,這些消費者指標把0.8.2.2中的普通消費者和高階消費者結合到了一起,而且使用了完全不同的MBean命名空間。
| Name | v0.8.2.x MBean Name | v0.9.0.x MBean Name | Description | Metric Type | v0.8.2.x Consumer Type |
|---|---|---|---|---|---|
| ConsumerLag MaxLag | broker offset - consumer offset kafka.consumer:type= ConsumerFetcherManager, name=MaxLag, clientId=([-.\w]+) | broker offset - consumer offset Attribute: records-lag-max, kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.w]+) | Number of messages consumer is behind producer / Maximum number of messages consumer is behind producer | Work: Performance | Simple Consumer |
| BytesPerSec | kafka.consumer:type= ConsumerTopicMetrics, name=BytesPerSec, clientId=([-.\w]+) | kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+) | Bytes consumed per second | Work: Throughput | Simple Consumer |
| MessagesPerSec | kafka.consumer:type= ConsumerTopicMetrics, name=MessagesPerSec, clientId=([-.\w]+) | kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+) | Messages consumed per second | Work: Throughput | Simple Consumer |
| ZooKeeperCommitsPerSec | kafka.consumer:type= ZookeeperConsumerConnector, name=ZooKeeperCommitsPerSec, clientId=([-.\w]+) | N/A | Rate of consumer offset commits to ZooKeeper | Work: Throughput | High-level Consumer |
| MinFetchRate | kafka.consumer:type= ConsumerFetcherManager, name=MinFetchRate, clientId=([-.\w]+) | Attribute: fetch-rate, kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.w]+) | Minimum rate a consumer fetches requests to the broker | Work: Throughput |
ConsumerLag/MaxLag:這是所有人都很中意的kafka指標,ConsumerLag是指consumer當前的日志偏移量相對生產者的日志偏移量,MaxLag和ConsumerLag的關系很緊密,相當於是觀察到的ConsumerLag的最大值,這兩個度量指標的重要性在於,可以決定你的消費者在做什么。如果采用一個消費者組在存儲設備上存儲大量老的消息,你就需要重點關注消費者的延遲。當然,如果你的消費者處理的是實時消息,如果lag值一直居高不下,那就說明消費者有些過載(overloaded)了,遇到這種情況,就需要采用更多的消費者,和把topic切分成多個parition,從而可以提高吞吐量和降低延遲。
注意:ConsumerLag 是kafka之中過載的表現,正如上面的定義中所描述的額一樣,但它也可被用來表示partition leader和follower之間的offset差異。

BytesPerSec:正如前文提到的生產者和broker的關系,也需要監控消費者的網絡吞吐量。比如,MessagesPerSec的突然下降可能會導致消費失敗,但如果BytesPerSec還保持不變,那如果消費少批次大體量的消息問題還不大。不斷觀察網絡的流量,就像其他度量指標中提到的一樣,診斷不正常的網絡使用情況是很重要的。
MessagesPerSec: 消息的消費速度並不完全等同於比特的消費速度,因為消息本身可能有不同大小。依賴生產者和工作負載量,在典型的部署環境中,往往希望這個值是相當穩定的。通過隨着時間的推移監控這個指標,可以觀察出消費數據的趨勢,然后定出一個基線,從而確定告警的閾值。這個曲線的走勢取決於你的使用場景,但不管怎樣,在很多情況下,定出一條基線然后對於異常情況做出告警是很有必要的。

ZooKeeperCommitsPerSec:只有0.8x版本有,如果把zookeeper作為offset的存儲(在0.8x版本中是默認的,0.9+版本必須顯式地在配置中定義offsets.storage=zookeeper),那你肯定需要監控這個值。注意到如果想要在0.9+版本中明確使用zookeeper作為offset存儲,這個指標並沒有被開放。當zookeeper處於高寫負載的時候,將會遇到成為性能瓶頸,從而導致從kafka管道抓取數據變得緩慢。隨着時間推移跟蹤這個指標,可以幫助定位到zookeeper的性能問題,如果發現有大量發往zookeeper的commit請求,你需要考慮的是,要不對zookeeper集群進行擴展,要不直接把offset的存儲變為kafka(offsets.storage=kafka)。記住,這個指標只對高階消費者有用,簡單消費者自行管理offset。

MinFetchRate: 消費者拉取的速率很好反映了消費者的整體健康狀況,如果最小拉取速率接近0的話,就可能說明消費者出現問題了,對一個健康的消費者來說,最小拉取速率通常都是非0的,所以如果發現這個值在下降,往往就是消費者失敗的標志。
為什么要用zookeeper?
zookeeper在kafka的集群中扮演了非常重要的角色,他的職責是:維護消費者的offset和topic列表,leader選舉,以及某些常用的狀態信息。在kafka0.8版本中,broker和consumer的協作都是通過zookeeper來進行的,在0.9版本中,zookeeper只是被broker用到(默認是這樣的,除非你有其他配置),這樣會大大地降低zookeeper的負載,尤其是在大集群中。

Zookeeper度量指標
可以通過MBean和命令行接口來獲取zookeeper的度量指標,
| Name | Description | Metric Type |
|---|---|---|
zk_outstanding_requests |
Number of requests queued | Resource: Saturation |
zk_avg_latency |
Amount of time it takes to respond to a client request (in ms) | Work: Throughput |
zk_num_alive_connections |
Number of clients connected to ZooKeeper | Resource: Availability |
zk_followers |
Number of active followers | Resource: Availability |
zk_pending_syncs |
Number of pending syncs from followers | Other |
zk_open_file_descriptor_count |
Number of file descriptors in use | Resource: Utilization |
Bytes sent/received:在0.8x版本中,broker和consumer都要和zookeeper通信,大規模部署的集群中,有很多consumer和partition,這種和zookeeper連續不斷地通信將會成為zookeeper的瓶頸,因為zookeeper是串行處理請求的。根據時間變化跟蹤發送和接受數據比特大小可以幫助診斷性能問題。如果zookeeper集群需要連續不斷處理大流量,那么就需要為集群提供更多節點,來適應更大數據量。
Usable memory: zookeeper需要加載大量數據到內存中,當需要page到磁盤上的時候是最痛苦的。所以,為了使zookeeper的性能更優,跟蹤內存使用率是很有必要的。記住,因為zookeeper是用來保存狀態的,所以zookeeper的性能下降將會導致整個kafka集群的性能下降。因此,所有作為zookeeper節點的主機都需要擁有較大的內存,來應對負載的高峰。
Swap usage: 如果發現內存不夠了,將會用到swap,如上文提到的那樣,這樣是不好的,所以你必須知道它。
Disk latency:盡管zookeeper主要是使用內存,但也會用到文件系統,用來有規律地對當前狀態做快照,和保存所有事務的日志。由於在update發生以后,zookeeper必須要把事務寫到非易失的存儲設備上,這是的磁盤的讀寫存在潛在瓶頸,磁盤延遲的突增,會導致所有與zookeeper通信的服務器響應變慢,所以除了把zookeeper服務器的磁盤換成SSD,還需要時刻關注磁盤延遲。
