哪些場景適合使用Kafka
線上系統會實時產生數以萬計的日志信息,服務器運行狀態,用戶行為記錄,業務消息 等信息,這些信息需要用於多個不同的目的,比如審計、安全、數據挖掘等,因此需要以分類的方式將這些信息發送到某個地方,以方便后台處理service實時的去獲取數據。MQ用於解決數據生成速率與數據消費速率不一致的場景,業務接口解耦,數據緩存冗余,海量數據處理彈性,異步通信。
Kafka是LinkedIn開源出來的分布式消息發布-訂閱系統,主要特點是基於Pull模式來處理消息,O(1)常數時間級別的消息持久化和讀取時間復雜度,基於at least once的處理原則,追求高吞吐量(單台機器吞吐量可達10w/s),主要用於非敏感信息如日志的收集和傳輸,不支持事務,對消息的重復、丟失和錯誤沒有嚴格要求 。
簡單的Kafka消息生產/消費模型
最簡單的Kafka拓撲網絡里只有一個broker,producer創建不同的topics,並將msg通過push的方式發送到broker,broker使用append log的方式順序性持久化msg,然后不同的consumers根據自己的消費速率按需從broker處pull自己需要的topics對應的msg;對於一個broker上的一個topic的msg而言,kafka總會保證其被consumer消費的先后順序;采用pull的消息處理模式可以讓consumer按需處理。
Broker從本地文件系統里添加消息和獲取消息都是按照隊列的入列和出列模式操作,因此時間復雜度都是O(1);Kafka不會主動刪除被消費過的消息,而是通過server.properties中的配置按照過期時間或者文件總大小來進行文件刪除。
構建Kafka Cluster,並使用zookeeper cluster作為協調服務
為了提供更高的msg吞吐量以及HA,Kafka支持以cluster的方式創建多個broker對外提供服務,由於集群環境引入的不確定性,kafka使用Zookeeper作為協調性服務(0.7.+引入),功能如下:
#1監聽broker的活躍狀態及其存儲的topic和partition狀態,協調broker leader和partition leader的選舉;
#2 為producer提供broker的訪問地址,並記錄每個topic下對應的partition leader分布地址,以幫助實現負載均衡;
#3 為consumer提供broker的訪問地址,並記錄每個partition正在被哪些CG內的哪一個consumer消費,CG中成員的變化,以幫助實現負載均衡,同時記錄每個CG的offset。
Zookeeper上典型的存儲kafka信息的格式如下,Kafka Cluster中每個broker都可以獲取關於cluster的metadata,包含active broker list,topic’s partition leaders等,因此對於producer而言每個broker都是對等的;
1 leo-chen-zookeeper 2 -> broker 3 -> ids 4 -> [broker_id] ## temp znode, value is host:port 5 -> topics 6 -> [topic_id] 7 -> [partition_id] ## temp znode, value is partition refer 8 -> consumers 9 -> [group_id] 10 -> ids 11 -> [consumer_id] ## temp znode, value is partition_id list 12 -> offsets 13 -> [topic_id] 14 -> [broker_id-partition_id] ## persist znode, value is offset 15 -> owners 16 -> [topic_id] 17 -> [broker_id-partition_id] ## temp znode, value is consumer_id
Zookeeper實現的功能如下:
#1 broker node注冊:新上線的broker會在zookeeper上創建一個temp znode以維護自己的活躍狀態,znode value為broker的訪問地址;broker下線或者session失效都會導致temp znode被刪除。
#2 broker topic注冊:新上線的broker還會根據自身存儲的topic-partition創建對應的temp znode,znode value為partition的索引,用於失效轉移的時候進行狀態對比。
#3 consumer node注冊:新上線的consumer會在zookeeper上創建一個temp znode以維護自己的活躍狀態,znode value為該consumer正在訪問的topic-partition列表;consumer的上線下線都會觸發kafka的rebalance動作。
#4 consumer-partition offset注冊:一個CG中新上線的consumer會根據自己正在訪問的partition對應的offset在zookeeper上創建一個persist znode,znode value為offset的值;當consumer下線之后,同一個CG內的其他consumer會繼續消費這個offset對應的partition msg;
#5 partition owner注冊:新上線的consumer會根據自己正在消費的topic-partition在zookeeper上創建一個temp znode,表示當前CG內所有正在被消費的partition都有哪些consumer在消費。
一個新的consumer上線之后會觸發如下操作:
#1 進行consumer node注冊;
#2 在consumers/[group_id]/ids路徑下注冊一個watcher用於監聽當前group中其他consumer臨時節點的變動,如果有變動則觸發負載均衡,通知當前consumer node重新計算可消費的topic-partition;
#3 在broker/ids路徑下注冊一個watcher用於監聽所有broker臨時節點的變動,如果有變動則觸發負載均衡,通知當前consumer node重新計算可消費的topic-partition;
維護topic下partition的數量,同步和過期策略
kafka將每一個topics拆分成多個partition(0.8.+引入)以便於負載均衡到多個broker上,由於一個topics的msg被分拆到了多個partition,則 kafka只能保證按一個partition中的msg按順序讓consumer進行消費(除partition所在的broker下線的情況),並不保證一個topic內多個partition間的msg的消費順序。一個topics的msg划分到哪個partition的策略有兩種,一是采用Key Hash算法,一是采用Round Robin算法。
Kafka通過partition log文件在文件系統上存儲msg,msg的寫入和讀取都可以是批量線性的,同時基於read-ahead,write-behind,線性讀寫,系統頁緩存的操作方式使得kafka對partition log文件的操作非常快,並且優於JVM的內存操作效率;傳統的RPC文件讀取流程會經歷四個步驟:磁盤到內核頁緩存,內核頁緩存到用戶空間緩存,用戶空間緩存到內核socket緩存,內核socket緩存到網卡緩存,最終發送給用戶;而利用sendfile和zero-copy技術可以將內核頁緩存的數據直接復制到網卡緩存,從而可以讓kafka實現近似緩存級別的數據操作速度。
Broker上典型的存儲msg的文件格式如下,~/leo-chen/kafka-msg表示log.dirs指向的根目錄,然后是按照topic以及partition划分的子目錄,[topic-name] - [partition-num],數字表示partition的編號,同一個topic下的partition盡量不要分布在一個broker下;
1 leo-chen-broker 2 -> kafka-msg 3 -> topic_report-0 4 -> 34477849968.index 5 -> 34477849968.log 6 -> 35551592052.index 7 -> 35551592052.log 8 -> topic_report-3 9 -> topic_launch_info-0 10 -> topic_api_call-0 11 -> topic_api_call-1
producer向指定的topic發送msg也就是尋找對應的topic目錄,並將一條msg entry添加到文件末尾的過程,一個日志文件由*.index和*.log組成,前者為msg的位置索引,后者是msg本身,這樣的存儲設計有如下優勢:
#1 segment file的分段存儲方式方便獨立加載,檢索和刪除數據;
#2 獨立存儲索引信息*.index的方式可以避免冗余IO操作,快速定位數據;
下圖是一個topic下一個partition log的邏輯抽象圖,所有的partition log files都以topic name作為根目錄,該broker上存儲的關於該topic的日志文件都位於此目錄下;每一條日志由三個部分組成,8個字節的offset用於唯一標記該msg,4個字節的num表示該msg的總長度,n個字節的content表示消費內容,其他就是一些版本和校驗字段;每一個segment file由一個offset區間段的msg組成,文件名是該區間最小的offset,因此獲取消息時指定一個起始offset和maximum chunk size就可以定位目標msg;將msg分文件存儲的一個好處就是如果指定了過期時間,則刪除過期的msg就只需要簡單刪除獨立的文件。
另外kafka允許給每個partition設置一個或多個replication,但只有一個partition會作為leader對外同時提供讀寫服務,其他的replication僅作為備份,他們之間的關系由zookeeper進行維護;當當前的partition leader下線后,zookeeper維護的臨時節點會因為session失效而自動刪除,因此其他的follower可以競爭成為新的leader,實現故障轉移;kafka會為每個partition維護一個a set of in-sync replicas的列表(ISR,總數為n的集群里只要有1個節點存活就能正常工作,不同於zookeeper的majority vote策略,需要總數為2n+1的集群里至少有n+1個節點存活才能正常工作,優勢在於系統的latency取決於吞吐量最快的node),存儲所有replication中與之前的partition leader同步狀態保持一致的節點,而新的leader也將從這個列表里誕生,而不需要經過投票過程產生新的leader。
Replication與partition leader同步的過程類似於consumer消費msg的過程,也是順序批量的將partition leader上的消息pull到本地;而kafka會通過兩個狀態值判定一個replication的健康狀態,一個是replication與zookeeper的heartbeat,另一個是partition與replication上最大offset的差值,只有滿足兩個條件的replication才會被加入ISR;一個msg只有在所有的replication上都進行同步之后,msg的狀態才設置為committed,表示這條msg可被consumer消費。
producer使用batch或者async的方式向broker發送消息
producer需要將msg發送到broker之前,會先為msg指定一個partitionKey,並通過可自定義的hash算法獲取一個partition refer,然后向zookeeper獲取對應partition的host:port;通過這樣的方式保證在一個topic下,所有標記為partitionKey的msg都會被發送到同一個partition上;為了提升性能,producer可以當msg累積到一定量之后統一將一批消息發送到broker,可以是累積消息數量,時間間隔,或者累積msg數據大小;Kafka支持gzip,snappy等多種數據壓縮方式。
另外由於kafka會為每個leader partition提供一個或者多個replication以保證容錯性,因此leader partition在收到msg之后會將數據同步到其他的replication上, producer可以通過設置acks參數(0|1|-1)要求同步\異步等待成功被同步了msg的replication的數量。一個msg只有在對應的所有replication上都sync之后才會在partition leader上被標注為committed狀態,表示可以被producer消費。
consumer基於consumer group和offset從broker取出msg
kafka定義一個 topic可以被多個CG(Consumer Group)監聽消費,CG內的consumer可以消費多個不同的partitions,但對於一個topic內的partitions,一個CG下的consumers只能消費不同的partitions,也就是對於一個partition而言禁止同一個CG內的consumer進行並發訪問,這樣可以最小的代價保證一個CG內的msg消費順序。
因此如果想實現topic內的消息廣播(一個msg被所有consumer消費)則為每一個consumer都創建一個CG,如果想實現topic內的消息單播(一個msg只被一個consumer消費)則所有的consumer都放到一個CG里。一般情況下一定是一個CG處理一個topic下所有的partition的所有msg,為了達到最優效率,一個topic下所有partition的數量需要大於等於一個CG內所有consumer的數量(盡量讓所有consumer都處理工作狀態),同時也要大於所有broker的數量以便於均衡分配到不同的broker上。
一個msg是否被消費是通過partition上msg隊列中的位置(offset)決定,因此consumer可以通過修改offset的值從而讀取partition上任意位置的msg,由於這個offset是交由consumers進行維護,如果是多個CG消費同一個partition的msg的話,需要各自維護自己的offset,因此不存在鎖的競爭,並且通過簡單增加broker的數量就可以提升訪問並發量,通過配置producer.property可以將offset存儲於zookeeper,或者producer自己維護 。
kafka提供三種級別的msg消費一致性語義
#1 at most once:fetch msg, update offset, consume msg,由於更新offset是在處理msg之前,所以可能出現msg丟失的場景。
#2 at least once:fetch msg, consume msg, update offset,由於更新offset是在處理msg之后,所以可能出現msg被重復消費的場景,kafka推薦配置。
#3 exactly once:在at least once的基礎上,在處理msg之前添加一個接口冪等性判定,或者基於2階段提交。
Kafka如何保證消息機制的可靠性
消息系統中由於參與方較多以及網絡延遲等問題,需要保證幾個點,
#1 保證msg發送成功:producer會同步等待broker的返回,並確認replication同步的結果,以保證消息成功被多個broker保存;但如果設置為等待所有replication都同步才返回的話會極大降低producer的吞吐量。
#2 保證msg的消費順序與發送順序一致:kafka可以保證一個broker下一個partition接受到的msg可以依次發送到consumer,但需要處理幾個常見的問題:
一個問題是由於網絡原因導致先發送的msg晚於后發送的msg到達broker/consumer,這樣的問題可以通過producer在msg上添加version,並在consumer方按照version的先后順序進行消費。
另外一個問題就是當一個broker下線之后,即使對應的partition在其他broker上有replication可以支持故障轉移,但由於partition leader被新的replication替代,CG針對原來partition鎖記錄的offset不再可用,也就是不再能保證當前msg的消費順序。
#3 保證msg被成功消費后不再重復消費:在at most once/at least once/exactly once中,kafka使用的是at least once,因此msg有可能被重復消費,而exactly once可以保證一條消息有且只有一次消費過程,可以在at least once的基礎上在producer端添加冪等性判定,由於不同msgId可能表示同一個業務消息,因此還需要從業務層面定制一個全局唯一性的標識 。