一、術語
1.1 Broker
Kafka 集群包含一個或多個服務器,服務器節點稱為broker。
broker存儲topic的數據。
如果某topic有N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。
如果某topic有N個partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個broker不存儲該topic的partition數據。
如果某topic有N個partition,集群中broker數目少於N個,那么一個broker存儲該topic的一個或多個partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群數據不均衡。
1.2 Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。
1.3 Partition
topic中的數據分割為一個或多個partition。
partition中的數據是有序的,不同partition間的數據丟失了數據的順序。如果topic有多個partition,消費數據時就不能保證數據的順序。在需要嚴格保證消息的消費順序的場景下,需要將partition數目設為1。或者需要保證按業務類型順序消費的情況,則按業務類型發到同一分區,來保證對應業務類型內的順序處理。
1.4 Producer
生產者即數據的發布者,該角色將消息發布到Kafka的topic中。
生產者發送的消息,存儲到一個partition中,生產者也可以指定數據存儲的partition。
1.5 Consumer
消費者可以從broker中讀取數據。消費者可以消費多個topic中的數據。
1.6 Consumer Group
每個Consumer屬於一個特定的Consumer Group。同一組內topic的一條消息,只有一個消費者能消費到。
1.7 Leader
每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責數據的讀寫的partition。
1.8 Follower
Follower跟隨Leader,所有寫請求都通過Leader路由,數據變更會廣播給所有Follower,Follower與Leader保持數據同步。
二、架構
2.1 拓撲結構
如上圖所示,一個典型的Kafka集群中包含若干Producer(可以是web前端產生的Page View,或者是服務器日志,系統CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。
Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。
Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。
2.2 Topics和Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。
為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。
kafka在接收到生產者發送的消息之后,會根據均衡策略將消息存儲到不同的分區中。
因為每條消息都被append到該Partition中,屬於順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。
當然,因為磁盤限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka提供兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可在Partition文件超過1GB時刪除舊數據,配置如下所示:
# The minimum age of a log file to be eligible for deletion log.retention.hours=168 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according to the retention policies log.retention.check.interval.ms=300000 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false
因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高Kafka性能無關。選擇怎樣的刪除策略只與磁盤以及具體的需求有關。
2.3 Producer消息路由
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪一個Partition。如果Partition機制設置合理,所有消息可以均勻分布到不同的Partition里,這樣就實現了負載均衡。
有了Partition后,不同的消息可以並行寫入不同broker的不同Partition里,極大的提高了吞吐率。
可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數量,也可在創建Topic時通過參數指定,同時也可以在Topic創建之后通過Kafka提供的工具修改。
在發送一條消息時,可以指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。
2.4 Consumer Group
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
這是Kafka用來實現一個Topic消息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group里。
Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬於不同的Consumer Group即可。
2.5 Push vs Pull
作為一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息並由Consumer從broker pull消息。
2.6 Kafka delivery guarantee
幾種可能的delivery guarantee:
At most once 消息可能會丟,但絕不會重復傳輸
At least one 消息絕不會丟,但可能會重復傳輸
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
當Producer向broker發送消息時,一旦這條消息被commit,因為replication的存在,它就不會丟。但是如果Producer發送數據給broker后,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無法確定網絡故障期間發生了什么,但是Producer可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。
Consumer在從broker讀取消息后,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。
當然可以將Consumer設置為autocommit,即Consumer一旦讀到數據立即自動commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。
但實際使用中應用程序並非在Consumer讀取完數據就結束了,而是要進行進一步處理,而數據處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。
三、高可用
3.1 緣起
在沒有Replication的情況下,一旦某機器宕機或者某個Broker停止工作則會造成整個系統的可用性降低。隨着集群規模的增加,整個集群中出現該類異常的幾率大大增加,因此對於生產系統而言Replication機制的引入非常重要。
引入Replication之后,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica作為Follower從Leader中復制數據。
3.2 高可用設計解析
Kafka分配Replica的算法如下:
1.將所有Broker(假設共n個Broker)和待分配的Partition排序
2.將第i個Partition分配到第(i mod n)個Broker上
3.將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
3.3 Data Replication(副本策略)
3.3.1 消息傳遞同步策略
Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然后無論該Topic的Replication Factor為多少,Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數據。
Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log后,向Leader發送ACK。
一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。
為了提高性能,每個Follower在接收到數據后就立馬向Leader發送ACK,而非等到數據寫入Log中。因此,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生后該條消息一定能被Consumer消費。
Consumer讀消息也是從Leader讀取,只有被commit過的消息才會暴露給Consumer。
總結:
高可用機制,就是 replica(復制品) 副本機制。
每個 partition 的數據都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那么生產和消費都跟這個 leader 打交道,然后其他 replica 就是 follower。寫的時候,leader 會負責把數據同步到所有 follower 上去,讀的時候就直接讀 leader 上的數據即可。只能讀寫 leader。
Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上,這樣才可以提高容錯性。
如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的,如果這上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續讀寫那個新的 leader 即可。這就有所謂的高可用性了。
寫數據的時候,生產者就寫 leader,然后 leader 將數據落地寫本地磁盤,接着其他 follower 自己主動從 leader 來 pull 數據。一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)
消費的時候,只會從 leader 去讀,但是只有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。