Kafka、RabbitMQ、RocketMQ 全方位對比


 

碼字好辛苦的,這篇文章反反復復修改了好幾天,如果榮幸被轉載的話, 請注明來源 https://www.cnblogs.com/snow-man/p/10062394.html

用途,背景

--------Kafka--------

是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬於Apache定級項目。Kafka主要特點是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日志收集和傳輸。0.8版本開始支持復制,不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務(行為跟蹤,日志收集等)。

默認通訊接口9092

-------RabbitMQ------

是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。本身支持很多的協議:AMQP,XMPP, SMTP, STOM它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。

特點: 通過交換機(Exchange)實現消息的靈活路由。

默認通訊接口5672,webui端口15672

-------RocketMQ------

是阿里開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用於交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。

特點:

1 億級別消息堆積能力;
2 采用零拷貝的原理,順序寫盤,隨機讀;
3 底層通信框架采用Netty NIO; 
4 NameServer代替Zookeeper,實現服務尋址和服務協調;
5 消息失敗重試機制、消息可查詢;經過多次雙十一的考驗

 

默認端口9876

 


 

集群對比

--------Kafka--------

一個典型的kafka集群包含若干Producer(可以是應用節點產生的消息,也可以是通過Flume收集日志 產生的事件),若干個Broker(kafka支持水平擴展)、若干個Consumer Group,以及一個 zookeeper集群。

kafka通過zookeeper管理集群配置及服務協同。Producer使用push模式將消息發布到broker,consumer通過監聽使用pull模式從broker訂閱並消費消息。 

多個broker協同工作,producer和consumer部署在各個業務邏輯中。三者通過zookeeper管理協調請 求和轉發。這樣就組成了一個高性能的分布式消息發布和訂閱系統。 

 

-------RabbitMQ------

因為 Erlang 天生具備分布式的特性, 所以 RabbitMQ 天然支持集群,不需要通過引入 ZK 或者數據庫來實現數據同步。RabbitMQ 通過/var/lib/rabbitmq/.erlang.cookie 來驗證身份 

RabbitMQ 分兩種節點

  一種是磁盤節點(Disc Node),一種是內存節點(RAM Node)。 

  集群中至少需要一個磁盤節點用來持久化元數據,否則全部內存節點崩潰時,就無 從同步元數據。未指定類型的情況下,默認為磁盤節點。 

 RabbitMQ集群分兩種:普通集群、鏡像集群

普通集群模式:

  不同的節點之間只會相互同步元數據,隊列不同步,只在自己的broker中,普通集群模式不能保證隊列的高可用性,因為隊列內容不會復制。如果節點失效將導致相關隊列不可用 

 

 

 

 鏡像隊列模式:

  消息內容會在鏡像節點間同步,保證 100% 數據不丟失。在實際工作中也是用得最多的,並且實現非常的簡單,一般互聯網大廠都會構建這種鏡像集群模式。

 mirror 鏡像隊列,目的是為了保證 rabbitMQ 數據的高可靠性解決方案,主要就是實現數據的同步,一般來講是 2-3個節點實現數據同步。對於 100% 數據可靠性解決方案,一般是采用3個節點。

 

-------RocketMQ------

RocketMQ由四部分組成

1)、Name Server 可集群部署,節點之間無任何信息同步。提供輕量級的服務發現和路由

2)、Broker(消息中轉角色,負責存儲消息,轉發消息) 部署相對復雜,Broker 分為Master 與Slave,一 個Master 可以對應多個Slave,但是一個Slave 只能對應一個Master,
Master 與Slave 的對應關系通過 指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。 Master 也可以部署多個。 3)、Producer,生產者,擁有相同 Producer Group 的 Producer 組成一個集群, 與Name Server 集群 中的其中一個節點(隨機選擇)建立長連接,
定期從Name Server 取Topic 路由信息,並向提供Topic 服務的Master 建立長連接,且定時向Master 發送心跳。Producer 完全無狀態,可集群部署。 4)、Consumer,消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成 一個集群,與Name Server 集群中的其中一個節點(隨機選擇)建立長連接,
定期從Name Server 取 Topic 路由信息,並向提供Topic 服務的Master、Slave 建立長連接,且定時向Master、Slave 發送心跳。
Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息,訂閱規則由Broker 配置決定。

 

要使用rocketmq,至少需要啟動兩個進程,nameserver、broker,前者是各種topic注冊中心,后者是真正的broker。

Producer 與 NameServer 集群中的其中一個節點(隨機選擇)建立長連接,定期從 NameServer 獲取 Topic 路由信息,並向提供 Topic 服務的 Broker Master 建立長連接,且定時向 Broker 發送心跳。
Producer 只能將消息發送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave建立長連接,
既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。

其他一些概念:
NameServer
NameServer可以部署多個,相互之間獨立,其他角色同時向多NameServer機器上報狀態信息,從而達到熱備份的目的。 
NameServer本身是無狀態的,也就是說NameServer中的Broker、Topic等狀態信息不會持久存儲,都是由各個角色定時上報並 存儲到內存中的(NameServer支持配置參數的持久化,一般用不到)。 為何不用ZooKeeper? ZooKeeper的功能很強大,包括自動Master選舉等,RocketMQ的架構設計決定了它不需要進行Master選舉, 用不到這些復雜的功能,只需要一個輕量級的元數據服務器就足夠了。
值得注意的是,NameServer並沒有提供類似Zookeeper的watcher機制, 而是采用了每30s心跳機制。

心跳機制

  1. 單個Broker跟所有Namesrv保持心跳請求,心跳間隔為30秒,心跳請求中包括當前Broker所有的Topic信息。Namesrv會反查Broer的心跳信息, 如果某個Broker在2分鍾之內都沒有心跳,則認為該Broker下線,調整Topic跟Broker的對應關系。但此時Namesrv不會主動通知Producer、Consumer有Broker宕機。
  2. Consumer跟Broker是長連接,會每隔30秒發心跳信息到Broker。Broker端每10秒檢查一次當前存活的Consumer,若發現某個Consumer 2分鍾內沒有心跳, 就斷開與該Consumer的連接,並且向該消費組的其他實例發送通知,觸發該消費者集群的負載均衡(rebalance)。
  3. 生產者每30秒從Namesrv獲取Topic跟Broker的映射關系,更新到本地內存中。再跟Topic涉及的所有Broker建立長連接,每隔30秒發一次心跳。 在Broker端也會每10秒掃描一次當前注冊的Producer,如果發現某個Producer超過2分鍾都沒有發心跳,則斷開連接.

原文鏈接:https://blog.csdn.net/javahongxi/article/details/84931747

 

RocketMQ天生對集群的支持非常友好

1)單Master
優點:除了配置簡單沒什么優點 缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用 2)多Master 優點:配置簡單,性能最高 缺點:可能會有少量消息丟失(配置相關),單台機器重啟或宕機期間,該機器下未被消費的消息在機 器恢復前不可訂閱,影響消息實時性 3)多Master多Slave,每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短暫消息延遲,毫秒級 優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預 缺點:Master宕機或磁盤損壞時會有少量消息丟失 4)多Master多Slave,每個Master配一個Slave,有多對Master-Slave,集群采用同步雙寫方式,主備都寫成功,向應用返回成功 優點:服務可用性與數據可用性非常高 缺點:性能比異步集群略低,當前版本主宕備不能自動切換為主

 


 Broker特點及分區容錯、副本機制

--------Kafka--------

1. Broker 不維護數據消費狀態,只是負責數據的順序讀寫,功能單一,不需要創建對象及GC操作,效率高。

2. kafka各broker關系: 各broker之間關系平等, 只有具體topic下的partition才有主從關系,其中master節點負責client的讀寫,follower不負責備份(ISR機制)。

3. Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把 這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個 Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。【分區是為了性能,副本是為了容錯】

4. kafka包括一個默認的topic: consumer__*, 默認是50個分區,分區時機是(分區位置= groupid.hascode()%50)記錄每個ConsumerGroup的offset信息,這個之前是存儲在zk中的。后來考慮的ZK性能問題,改村到kafka自己的topic中

 

partition 分區副本

kafka 一個Topic下分成若干個partition(partition數量不得超過broker數量),每個partition可以有若干個副本。

好處是便於數據橫向擴展,提高相率,同時支持災難恢復。  副本集中只有一個leader,若干個follower, leader負責數據的讀寫,follower只負責數據備份。

幾個概念

leader副本:響應clients端讀寫請求的副本 
follower副本:被動地備份leader副本中的數據,不能響應clients端讀寫請求。 ISR副本:(In Sync Relica)包含了leader副本和所有與leader副本保持同步的follower副本 LEO:即日志末端位移(log end offset),告訴producer,數據已經錄入
HW:即上面提到的水位值。HW值不會大於LEO值。用來對應consumer--數據可以消費了,

 leader分區中 HW~LEO 之間的的表示leader已經存儲,但是還沒有同步到follower

 

producer寫入流程大體如下:

1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
2. producer 將消息發送給該 leader
3. leader 將消息寫入本地 log
4. followers 從 leader pull 消息,寫入本地 log 后 leader 發送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 並向 producer 發送 ACK

 

 ISR必須滿足如下條件:

  leader leo與follower leo同步時間不能太長replica.lag.time.max.min,如果該follower在此時間間隔內一直沒有追 上過leader的所有消息,則該follower就會被剔除isr列表

副本Leader選舉

當partition leader宕機時,zk會啟動Leader選舉  

a) 優先從isr列表中選出第一個作為leader副本,這個叫優先副本,理想情況下有限副本就是該分區的leader副本

b) 如果isr列表為空,則查看該topic的unclean.leader.election.enable配置。
  為true則代表允許選用非isr列表的副本作為leader,那么此時就意味着數據可能丟失,為false的話,則表示不允許,直接拋出NoReplicaOnlineException異常,造成leader副本選舉失敗。

c) 如果上述配置為true,則從其他副本中選出一個作為leader副本,並且isr列表只包含該leader 副本。一旦選舉成功,則將選舉后的leader和isr和其他副本信息寫入到該分區的對應的zk路徑上。

 

-------RabbitMQ------

消息從交換機路由到隊列如果保證可靠性。

有兩種方式處理交換機無法路由隊列的問題,一種就是讓服務端重發給生產者,一種是讓 交換機路由到另一個備份的交換機。

第一種方式是消息回發,添加ReturnListener,同時Mandatory=false.

        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode,
                                     String replyText,
                                     String exchange,
                                     String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body)
                    throws IOException {
                System.out.println("=========監聽器收到了無法路由,被返回的消息============");
                
            }
        });
        // 第三個參數是設置的mandatory,如果mandatory是false,消息也會被直接丟棄
        channel.basicPublish("","gupaodirect",true, properties,"只為更好的你".getBytes());

 

第二種方式:

消息路由到備份交換機的方式:在創建交換機的時候,從屬性中指定備份交換機。

Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交換機的備份交換機 ​
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);

   

-------RocketMQ------

消息類型

 Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上。Producer的發送機制保證消息盡量平均分布到 所有隊列中,最終效果就是所有消息都平均落在每個Broker上。

1.普通消息

  普通消息也叫做無序消息,簡單來說就是沒有順序的消息,producer 只管發送消息,consumer 只管接收消息,至於消息和消息之間的順序並沒有保證,可能先發送的消息先消費,也可能先發送的消息后消費。

2.有序消息
  有序消息就是按照一定的先后順序的消息類型。
  • 全局有序消息:只有一個隊列一個消費者,效率受限
  • 局部有序消息:配置算法,讓所有相關消息進入同一個隊列。

3.延時消息
延時消息,簡單來說就是當 producer 將消息發送到 broker 后,會延時一定時間后才投遞給 consumer 進行消費。

 


消息分發策略

--------Kafka--------

消息是kafka中最基本的數據單元,在kafka中,一條消息由key、value兩部分構成

在發送一條消息時,我們可以指定這個key,那么producer會根據key和partition機制來判斷當前這條消息應該發送並存儲到哪個partition中。

我們可以根據需要進行擴展producer的partition機制。

一個topic下多個partition對應consumer group的分配策略包括三種
1. RangeAssignor(范圍分區):   分區按照序號進行排序,消費者按照字 母順序進行排序 2. RoundRobinAssignor(輪詢分區):   輪詢分區策略是把所有partition和所有consumer線程都列出來,然后按照hashcode進行排序。最后通過輪詢算法分配partition給消費線程 3. StrickyAssignor 分配策略   粘性策略的目的是兩個:分區的分配盡可能的均勻,分區的分配盡可能和上次分配保持相同
producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制為:
1. 指定了 patition,則直接使用;
2. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
3. patition 和 key 都未指定,使用輪詢選出一個 patition。

 

什么時候會引起kafka consumer的rebalance

1. 同一個consumer group內新增了消費者
2. 消費者離開當前所屬的consumer group,比如主動停機或者宕機 
3. topic新增了分區(也就是分區數量發生了變化)

 小結:

消費者和分區的綁定關系是固定的,除非其他原因引起的Rebalance操作, 這樣最大的好處是不需要考慮leader選舉的情況,更不用開率並發的情況,一切為了效率。

-------RabbitMQ------

除了AMQP 之外,RabbitMQ 支持多種協議,STOMP、MQTT、HTTP and WebSockets。

特有組件,VHost,Exchange,Channel

VHost: 
    VHOST 除了可以提高硬件資源的利用率之外,還可以實現資源的隔離和權限的控制,它的作用類似於編程語言中的 namespace 和 package,不同的 VHOST 中可以有 同名的 Exchange 和 Queue,它們是完全透明的。

Exchange: 
    交換機是一個綁定列表,用來查找匹配的綁定關系。隊列使用綁定鍵(Binding Key)跟交換機建立綁定關系。 生產者發送的消息需要攜帶路由鍵(Routing Key),
  交換機收到消息時會根據它保存的綁定列表,決定將消息路由到哪些與它綁定的隊列上 

Channel:
    AMQP 里面引入了 Channel 的概念,它是一個虛擬的連接。我們就可以在保持的 TCP 長連接里面去創建和釋放 Channel,大大了減少了資源消耗.

Queue:  
    隊列是真正用來存儲消息的,是一個獨立運行的進程,有自己的數據庫(Mnesia)。

 

路由方式有三種:Direct,Topic,Fanout

Direct : Exchange+bingKey 精確綁定
Topic: Exchange+bingKey(綁定鍵中使用通配符)
Fanout 只需要Exchange,不需要bingKey

-------RocketMQ------

這些 consumer 實例使用同一個 group name。注意同一個消費組的 tag 也必須是一樣的。
 
集群消費:
  同一個topic下的消息會給消費組里的消費平分,當一個消息消費失敗的話,會被轉發到別的消費者
 
廣播消費:
  每條消息都會被 consumer 集群內所有的 consumer 實例消費一次

消息過濾
  在 RocketMQ 中消費者是可以按照 Tag 對消息進行過濾。對於消息分類,我們可以選擇創建多個 Topic 來區分,也可以選擇在同一個 Topic 下創建多個 tag 來區分。這兩種方式都是可行的,但是一般情況下,不同的 Topic 之間的消息是沒有什么必然聯系的,使用 tag 來區分同一個 Topic 下相互關聯的消息則更加合適一些。

消息重試:

  概念:就是當消費者消費消息失敗后,broker 會重新投遞該消息,直到消費成功,消息重試只針對集群消費模式,廣播消費沒有消息重試的特性。如果消費重試一直失敗,最終消息會進入死信隊列,我們還需要對死信隊列做單獨的補償機制。

消費重試來源:

返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
返回 null
拋出異常

 


 Producer => Broker可靠性投遞

--------Kafka--------

kafka對於消息的發送,可以支持同步和異步。

從本質上來說,kafka都是采用異步的方式來發送消息到broker,但是kafka並不是每次發送消息都會直接發送到broker上,而是把消息放到了一個發送隊列中,然后通過一個后台線程不斷從隊列取出消息進行發送,發送成功后會觸發callback。

kafka客戶端會積累一定量的消息統一組裝成一個批量消息發送出 去,觸發條件是前面提到的batch.size和linger.ms 

  batch.size和linger.ms這兩個參數是kafka性能優化的關鍵參數

生產者發送消息的可靠性

也就是我要保證我這個消息一定是到了broker並且完成了多副本的持久化,配置項為request.required.acks,如:properties.put("request.required.acks","-1");。

它有幾個可選項 

1: 生產者把消息發送到leader副本,leader副本在成功寫入到本地日志之后就告訴生產者消息提交成功,但是如果isr集合中的follower副本還沒來得及同步leader副本的消息, leader掛了,就會造成消息丟失。

-1 :消息不僅僅寫入到leader副本,並且被ISR集合中所有副本同步完成之后才告訴生產者已 經提交成功,這個時候即使leader副本掛了也不會造成數據丟失。
0:表示producer不需要等待broker的消息確認。這個選項時延最小但同時風險最大(因為 當server宕機時,數據將會丟失)。

Kafka delivery guarantee(message傳送保證):

(1)At most once消息可能會丟,絕對不會重復傳輸;
(2)At least once 消息絕對不會丟,但是可能會重復傳輸; (3)Exactly once每條信息肯定會被傳輸一次且僅傳輸一次,這是用戶想要的。

說明:當 producer 向 broker 發送消息時,一旦這條消息被 commit,由於 replication 的存在,它就不會丟。但是如果 producer 發送數據給 broker 后,遇到網絡問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經 commit。雖然 Kafka 無法確定網絡故障期間發生了什么,但是 producer 可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了 Exactly once,但目前還並未實現。所以目前默認情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設置 producer 異步發送實現At most once。

 

-------RabbitMQ------

整個RabbitMQ的架構如下

  

 在Producer發送消息到Broker整個流程中,Broker確認機制有兩種。第一種是 Transaction(事務)模式,第二種 Confirm(確認)模式。

1. 事務機制:

我們通過一個 channel.txSelect()開啟事務了,發送成功之后 channel.txCommit();否則使用channel.txRollback()

spring boot中設置rabbitTemplate.setChannelTransacted(true);

        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.txCommit(); System.out.println("消息發送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已經回滾"); }

缺點:它是阻塞的,一條消息沒有發送完畢,不能發送下一條消息,它會榨干 RabbitMQ 服務器的性能。所以不建 議大家在生產環境使用。

2. 確認(Confirm)模式

 有三種

  1. 是普通確認模式。 channel.confirmSelect() ,缺點:發送 1 條確認 1 條 

  2. 批量確認模式:channel.waitForConfirmsOrDie();

  3. 異步確認: 添加ConfirmListener , channel.addConfirmListener(new ConfirmListener())

  demo:

        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Broker未確認消息,標識:" + deliveryTag); if (multiple) { // headSet表示后面參數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { confirmSet.remove(deliveryTag); } // 這里添加重發的方法  } public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果true表示批量執行了deliveryTag這個值以前(小於deliveryTag的)的所有消息,如果為false的話表示單條確認 System.out.println(String.format("Broker已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); if (multiple) { // headSet表示后面參數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { // 只移除一個元素  confirmSet.remove(deliveryTag); } System.out.println("未確認的消息:"+confirmSet); } });

 

-------RocketMQ------

SendResult中,有一個sendStatus狀態,表示消息的發送狀態。一共有四種狀態

1. FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設置成 SYNC_FLUSH 才會報這個錯誤) 。

2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設置成SYNC_MASTER 方式,沒有 在設定時間內完成主從同步。 3. SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方 式下,並且Broker 被設置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。 4. SEND OK :表示發送成功。

 

RocketMQ消息支持的模式 

1.NormalProducer(普通) /消息同步發送 
  普通消息的發送和接收在前面已經演示過了,在上面的案例中是基於同步消息發送模式。也就是說消息 發送出去后,producer會等到broker回應后才能繼續發送下一個消息 

2.消息異步發送 
  MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback)。 

3.OneWay 
  單向(Oneway)發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答.效率最高 

 

半消息(HalfMessage):

指的是發送方已經將消息發送給MQ服務器,但是服務器端未收到生產者對該消息的二次確認,此時消息就會被標記成 "暫不能投遞狀態" ,處於該狀態的消息即

半消息消息回查(MessageStatusCheck):

由於網絡閃斷,生產者應用重啟等原因,導致某條事務消息的二次確認丟失。MQ服務器通過掃描發現某條消息長時間處於 "半消息" 時,需要主動向消息生產者詢問該消息的最終狀態(Commit還是Rollback),該過程就是消息回查。

 

RocketMQ事務消息的三種返回狀態

1. ROLLBACK_MESSAGE:回滾事務
2. COMMIT_MESSAGE: 提交事務 3. UNKNOW: broker會定時的回查Producer消息狀態,直到徹底成功或失敗。

當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回 COMMIT_MESSAGE提交事務

當返回UNKNOW時,Broker會在一段時間之后回查checkLocalTransaction,根據 checkLocalTransaction返回狀態執行事務的操作(回滾或提交),

RocketMQ消息的事務架構設計

1.生產者執行本地事務,修改訂單支付狀態,並且提交事務

2.生產者發送事務消息到broker上,消息發送到broker上在沒有確認之前,消息對於consumer是不可見狀態

3.生產者確認事務消息,使得發送到broker上的事務消息對於消費者可見

4.消費者獲取到消息進行消費,消費完之后執行ack進行確認

5.這里可能會存在一個問題,生產者本地事務成功后,發送事務確認消息到broker上失敗了怎么辦?這個時候意味着消費者無法正常消費到這個消息。所以RocketMQ提供了消息回查機制,如果 事務消息一直處於中間狀態,broker會發起重試去查詢broker上這個事務的處理狀態。一旦發現事務處理成功,則把當前這條消息設置為可見。

 


 

Broker => Consumer 可靠性消費

--------Kafka--------

consumer group中所有consumer平均消費同一個topic下的消息,並且每個consumer只能消費一個分區。 

coordinator(負載最小的broker)來執行對於consumer group的管理(Rebalance),以及確定整個consumer group的使用的分區策略
 (每個消費者可以有自己的分區策略,但最終策略呦coordinator確定)

enable.auto.commit:默認為true,也就是自動提交offset,自動提交是批量執行的,但會帶來重復提交或者消息丟失的問題,

 

對於高可靠性要求的程序,要使用手動提交。 對於高可靠要求的應用來說,寧願重復消費也不應該因為消費異常而導致消息丟失 

        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxx");//服務器ip:端口號,集群用逗號分隔
        props.put("group.id", "test");
        //取消自動回復
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("itsm-test"));

//消費
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (records.count() > 0) {
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    System.out.println("從kafka接收到的消息是:" + message);
                }
            }
//同步或者異步提交
consumer.commitSync()
//異步
//consumer.commitAsync()

 

auto.offset.reset:這個參數是針對新的groupid中的消費者而言的,

auto.offset.reset=latest情況下,新的消費者將會從其他消費者最后消費的offset處開始消費Topic下的消息
auto.offset.reset= earliest情況下,新的消費者會從該topic最早的消息開始消費 
auto.offset.reset=none情況下,新的消費者加入以后,由於之前不存在offset,則會直接拋出異常。

 

 

-------RabbitMQ------

RabbitMQ 提供了消費者的消息確認機制(message acknowledgement),消費 者可以自動或者手動地發送 ACK 給服務端。

消費者在訂閱隊列時,可以指定 autoAck 參數,當 autoAck 等於 false 時,RabbitMQ 會等待消費者顯式地回復確認信號后才從隊列中移去消息。

設置:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
或者在spring 配置文件中設置
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual

 

akg返回值有三種

NONE:自動 ACK 
MANUAL: 手動 ACK
AUTO:如果方法未拋出異常,則發送 ack。
  當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕且不重新入隊。   當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會發送 ACK。   其他的異常,則消息會被拒絕,且 requeue
= true 會重新入隊。

 

其他:

  生產者如何確認消息已經被確認:

1) 消費者收到消息,處理完畢后,調用生產者的 API(思考:是否破壞解耦?) 
2) 消費者收到消息,處理完畢后,發送一條響應消息給生產者

  

-------RocketMQ------

RocketMQ提供了兩種消息消費模型,一種是pull主動拉去,另一種是push,被動接收。但實際上 RocketMQ都是pull模式,只是push在pull模式上做了一層封裝,也就是pull到消息以后觸發業務消費 者注冊到這里的callback. RocketMQ是基於長輪訓來實現消息的pull。

 

 


 

存儲方式對比

從主流的幾種MQ消息隊列采用的存儲方式來看,主要會有三種

  1. 分布式KV存儲,比如ActiveMQ中采用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的情況下可以使用

  2. 文件系統存儲,常見的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統來做持久化,這種方案適合對於有高吞吐量要求的消息中間件,因為消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,否則一般是不會出現無法持久化的問題

  3. 關系型數據庫,比如ActiveMQ可以采用mysql作為消息存儲,關系型數據庫在單表數據量達到千 萬級的情況下IO性能會出現瓶頸,所以ActiveMQ並不適合於高吞吐量的消息隊列場景。

總的來說,對於存儲效率,文件系統要優於分布式KV存儲,分布式KV存儲要優於關系型數據庫

--------Kafka--------

  Topic的多個partition在物理磁盤上的保存路徑為/tmp/kafka-logs/topic_partition,kafka是通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個 LogSegment對應磁盤上的一個日志文件和一個索引文件,其中日志文件是用來記錄消息的。

索引文件是稀疏索引(索引項中只對應主文件中的部分記錄),用來保存一段index的區間。

  一個消息的查找算法:

1. 根據offset的值,查找segment段中的index索引文件。由於索引文件命名是以上一個文件的最后 一個offset進行命名的,所以,使用二分查找算法能夠根據offset快速定位到指定的索引文件。
2. 找到索引文件后,根據offset進行定位,找到索引文件中的符合范圍的索引。(kafka采用稀疏索 引的方式來提高查找性能)
3. 得到position以后,再到對應的log文件中,從position出開始查找offset對應的消息,將每條消息 的offset與目標offset進行比較,直到找到消息

 因為Kafka是使用文件存儲,采用的是順序寫順序讀,針對這種情況,kafka的優化方案包括:

零拷貝:
  在Linux中,是通過sendfile系 統調用來完成的。Java提供了訪問這個系統調用的方法:FileChannel.transferTo API
頁緩存:
    Kafka中大量使用了頁緩存, 這是Kafka實現高吞吐的重要因素之一 。 
  包括同步刷盤及間斷性強制刷盤(fsync), 可以通過 log.flush.interval.messages 和 log.flush.interval.ms 參數來控制。
   同步刷盤能夠保證消息的可靠性,避免因為宕機導致頁緩存數據還未完成同步時造成的數據丟失。但是實際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會帶來性能的影響。 刷盤的操作由操作系統去完成即可

 

 

 

-------RabbitMQ------

 不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。持久化的消息在到達隊列時就被寫入到磁盤,並且如果可以,持久化的消息也會在內存中保存一個備份,這樣就可以提高一定的性能,當內存吃緊的時候會從內存中清除。非持久化的消息一般只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。這兩種類型的消息的落盤處理都在RabbitMQ的“持久層”中完成。

RabbitMQ的持久層只是一個邏輯上的概念,實際包含兩個部分:

  • 隊列索引(rabbit_queue_index):負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否己被交付給消費者、是否己被消費者ack等。 每個隊列都有與之對應的一個rabbit_queue_index
  • 消息存儲(rabbit_msg_store):以鍵值對的形式存儲消息,它被所有vhost中的隊列共享,在每個vhost中有且只有一個。rabbit_msg_store具體還可以分為 msg_store_persistent和msg_store_transient,msg_store_persistent負責持久化消息的持久化,重啟后消息不會丟失;msg_store_transient負責 非持久化消息的持久化,重啟后消息會丟失。

-------RocketMQ------

RocketMQ就是采用文件系統的方式來存儲消息,消息的存儲是由ConsumeQueue和CommitLog配合完成的。

CommitLog

CommitLog是用來存放消息的物理文件,有點類似於數 據庫的索引文件,每個broker上的commitLog本當前機器上的所有 consumerQueue共享,不做任何的區分。

CommitLog中的文件默認大小為1G,可以動態配置; 當一個文件寫滿以后,會生成一個新的 commitlog文件。所有的Topic數據是順序寫入在CommitLog文件中的。

文件名的長度為20位,左邊補0,剩余未起始偏移量,比如00000000000000000000 表示第一個文件, 文件大小為102410241024,當第一個文件寫滿之后,生成第二個文件

ConsumeQueue

consumeQueue表示消息消費的邏輯隊列,這里面包含MessageQueue在commitlog中的其實物理位 置偏移量offset,消息實體內容的大小和Message Tag的hash值。

對於實際物理存儲來說, consumeQueue對應每個topic和queueid下的文件,每個consumeQueue類型的文件也是有大小,每個文件默認大小約為600W個字節,如果文件滿了后會也會生成一個新的文件

 

  1. RocketMQ的高性能在於順序寫盤(CommitLog)、零拷貝和跳躍讀(盡量命中PageCache),高可靠性在於刷盤和Master/Slave,
  2. 另外NameServer 全部掛掉不影響已經運行的Broker,Producer,Consumer。
  3. 發送消息負載均衡,且發送消息線程安全(可滿足多個實例死循環發消息),集群消費模式下消費者端負載均衡,這些特性加上上述的高性能讀寫, 共同造就了RocketMQ的高並發讀寫能力。
  4. 刷盤和主從同步均為異步(默認)時,broker進程掛掉(例如重啟),消息依然不會丟失,因為broker shutdown時會執行persist。 當物理機器宕機時,才有消息丟失的風險。
  5. 另外,master掛掉后,消費者從slave消費消息,但slave不能寫消息。

 

 

  

與Kafka消息隊列的比較

  kafka中Topic的Partition數量過多,隊列文件會過多,那么會給磁盤的IO讀寫造成比較大的壓力,也就造成了性能瓶頸。所以RocketMQ進行了優化,消息主題統一存儲在CommitLog中。

當然,這種設計並不是銀彈,它也有它的優缺點

優點在於:由於消息主題都是通過CommitLog來進行讀寫,ConsumerQueue中只存儲很少的數據, 所以隊列更加輕量化。對於磁盤的訪問是串行化從而避免了磁盤的競爭

缺點在於:消息寫入磁盤雖然是基於順序寫,但是讀的過程確是隨機的。讀取一條消息會先讀取 ConsumeQueue,再讀CommitLog,會降低消息讀的效率。

 

 


 

過期方式,消除策略對比

--------Kafka--------

日志的清理策略有兩個

1. 根據消息的保留時間,當消息在kafka中保存的時間超過了指定的時間,就會觸發清理過程
2. 根據topic存儲的數據大小,當topic所占的日志文件大小大於一定的閥值,則可以開始刪除最舊的消息。kafka會啟動一個后台線程,定期檢查是否存在可以刪除的消息

-------RabbitMQ------

數據消費完了,就會被移除,不能重復消費

-------RocketMQ------

1. 消息文件過期(默認72小時),且到達清理時點(默認是凌晨4點),刪除過期文件。
2. 消息文件過期(默認72小時),且磁盤空間達到了水位線(默認75%),刪除過期文件。
3. 磁盤已經達到必須釋放的上限(85%水位線)的時候,則開始批量清理文件(無論是否過期),直

到空間充足。 注:若磁盤空間達到危險水位線(默認90%),出於保護自身的目的,broker會拒絕寫入服務。

 


 

UI

--------Kafka--------

-------RabbitMQ------

RabbitMQ 可以通過命令(RabbitMQ CLI)、HTTP API 管理,也可以通過可視化 的界面去管理,這個網頁就是 managment 插件。 

ul默認地址是15672

linux啟動插件。
cd /usr/lib/rabbitmq/bin
./rabbitmq-plugins enable rabbitmq_management

 

-------RocketMQ------

rocket官方提供了一個可視化控制台,地址:https://github.com/apache/rocketmq-externals

這個是rocketmq的擴展,里面不僅包含控制台的擴展,也包含對大數據flume、hbase等組件的對接和 擴展。


 

消息容錯-死信隊列

-------RabbitMQ------

TTL(Time To Live)

過期時間設置分兩種

1) 通過隊列屬性設置消息過期時間 所有隊列中的消息超過時間未被消費時,都會過期。 

2)設置單條消息的過期時間 在發送消息的時候指定消息屬性。 

//第一種
@Bean("ttlQueue") public Queue queue() {
Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 11000); // 隊列中的消息未被消費 11 秒后過期 return new Queue("GP_TTL_QUEUE", true, false, false, map);
}

//第二種
MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("4000"); // 消息的過期屬性,單位 ms
Message message = new Message("這條消息 4 秒后過期".getBytes(), messageProperties); rabbitTemplate.send("GP_TTL_EXCHANGE", "gupao.ttl", message);

什么情況下消息會變成死信?

1)消息被消費者拒絕並且未設置重回隊列:(NACK || Reject ) && requeue == false 

2)消息過期 

3)隊列達到最大長度,超過了 Max length(消息數)或者 Max length bytes (字節數),最先入隊的消息會被發送到 DLX 

如何使用?

  在聲明正常隊列的時候配置好死信隊列(交換機) 

//聲明隊列時,指定死信隊列
@Bean("oriUseQueue") public Queue queue() { 
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 10000); 
// 10 秒鍾后成為死信map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE");
// 隊列中的消息變成死信后,進入死信交換機,找到匹配死信隊列 return new Queue(“commonQueue”, true, false, false, map); }

 

擴展:

 如何構造延遲隊列?總的來說有三種實現方案

1、 先存儲到數據庫,用定時任務掃描 

2、 利用 RabbitMQ 的死信隊列(Dead Letter Queue)實現 

3利用 rabbitmq-delayed-message-exchange 插件 


流量控制

-------Kafka------

max.poll.records:

 此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調 整此值,可以減少poll間隔 

-------RabbitMQ------

服務端流控(Flow Control) 

  隊列有兩個控制長度的屬性: 

x-max-length:隊列中最大存儲最大消息數,超過這個數量,隊頭的消息會被丟 棄。
x-max-length-bytes:隊列中存儲的最大消息容量(單位 bytes),超過這個容 量,隊頭的消息會被丟棄。 

 

消費端限流 

可以基於 Consumer 或者 channel 設置 prefetch count 的值,含義為 Consumer 端的最大的 unacked messages 數目。當超過這個數值的消息未被確認,RabbitMQ 會 停止投遞新的消息給該消費者。 

channel.basicQos(2); // 如果超過 2 條消息沒有發送 ACK,當前消費者不再接受隊列消息 channel.basicConsume(QUEUE_NAME, false, consumer); 

//SimpleMessageListenerContainer 配置
container.setPrefetchCount(2); 

//Spring Boot 配置: 
spring.rabbitmq.listener.simple.prefetch=2 

 

 

 

 好文章參考:

https://www.cnblogs.com/cyfonly/p/5954614.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM