本文kafka內容來源於kafka 的apache document.
一、結構與概念解釋
1.基礎概念
topics: kafka通過topics維護各類信息。
producer:發布消息到Kafka topic的進程。
consumer:訂閱kafka topic進程和處理訂閱的消息的進程。
broker:kafka集群的每個server叫broker.
提供了語言無關、高性能、簡單的client-server的鏈接方式。
2.Topics and Logs
(1)topic是發送消息的類別名稱。每個partition是持續添加的有序的不可變的消息序列-commit log. partition內部的消息分配唯一的Id number--offset.
(2)無論是否消費,kafka在一段可配置時間內會保留所有提交的消息。如設置2天,發布后兩天內都可以消費,兩天后就會騰出空間。kafka的性能是恆定量,多保留些數據不是問題。
(3)每個消費者實際需要保留的元數據信息是消費者處理log的位置-offset. offset由consumer來控制。 通常隨着讀取messages,consumer會線性增長offset, 但實際上這個值由consumer控制,可以以喜歡的任何順序來消費,如可以設置成一個old offset,重新處理消息。
(4)這些組合,造成kafka consumer非常輕量級,基本上不會對其他consumer造成影響。(如我們可以用命令行tail等操作 察看內容,不會改變已經存在的consumer的消費行為).
這些特性好處:允許日志量超出單台服務器,每個獨立分區必須適應擁有它的server,一個topic可以有多個partition(持有任意數量的數據);他們作為並行單元執行。
3. Distribution
(1)日志分區分布在Kafka集群上,每個server處理一部分分區的數據和請求,每個partition都可以有一個配置大小的容錯server.
(2)每個Partition會有一個Leader,有0個或多個followers. leader處理這個partition的所有讀寫請求,followers被動的復制leader. leader fails,其中一個followers自動成為leader.每個server都做一部分partition的leader,其他partition的follower,所以負載在集群上是均勻的。
4.producers
producer負責選擇哪個消息給topic中的哪個partition。這可以用輪詢的方式簡單的balance load,或者基於語義partition function(如根據消息Key).
5.consumers
(1)消息傳統兩種使用模式:隊列和發布-訂閱。 隊列模式下,幾個消費者可以從服務器讀取,消息只會到一個消費者;發布訂閱--消息會廣博到所有消費者。
(2)消費者給自己一個consumer group 名稱,發布給topic每個消息傳遞到每個consumer訂閱組中的一個實例中。consumer progress可以獨立線程或者獨立機器。
如果所有的consumer instance都有相同的consumer group, 類似於傳統的queue,負載到consumers.
如果所有的consumer instance都有不同的consumer group, 類似於發布-訂閱,廣播到所有的consumers.
更常見的是有多個consumer group,每個邏輯訂閱組一個名字。每個組為了擴展性和容錯性由多個consumer instance組成。這超出了發布訂閱模式語義,每個訂閱者是consumer群,不是單個進程。
(3)kafka比常見的消息系統有更強的排序保證。
傳統隊列系統,並行consumer會讓本來有序的隊列,處理完的速度並不相同。
kafka可以保證順序。通過分配partition給consumer group中的consumer保證每個分區由consumer中的一個consumer消費。(這樣保證consumer是partiton的唯一reader,按序消費數據,因為分區多,還是能平衡負載.這不能有比分區更多的partition.)
當然Kafka只提供了分區內部的有序性,不能跨partition. 每個分區的有序性,結合按Key分partition的能力對大多應用都夠用了。全序---one topic-one partition-one consumer,失去了並行。
6. Guarantees
(1)partition會按照publish message的順序排放消息。
(2)consumer看到消息的順序與在Log存儲的順序相同。
(3)對於復制因子N,容忍N-1 server宕機不影響服務。
二、使用場景
1.messaging消息隊列
可以用來替換傳統消息代理(解耦生產者消費者,緩存未處理消息),kafka與以往的軟件比更大的吞吐量、內置的分區和分區復制機制、容錯機制。更適合做大型消息處理的解決方案。
根據經驗消息系統經常需要吞吐量不高,但是需要端到端低延遲、經常以來Kafka提供的持久化保證。(與activemq和rabbitmq對比).
2.website activity tracking 網站活動跟蹤
(1)kafka的原始用途:通過重建用戶跟蹤管道為一組實時的發布-訂閱feeds。這意味着網站活動(pv,search,用戶的其他活動)根據類型被發布到中心topics的一個topic. 這些feed廣泛用於大量用例的訂閱包括實時處理、實時監控、load to hadoop或者離線數據倉庫為離線處理和報告。
(2)活動跟蹤需要高吞吐量,許多活動消息由每個用戶pv生成。
3.Metrics 度量.
kafka經常用於運營監測數據;包括聚合統計信息從分布式應用來生成運營數據中心化。
4.log aggregation日至聚合。 分散文件放入集中位置(file server/hdfs)處理
kafka做日志聚集的替代。kafka抽出文件內容,給日志和事件數據一個清潔提取做為消息流。這允許低延遲處理和易於支持多個數據源和多個消費者消費。對比中心化聚合系統scribe/flume,kafka提供更好的性能,更強的持久化保證,由於replication,更低的端到端延遲。
5.stream processing流處理
許多用戶做階段性的數據處理(數據消耗原始數據,然后聚合、補充、或者重新轉換為Kafka topic進一步處理)。
例如:文章推薦的一個處理流可能從rss抓取文章內容 然后發布到article topic;進一步處理可能規范化或者重復數據刪除生成一個清洗過的article content;最后階段可能試圖匹配這個內容給用戶。
這從個人topic創建了實時數據流圖;storm和Samza是實現這類轉換的工具。
6.event sourcing 事件溯源。
事件溯源是一種設計模式,狀態改變被記錄為一系列記錄。Kafka支持大量的日志數據,非常適合做這種類型數據的后端。
7.Commit Log
kafka可以做為一種分布式系統外部commit-log工具。日志幫助復制節點間的數據,為失敗節點重新存儲數據的重同步機制。日志壓縮功能幫助支持這種用途,類似於apache bookkeeper項目。
三、配置與搭建。
1.單機
(1)解壓
(2)tar -xzf kafka_2.9.2-0.8.1.tgz
(3)啟動:bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
(4)建立topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
也可以配置broker自動創建topic,當提交到不存在的topic時。
(5)發送消息
Kafka提供了命令行工具從文件獲取輸入或者標准輸入 ,作為消息發送到kafka集群,默認一行一個消息。
標准輸入: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(6)啟動consumer.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
所有命令行工具都有使用說明,不跟參數就會顯示使用說明。
2.建立集群。對kafka來說單實例,就是集群中起了一個實例。多broker就是多啟動幾個代理就行。
(1)配置
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties: broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 port=9094
log.dir=/tmp/kafka-logs-2
broker.id是集群中每個節點的唯一和永久名稱。重載端口號和日志位置(因為我們在同一台機器上)。
(2)啟動另外兩台。
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
(3)創建一個新topic,副本設置為3.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
(4)看每個broker做什么。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
第一行是所有Partition的總結。后面的行是每個partition一行。
Leader---node編號。Replicas:node list(所有的列表-包括不是alive)."isr" is the set of "in-sync" replicas.(alive 和追趕leader的replicate).
(5)發送新消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
(6)測試容錯性,殺掉server-1. (Leader).
ps -ef |grep server-1.properties kill.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 Replication Factor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,0
(7)生態:
有大量的主發行版意外的與kafka整合的工具,生態系統頁列出了很多,包括:數據流系統、hadoop集成、監控、部署工具。
3.API:
(1)Producer API
class kafka.javaapi.producer.Producer<K,V> {
public Producer(ProducerConfig config);
//發送數據到一個topic,用key partition,使用同步或者異步的Producer.message是封裝了key-message數據的生產者數據對象。
public void send(KeyedMessage<K,V> message);
//發送數據到多個topic.
public void send(List<KeyedMessage<K,V>> messages);
public void close(); //關閉生產者池到所有broker的鏈接。
}
(2)high level consumer api:
class Consumer { //config最少要指定,consumer的group id和zookeeper的連接。
public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(config: ConsumerConfig);
}
public interface kafka.javaapi.consumer.ConsumerConnector {
//為每個topic創建type T創建消息流。topicCountMap是topic和#stream的map對;decoder把message轉換為T;返回topic和kafka stream的列表(列表大小是#stream,每個stream支持迭代message/offset對)。
public <K,V> Map<String, List<KafkaStream<K,V>>> createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
//用默認的decoder為每個topic生成類型T的事件流列表。
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
//為匹配通配符的topics,生成消息流。numStreams返回流的數量。
public <K,V> List<KafkaStream<K,V>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
//用默認decoder.
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
//返回一個stream.
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
//Commit the offsets of all topic/partitions connected by this connector
public void commitOffsets();
//關閉連接。
public void shutdown();}
(3)class kafka.javaapi.consumer.SimpleConsumer {
//從topic獲取消息集合
public FetchResponse fetch(request: kafka.javaapi.FetchRequest);
//獲取topic的metadata. request指定versionid/clientid/seqence of topics.
public kafka.javaapi.TopicMetadataResponse send(request: kafka.javaapi.TopicMetadataRequest);
//返回有效的offset列表(知道maxsize),before given time.
public kafka.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);
public void close();}
大多數應用high level api足夠好,一些應用想要一些特性(high level consumer沒有暴露).(如重啟consumer重設定初始offset.) 這就可以用我們底層的SimpleConsumer,邏輯有點復雜。
(4)kafka hadoop consumer api.
基本用例:聚合和加載數據到hadoop.支持這個特性,提供一個hadoop-based的consumer,生成大量的map任務,並行的從kafka集群拉取數據。提供了十分快速的拉取hadoop數據的能力。(我們能填滿網絡,用很少的kafka集群)。
四、配置項多詳見http://kafka.apache.org/documentation.html#gettingStarted。
1.broker configs: 分必要的配置和topic-level配置。
(1)必要的配置:broker.id / log.dirs/zookeeper.connect.
scala class:kafka.server.KafkaConfig 類中有詳細內容。.
(2)topic-level配置。主題相關的配置既有全局的默認值,也有可選的每個主題的覆蓋。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1 (可選的覆蓋通過--config指定).
后來修改可選覆蓋。-alter.
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000
移除覆蓋
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --deleteConfig max.message.bytes
2. Consumer Configs
(1)group.id:consumer進程屬於的組字符串唯一標識,通過設置相同的id表示多個進程屬於相同consumer group.
zookeeper.connect. hostname1:port1,hostname2:port2,hostname3:port3.
consumer.id不設置自動生成。
(2)在scala的kafka.consumer.ConsumerConfig.
3. Producer Configs
(1)必要的配置:
metadata.broker.list 為了啟動,producer用它來獲取元數據(topic/partition/replica). 建立鏈接發送實際數據,建立的鏈接基於metadata返回的broker信息,The format is host1:port1,host2:port2,可以是brokers子集,也可以是vip.
request.required.acks. 控制何時認為producer請求完成,配置有多少broker 確認數據到Log。0--表示不等待確認(最低的延遲和最弱的持久化保證);1--leader接收數據后返回確認信息;-1---所有的in-sync replicas 接收。
producer.type:sync. 是否異步. async(后台線程,允許客戶端機器扔掉數據)/sync.
serializer.class:kafka.serializer.DefaultEncoder. The default encoder takes a byte[] and returns the same byte[].
(2)kafka.producer.ProducerConfig.
4.new Producer configs:未來會替換成新的配置,現在是beta版。
bootstrap.servers
acks
...
四、設計
1.Motivation
2.Persistence
(1)不要怕存儲慢。順序讀跟內存差不了多少。jmm:內存耗費大,經常是硬盤存儲的兩倍;java隨着堆數據增加,回收越來月瑣碎和緩慢。
(2)常量存取時間。B樹在硬盤的存取時間並不快(隨機取數);簡單讀取和追加可以在O(1)。 常數時間讓我們可以存放更多的數據,而不是盡快刪除,有了更多的靈活性。
3.Efficiency
低效的兩個操作:小io和字節拷貝(byte copying).
小io通過批量操作解決。
字節拷貝通過解決。高負載下這個消耗顯著,通過在producer/broker/consumer都使用標准的的二進制格式來避免這一點。broker維護的消息日志只是一些文件的目錄,每個文件由一系列消息集組成,消息集以生產者和消費者都使用的二進制格式寫到磁盤上。維護通用格式允許最大優化:持久化日志塊的網絡傳輸。
正常拷貝4步2系統調用,這種sendfile可以讓pagecache 直接到network.只需要在最后拷貝1次。
我們希望一個topic多個消費者,用上面的zero-copy優化,數據拷貝到pagecache一次,在每次消費重用。(可以以接近網絡限制的速度消耗掉)。
端到端批量壓縮:支持snappy和gzip.(通過一個集合壓縮,提高壓縮比)。
4.The Producer
(1)Load balancing負載均衡。kafka讓producer直接發送數據到broker(partiton leader),不用任何路由。讓producer這樣做,需要請求元數據信息:哪些server還存活,一個topic的哪些partition是leader。
客戶端控制將消息發送到哪個分區,可以用隨機實現(實現一種隨機的負載均衡或者語義分區),暴漏了語義分析的接口(提供key, hash 到一個partition).這可以反過來讓consumer做出本地的假設,這種風格被消費者設計成本地敏感的處理。
(2)Asynchronous send 異步發送。可以配置積累指定數量的消息或者指定的延遲。
5.The Consumer
可以指定offset,獲取之后的一段日志塊,必要時可以重新消費。
(1)push and pull. consumer-pull 還是broker -push. -------kafka遵從了從producer push,從consumer pull設計。而scribe/flume采用的是push設計。
兩種設計都有優缺點:目標都是讓消費者最大速率,而push方式可能會造成消費者不堪重負。 另一個優點:適合於消費者批處理聚合數據。(push一次一個,pull修復)
不足:沒有數據時,消費者在無用的忙碌,等待數據到來;避免這種情況,允許消費者阻塞在一個長poll等待直到數據到來。
更可靠(不需要生產者的持久性)。
(2)consume position.
追蹤哪些被消費是消息系統的關鍵點;許多消息系統擴展性差立即刪除是務實的選擇。
broker和consumer達成協議--哪些被消耗並不明顯。如果每次消費者消費掉后就刪掉數據(消費者死掉,可能會丟失數據,為解決這個問題--消息系統都加了確認機制);這種策略解決了丟數據問題,引發了新的問題:在發送確認前失敗會導致處理兩次消息;性能問題--對每個消息保留多個狀態。
kafka的處理機制不同:topic分為完全有序的partiiton集合,每個partition在一個時刻只有一個consumer消費。這意味着-consumer在每個partiiton的位置僅僅是一個單一的整數(下一個要處理的消息位置),這讓標記什么已經被消耗的狀態耗費小(每個分區一個狀態);這個狀態可以周期性的checkpoint,使得消息確認確認很便宜。
另外的好處:倒回,重新消費數據。
(3)offline dataload
擴展的持久性,提供批量加載可能性(批量到hadoop或數據倉庫)。
在hadoop應用,分散加載在map任務中,one for each node/topic/partition combination,充分並行;hadoop提供了任務管理,重跑不會重復數據,只是從原來的位置重新啟動。
6.Message Delivery Semantics
(1)多個可能的消息語義保證:至多一次(失敗不重傳);至少一次(不會丟失,可能重傳);精確一次。
值得拆成另外兩個問題:發布消息的持久化保證和消費消息的保證。
(2)持久化:kafka只要一個broker存活就可以繼續用。遇到網絡錯誤,可以類似數據的key保證,使得生產者重試是冪等的(在將來版本添加)。大多數用例不需要這么完全的保證。
(3)消費消息:consumer控制position. 如果consumer從不失敗,可以內存保存位置;如果可能失敗--幾個方案更新:
----可以在日志中保存處理的位置,然后處理消息。有可能保存了offset,但是獲取處理消息結果前進程失敗。會丟失幾條消息----at most once語義保證。
---可以讀取消息、處理消息、保存position. 處理完了沒有保存position. ----at least once.
---exactly once:不是消息系統特性,需要與實際存儲結果協調consumer的位置。經典的兩階段commit.保存結果和保存position時。可以簡單的處理將Offset保存在某個地方(因為很多系統不支持兩階段commit). 這方面的一個例子,我們etl工具在hdfs存儲offset,隨着讀取數據,offset或者都更新或者都沒有。
保證至少一次投遞:禁用生產者重試、提交offset先於處理數據。exactly-once:需要目標系統的協作;但是Kafka提供offset讓這個實現直接。
7.Replication
復制的單元是topic partition.
kafka node存活條件:必須能在zookeeper維持鏈接;如果是slave必須去復制Leader. 滿足這兩個條件叫'in sync',而不是叫'alive‘。leader跟蹤in sync node. (死/卡/跟不上,會移除這個node).只能處理那些突然失敗。
commited---所有in syc replicas都提交。只有commited log才給consumer. 生產者需要在延遲和持久性權衡---設置request.required.acks 是生產者的偏好。
(1)Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)
復制Log讓kafka專注隊列實現即可。選了確認的數量,選取Leader的比較數量-quorums.
對這種commit決定和leader選舉權衡的通用方案-使用大多數。 好處--取決於比較快的機器的反饋。(很多這類優秀算法用於zookeeper的zab,raft, viewstamped replication;kafka的實際實現是用的ms的pacificA).缺點:不用很多失敗就沒有候選人了。(一個失敗需要3個副本),這也是只有zookeeper這類軟件使用,hadoop namenode高可用並沒有采用原因。
kafka采取了略微不同的算法:kafka維護一個isr---可以跟上leader的列表。這個列表的單元才會選取leader. 這種方式可以容忍N-1的失敗。復制因子和吞吐量的權衡。
kafka重要設計區別:kafka不要求失敗的節點所有數據完整無誤的恢復。建立在穩定存儲(在失敗恢復不丟數據)假設之上。兩個主要問題:硬盤錯誤是最經常出現的問題,不會讓數據不完整;即使這沒有問題,沒有調用fsync提高性能,我們的協議讓replica追上的時候再加入isr.
(2)Unclean leader election: What if they all die? 等待一個isr副本存活;選擇第一個活過來的replica。現在版本用后面的,以后改成可配置。
(3)Replica Management 副本管理。平衡壓力。
可用性關鍵窗口--優化leader選舉。由一個broker選為controller負責選舉,批量選舉,提高速度。
8.Log Compaction 。 讓kafka總是能為每個message key保留至少最后已知值(在一個topic partition的數據日至中)。表達用例:應用壞掉后的狀態重建,應用重啟重新加載cache.
之前的保存策略---固定時間周期/到達設定的大小,適合於臨時事件數據。對一類數據---有主鍵的/可變的數據(如數據庫改變)。
123 => bill@microsoft.com 123 => bill@gatesfoundation.org
123 => bill@gmail.com
保證至少保留最近一次修改,可以重建狀態,不用保留所有變化。
(1)使用場景:
數據庫更改訂閱。變化的改變需要反應到cache中/hadoop/solr中,只需要最新日志。(但是如果重建--需要完整數據集).
事件溯源.
高可用日志.Samza.
(2)這些場景實時處理改變,但是有時候機器壞掉需要重建,重新處理,重新加載;實時處理和重建恢復兩種情況,log compaction在相同的后端topic都支持。日志的這種使用風格,見http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying。
(3)思想很簡單:保存所有變化日志,就能隨時重建;但是這個假設的完整log並不適用,即便對穩定的數據集隨着日志沒有邊界的增長更新一條記錄會有多次。更簡單的保持策略:扔掉old update縮減空間,但是日志不再是簡單的恢復當前狀態。
log compaction是一種更細的每條記錄的保存機制,而不是比較粗的基於時間的保存。思路是:有選擇的刪除記錄,我們有每個記錄更新的更新;這種方式每條記錄有最新的狀態。
retention policy可以每個topic設置;可以看到一個集群上的topic可以有基於時間保留的、基於大小保留的、基於compaction保留的。
這個架構起源於Linkedin 最早的和最成功的的底層架構-一個數據庫改變緩存服務(databus).不向其他基於Log的存儲系統--kafka專門為訂閱和快速線性讀寫組織數據。
(4)log-compaction基礎.
log compaction打刪除標記,后台自動刪除。 多次修改也在后台通過復制log進行,可以通過控制吞吐量避免影響producer和consumer.
(5) log compaction 提供的保證。
每個跟上頭部的consumer會有順序的offset.
log順序還會保證(不會重排序,只會刪除某些日志)。
offset不會變,它是log的永久標識。
任何從0offset讀的處理會看到至少所有記錄的最后狀態。【假設reader到達log頭(且在一個時間范圍內 delete.retention.ms 配置參數),所有的刪除標記會被看見;】這很重要,刪除標記不會刪除,只要reader沒讀過它。
任何consumer從日志頭處理,會看到所有記錄的最終狀態。
(6)Log Compaction Details
log compaction有 log cleaner處理,后台線程池重新復制日志線程段,removing之前已經出現的log.每個compactor進程工作原理:選擇head-tail中元素占比最多的log;為每個Key創建簡潔的摘要在日志頭;從頭到尾重新復制log(刪除后面還會出現的Log);summary of the log head僅僅是一個壓縮存儲的hash表。
(7)配置logcleaner. log.cleaner.enable=true; log.cleanup.policy=compact;
log compaction限制:不能配置還有多少日志沒有compact;還不能與壓縮的topic兼容。
五、實現
(一).API
1.producer:新producer包裝了同步和異步producer的底層。
(1)可以處理多個producer的queueing/buffering請求,batch data的異步調度,kafka.producer.Producer提供了多個異步調度能力(producer.type=async
)。(在序列化和調度之前)。 調度的規模可以配置---queue.time; batch.size
;這兩個配置到了就一起發送。kafka.producer.async.ProducerSendThread解隊列數據,kafka.producer.EventHandler發送數據到合適的broker的合適partition.
一個定制的事件處理可以通過event.handler配置。在producer pipeline中注入callback很重要(注入定制的log/跟蹤代碼;定制的監控)。 通過實現kafka.producer.async.CallbackHandler接口和配置callback.handler參數給class.
(2)處理序列化數據,通過用戶定義的encoder.
zk.connect
參數獲得。對一些應用對zookeeper的依賴不合適,這種情況下可以配置
broker.list
。這種情況下broker隨機選擇一個broker的partition,這種情況下broker down請求就失敗。
my_topic_0
and
my_topic_1
,內有很多文件包含topic的消息。log文件的格式是一系列Log entries.
並且每個額外的文件從以前文件有一個整數名大約S字節;S是配置的最大log file size.

Each consumer does the following during rebalancing:
1.基本操作
(1)Adding and removing topics

--whitelist
指定要復制的topic.(可以使正則表達式)。
--blacklist
黑名單。
2.datacenters.數據中心。
建議kafka之間復制,應用都采取本地策略。可以采取一個所有中心的聚合集群。
提高tcp socket包大小,用 socket.send.buffer.bytes和
socket.receive.buffer.bytes配置。不建議一個集群跨機房。
3.kafka配置
(1)重要的client配置。
最重要的生產者配置:
- compression 壓縮
- sync vs async production 生產者提交方式
- batch size (for async producers) 批量大小
最重要的consumer size是fetch size.
(2)A Production Server Config
Our client configuration varies a fair amount between different use cases.
4.硬件和系統os.
(1)內存:需要用大量的內存來緩存活躍的reader和writer.可以做內存的簡單估計,假設希望緩存30s計算內存需要寫吞吐量*30.
硬盤吞吐量比較重要,linkedin用8*7200磁盤,硬盤是瓶頸,越多硬盤越好。依賴你配置的刷新策略,可能不會從貴的硬盤得到好處(更頻繁flush,RPM SAS磁盤會好一些)
(2)OS:linux/unix. 系統層的配置修改。提升文件句柄數量(因為有大量的topic/連接);提升最大的socket buffer size確保數據中心間數據傳輸。
5.磁盤和文件系統
(1)用多個磁盤,不要讓Kafka和系統常用磁盤共享。0.8后可以raid,或者format和加載每個磁盤作為自己的目錄。
可以提供round-bin 數據目錄,每個partition會完整的在其中一個目錄。 (或者用raid)
(2)Application vs. OS Flush Management
建議用默認的flush策略(整個禁用application fync). 意味着使用Kafka和os的flush策略。
6.監控
(1) kafka使用 Yammer Metrics ,配置成可插入的配置連上監控系統。
Description | Mbean name | Normal value |
---|---|---|
Message in rate | "kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics" | |
Byte in rate | "kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics" | |
Request rate | "kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics" | |
Byte out rate | "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics" | |
Log flush rate and time | "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats" | |
# of under replicated partitions (|ISR| < |all replicas|) | "kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager" | 0 |
Is controller active on broker | "kafka.controller":name="ActiveControllerCount",type="KafkaController" | only one broker in the cluster should have 1 |
Leader election rate | "kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats" | non-zero when there are broker failures |
Unclean leader election rate | "kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats" | 0 |
Partition counts | "kafka.server":name="PartitionCount",type="ReplicaManager" | mostly even across brokers |
Leader replica counts | "kafka.server":name="LeaderCount",type="ReplicaManager" | mostly even across brokers |
ISR shrink rate | "kafka.server":name="ISRShrinksPerSec",type="ReplicaManager" | If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0. |
ISR expansion rate | "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager" | See above |
Max lag in messages btw follower and leader replicas | "kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager" | < replica.lag.max.messages |
Lag in messages per follower replica | "kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics" | < replica.lag.max.messages |
Requests waiting in the producer purgatory | "kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory" | non-zero if ack=-1 is used |
Requests waiting in the fetch purgatory | "kafka.server":name="PurgatorySize",type="FetchRequestPurgatory" | size depends on fetch.wait.max.ms in the consumer |
Request total time | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics" | broken into queue, local, remote and response send time |
Time the request waiting in the request queue | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics" | |
Time the request being processed at the leader | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics" | |
Time the request waits for the follower | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics" | non-zero for produce requests when ack=-1 |
Time to send the response | "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics" | |
Number of messages the consumer lags behind the producer by | "kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager" |