kafka是一種分布式的,基於發布訂閱的消息系統。具有以下幾個方面的特性。
1.能夠提供常數時間的消息持久化及訪問性能。
2.高吞吐率。廉價的商用機器上能夠達到每秒100k條的消息傳輸。
3.支持kafka server間的消息消息分區,分布式消費,分區內消息的順序性。
4.支持水平擴展。
5.支持離線數據處理和實時數據處理。
kafka架構
kafka的拓撲結構:
1.producer:消息生產者。
2.consumer:消息消費者。
3.broker:kafka集群由一個或者多個服務器組成。服務器被稱為broker。消息由producer發送到broker。consumer從borker中消費消息。
4.Toptic:消息主題。每條發送到kafka集群的消息都有一個Topic,物理上不同topic的消息分開存儲 。邏輯上一個topic的消息保存於一個或多個broker上。
5.partition:消息分區。每個topic包括一個或多個partition
6.consumer group:每個consumer屬於特定的group,可以為每個consumer指定group name,不指定,則屬於默認的group。
kafka拓撲結構圖
從圖中我么可以看出。kafka集群由若干producer,consumer grouper,broker,zookeeper組成。kafka通過zookeeper來管理集群的配置,以及在consumer發生變化進行reblance。
topic & partion
topic在邏輯上可以被理解為一個隊列,消息必須指明它的topic,可以理解為消息必須指定放到哪一個隊列中。為了提高kafka的吞吐率,物理上把topic分成一個或多個partion,每個partion物理上對應一個文件夾。該文件夾下存儲該partition下的消息及索引文件。
若創建兩個topic,topic1和topic2,每個topic對應有13個和19個分區,其中集群中共有8個結點,則集群中會創建32個文件夾。如下圖所示:
每個日志文件都是一個log entry序列,每個log entry序列包含一個四字節整形值(消息長度,1+4+n),一字節magic value,四字節的crc校驗碼,n字節的消息體長度組成。每條消息都有在當前partition下的唯一的64字節的offset。它指明了消息的的存儲位置,磁盤上消息的存儲格式如下:
message length : 4 bytes (value: 1+4+n) "magic" value : 1 byte crc : 4 bytes payload : n bytes
這個log entries並非由一個文件組成,而是分成多個segment,每個segment以該segment下的第一條消息的offset命名並以kafka為后綴。另外會有一個索引文件,他標明每個
segment下的log entry的offset的范圍,如下圖所示:
kafka高吞吐率的一個很重要的保證就是消息會被順序寫到partition中。如下圖所示:
對於傳統的消息系統,通常會刪除已經消費過的消息,kafka會保存已經消費的消息。並且根據實際情況對已經消費的消息提供兩種刪除策略,分別是基於消息的消費時間以及partition文件的大小。
我們可以通過配置文件$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可在Partition文件超過1GB時刪除舊數據。配置如下所示:
# The minimum age of a log file to be eligible for deletion log.retention.hours=168 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according to the retention policies log.retention.check.interval.ms=300000 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false
對於consumer消費的消息,消息的offset由consumer來控制,對於kafka來說消息時無狀態的。kafka也不保證一個消息只由consumer group的一個consumer來消費,
從而不需要鎖機制,這也是kafka高吞吐率的一個重要保證。
push&pull
kafka采用push機制來推送消息到broker,pull機制來消費消息,push與pull機制各由優缺點。kafka采取pull機制消費消息可以簡化broker的設計,push機制采取盡快的投遞消息,
這樣很可能導致consumer來不及處理消息從而導致網絡擁塞或者拒絕服務,通過consumer自己來控制何時消費消息。即可批量消費又可逐條消費,能夠選擇不同的提交方式,從而實現不同傳輸語義。
消息遞送的保證機制。
- At most once 消息可能會丟,但絕不會重復傳輸
- At least one 消息絕不會丟,但可能會重復傳輸
-
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
當Producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丟。但是如果Producer發送數據給broker后,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無法確定網絡故障期間發生了什么,但是Producer可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還並未實現,有希望在Kafka未來的版本中實現。(所以目前默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once)。
當Producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丟。但是如果Producer發送數據給broker后,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無法確定網絡故障期間發生了什么,但是Producer可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還並未實現,有希望在Kafka未來的版本中實現。(所以目前默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once)。
-
讀完消息先commit再處理消息。這種模式下,如果Consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應於At most once
-
讀完消息先處理再commit。這種模式下,如果在處理完消息之后commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應於At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認為是Exactly once。(筆者認為這種說法比較牽強,畢竟它不是Kafka本身提供的機制,主鍵本身也並不能完全保證操作的冪等性。而且實際上我們說delivery guarantee 語義是討論被處理多少次,而非處理結果怎樣,因為處理方式多種多樣,我們不應該把處理過程的特性——如是否冪等性,當成Kafka本身的Feature)
- 如果一定要做到Exactly once,就需要協調offset和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,Consumer拿到數據后可能把數據放到HDFS,如果把最新的offset和數據本身一起寫到HDFS,那就可以保證數據的輸出和offset的更新要么都完成,要么都不完成,間接實現Exactly once。(目前就high level API而言,offset是存於Zookeeper中的,無法存於HDFS,而low level API的offset是由自己去維護的,可以將之存於HDFS中)
總之,Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。