消息中間件MessageQuene
- 解耦且可擴展:業務復雜度的提升帶來的也是耦合度的提高,消息隊列在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
- 冗余:有些業務在處理過程中如果失敗了,數據在未進行持久化的時候就已經消失,消息隊列把數據持久化直到他們被處理,避免了數據的丟失
- 處理並發:大數據量訪問的時候我們可以將消息放入隊列中,然后在隊列里面按照系統的吞吐能力來進行穩定的抽取數據並進行業務處理。
- 可恢復:一部分系統出現問題,可能影響整個程序穩定,消息隊列由於將數據持久化,所以在出現問題的時候可以起到一個備份的作用,系統穩定之后可以進行數據重新消費。
- 送達保證:大多數消息隊列都有一套自己的消息處理機制,一般分為消息處理多次,消息至少被處理一次等情況,這使得我們處理業務減少了數據丟失情況的發生。
- 順序處理:按照一定的順序發送消息,使得消息在隊列中是有序存在的,所以在消費數據的時候我們也是有序處理的(先進先出)。
- 異步通信:很多時候,你不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許你把一個消息放入隊列,但並不立即處理它。你想向隊列中放入多少消息就放多少,然后在你樂意的時候再去處理它們。
常用MQ對比
- RedisMQ:Redis是基於Key-Value的NoSql數據庫,本身支持MQ隊列操作,是一個輕量級的隊列服務,在使用Redis作為緩存的項目中,可以優先使用RedisMQ作為消息隊列進行業務處理。優點:輕量級,容易開發。缺點:數據大的時候入隊速度較慢
- ActiveMQ:Apache的一個開源子項目,支持多種語言以及網絡協議。優點:容易開發,並且有自帶重連機制。缺點:傳入文件不方便,數據大的時候效率一般,消費失敗的數據將會丟失。
- Kafka:Apache的一個開源子項目,高性能大吞吐量並且能夠滿足跨語言平台分布式。優點:快速持久化,大吞吐量,支持hadoop數據並行加載,可以進行離線消息處理。缺點:開發困難,配置文件復雜,開源代碼較少。
Kafka元素介紹
- broker:kafka搭建的集群服務器稱為broker,集群中一台服務器可以搭載多個broker。
- Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為topic。我們可以認為同一種類型的消息存到同一個topic下,這點和Map的key值有相似之處。
- Partiton:parition是物理上的概念,每個topic包含一個或多個partition,創建topic的時候可以指定partiton的數量。每一個partition對應一個文件夾。文件夾里面存放索引文件以及數據文件。
- Producer:負責發布消息到Broker中
- Consumer:每個消費者屬於特定的消費者組,每個消費者組只能消費一個partition的數據,多個組可以同時消費同一個partition的數據。
Kafka的架構
一個典型的kafka集群中包含若干producer,若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高), 若干consumer group,以及一個 Zookeeper 集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發布到broker,consumer使用pull模式從 broker訂閱並消費消息。
Kafka處理消息的機制
- producer:Producer向Broker推送消息(push)
- consumer:consumer從broker拉取消息(pull)
- push模式下很難適應不同消費速率的消費者處理消息,因為發送消息的速度是由broker決定的。push模式的目的是為了盡快的消費,但是這樣容易造成consumer來不及處理消息。典型的表現就是網絡阻塞和拒絕服務。
- pull模式可以根據消費者消費速率而抓取消息,這樣避免了網絡阻塞的情況發生。
Topic與Partition認識——簡單分布
Partition分布我們主要分成單Broker和多個Broker:
單個Broker:
創建一個partition為3,Replica為1,Topic名字為order的topic。我們得到的分布式在配置好的LOG文件夾中生成三個分別為:order-0、order-1、order-2的文件夾用來存儲Partition下的信息的.index文件.log文件和.timeindex文件。
多個Broker:
創建一個partition為3,Replica為1,Topic名字為order的topic。我們在Broker0中對應的LOG文件夾中只是發現了order-0的文件夾,在其他Broker中分別發現了Partition的文件夾。如果Broker數大於Partition數,那么有Broker中沒有對應的Partition;如果Broker小於Partition數,Broker中會存在多個Partition。
下面以一個Kafka集群中4個Broker舉例,創建1個topic包含4個Partition,2個 Replication:
當集群中新增2節點,Partition增加到6個時分布情況:
Topic與Partition認識——分布邏輯
副本分配邏輯:
- 在kafka集群中,每個Broker分配Partition的leader機會是均等的
- 每個Broker(按照brokerid有序)依次分配leader的Partition,下一個Broker為副本,如此循環迭代分配,多副本也遵循此規則。
副本分配算法:
- 將所有的Broker和待分配的Partition進行排序
- 將第i個Partition分配到第(i mod n)個Broker上
- 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.
Partition——Leader與Follower
Leader和Follower:
- opic的Replication不是必選
- 如果Replication中的Leader宕機了,那么副本Follower將作為Leader Follower的數據是從Leader中拉取的,而不是Leader發送的
- 判斷節點是否alive:首先節點必須可以維護和zookeeper的連接,zookeeper通過心跳機制檢查每個節點的鏈接;其次是如果這個節點是follower,必須能夠同步leader的寫操作,延時不能太久。(符合條件的節點為同步中狀態,如果出現上述問題的節點,leader將會把節點移除,時間由replica.lag.time.max.ms決定,消息由replica.lag.max.messages決定)
- 如果有副本,那么當所有副本都加入日志中才算這條信息commited,只有消息是commited狀態的才能被消費者消費。(Producer可以根據acks參數來決定是否等待消息提交通知反饋)
- Kafka只要保證有一個節點是同步中的,已經commited的消息就不會丟失。
Partition——Leader選舉
- 分布式Leader選舉機制有Follower投票,以及Follower命中率機制來等,這兩種機制都是根據副本節點的狀態來動態選擇的。而kafka並不使用這兩種。
- 雖然kafka使用zookeeper進行leader選擇,但是它采用FastLeaderElection的方式和傳統方式有所不同。
- kafka自己維護同步狀態的副本集合(ISR),這個里面的節點都是和Leader數據保持高度一致的,任何一條消息保證每個節點的消息追加到日志中才會告訴Leader這個消息已經commited,所以任何Follower節點在Leader宕掉之候都可能被選擇為Leader,並且這些節點都是通過zookeeper管理維護。
- Follower節點為N,如果N-1個節點都宕,kafka依然可以正常工作,如果某一個宕掉節點又重新alive,由會被重新加入到ISR中。
- 如果N個節點都宕掉了,就會等待ISR中任何一個重新alive的節點作為Leader,或者選擇所有節點中重新alive的節點作為Leader(貌似新版本kafka中可以配置Leader選舉的這種情況的選擇方式)
Partition——文件存儲方式
- 每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
- 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
- 磁盤在進行非線性寫入的效率很低,在知道讀寫順序的情況下效率很高,這是kafka效率高,滿足高吞吐量的條件。
Partition——文件
- 段落文件由三部分組成:.index文件、.log文件以及.timeindex文件。(0.8版本之前的kafka沒有timeindex文件)
- index文件為索引文件,命名規則為從0開始到,后續的由上一個文件的最大的offset偏移量來開頭(19位數字字符長度)
- .log文件為數據文件,存放具體消息數據
- .timeindex文件,是kafka的具體時間日志
- 如果設置log失效時間比較短,就會出現下面這種狀況
Partition——index和log的匹配關系
索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。其中以索引文件中 元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移 地址為497。
Partition——Message的物理結構
Partition——通過Offset查找Message
讀取Offset為368776的Message:
1.查詢段落文件:
假設partition有兩個index文件,分別為:0...0.index文件和0...383562.index文件,由於我們第二個的偏移量初始值為383562+1,而要查詢的文件為368776,所以我們查詢的Message在第一個0...0.log中。使用offset **二分查找**文件列表,就可以快速定位到具體文件。
2.段落文件索引與Message文件進行匹配:
通過segment file查找message通過第一步定位到segment file,當offset=368776時,依次定位到0...000.index的元數據物理位置和 0..000.log的物理偏移地址,然后再通過0...0.log順序查找直到 offset=368776為止
Consumer——High Level
很多時候,客戶程序只是希望從Kafka讀取數據,不太關心消息offset的處理。同時也希望提供一些語義,例如同一條消息只被某一個Consumer消費(單播)或被所有Consumer消費(廣播)。因此,Kafka Hight Level Consumer提供了一個從Kafka消費數據的高層抽象,從而屏蔽掉其中的細節並提供豐富的語義。
Consumer——Consumer Group
每一個consumer實例都屬於一個consumer group,每一條消息只會被同一個consumer group里的一個consumer實例消費。(不同consumer group可以同時消費同一條消息)
Consumer——Rebalance
Kafka保證同一Consumer Group中只有一個Consumer會消費某條消息,實際上,Kafka保證的是穩定狀態下每一個Consumer實例只會消費某一個或多個特定Partition的數據,而某個Partition的數據只會被某一個特定的Consumer實例所消費。
Kafka對消息的分配是以Partition為單位分配的。這樣設計的劣勢是無法保證同一個消費者組里的Consumer均勻消費數據,優勢是每個Consumer不用都跟大量的Broker通信,減少通信開銷,同時也降低了分配難度,實現也更簡單。另外,因為同一個Partition里的數據是有序的,這種設計可以保證每個Partition里的數據可以被有序消費。
如果某Consumer Group中Consumer數量少於Partition數量,則至少有一個Consumer會消費多個Partition的數據,如果Consumer的數量與Partition數量相同,則正好一個Consumer消費一個Partition的數據。而如果Consumer的數量多於Partition的數量時,會有部分Consumer無法消費該Topic下任何一條消息。
kafkar的Rebalance的算法:
- 將目標Topic下的所有Partirtion排序,存於PT
- 對某Consumer Group下所有Consumer排序,存於CG
- 第i個Consumer記為Ci
- N=size(PT)/size(CG),向上取整
- 解除Ci對原來分配的Partition的消費權(i從0開始)
- 將第i∗N到(i+1)∗N−1個Partition分配給Ci
分區的算法:
分區數=Tt/Max(Tp,Tc)
Tp:producer吞吐量 Tc:consumer吞吐量 Tt目標的吞吐量
Consumer——Low Level
使用Low Level(Simple Consumer)原因是用戶希望比ConsumerGroup更好的控制數據的消費:
- 同一條消息消費多次
- 只讀某一個topic下的固定分區數據
- 管理事務,某一條消息僅被消費一次
額外工作:
- 必須在應用程序中跟蹤offset,從而確定下一條應該消費哪條消息
- 應用程序需要通過程序獲知每個Partition的Leader是誰
- 必須處理Leader的變化
由於操作比較復雜,所以我們在滿足需求的同時選擇使用High Level Consumer