碼字好辛苦的,這篇文章反反復復修改了好幾天,如果榮幸被轉載的話, 請注明來源 https://www.cnblogs.com/snow-man/p/10062394.html
用途,背景
--------Kafka--------
是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬於Apache定級項目。Kafka主要特點是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日志收集和傳輸。0.8版本開始支持復制,不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務(行為跟蹤,日志收集等)。
默認通訊接口9092
-------RabbitMQ------
是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。本身支持很多的協議:AMQP,XMPP, SMTP, STOM它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。
特點: 通過交換機(Exchange)實現消息的靈活路由。
默認通訊接口5672,webui端口15672
-------RocketMQ------
是阿里開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用於交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。
特點:
1 億級別消息堆積能力; 2 采用零拷貝的原理,順序寫盤,隨機讀; 3 底層通信框架采用Netty NIO; 4 NameServer代替Zookeeper,實現服務尋址和服務協調; 5 消息失敗重試機制、消息可查詢;經過多次雙十一的考驗
默認端口9876
集群對比
--------Kafka--------
一個典型的kafka集群包含若干Producer(可以是應用節點產生的消息,也可以是通過Flume收集日志 產生的事件),若干個Broker(kafka支持水平擴展)、若干個Consumer Group,以及一個 zookeeper集群。
kafka通過zookeeper管理集群配置及服務協同。Producer使用push模式將消息發布到broker,consumer通過監聽使用pull模式從broker訂閱並消費消息。
多個broker協同工作,producer和consumer部署在各個業務邏輯中。三者通過zookeeper管理協調請 求和轉發。這樣就組成了一個高性能的分布式消息發布和訂閱系統。
-------RabbitMQ------
因為 Erlang 天生具備分布式的特性, 所以 RabbitMQ 天然支持集群,不需要通過引入 ZK 或者數據庫來實現數據同步。RabbitMQ 通過/var/lib/rabbitmq/.erlang.cookie 來驗證身份
RabbitMQ 分兩種節點
一種是磁盤節點(Disc Node),一種是內存節點(RAM Node)。
集群中至少需要一個磁盤節點用來持久化元數據,否則全部內存節點崩潰時,就無 從同步元數據。未指定類型的情況下,默認為磁盤節點。
RabbitMQ集群分兩種:普通集群、鏡像集群
普通集群模式:
不同的節點之間只會相互同步元數據,隊列不同步,只在自己的broker中,普通集群模式不能保證隊列的高可用性,因為隊列內容不會復制。如果節點失效將導致相關隊列不可用
鏡像隊列模式:
消息內容會在鏡像節點間同步,保證 100% 數據不丟失。在實際工作中也是用得最多的,並且實現非常的簡單,一般互聯網大廠都會構建這種鏡像集群模式。
mirror 鏡像隊列,目的是為了保證 rabbitMQ 數據的高可靠性解決方案,主要就是實現數據的同步,一般來講是 2-3個節點實現數據同步。對於 100% 數據可靠性解決方案,一般是采用3個節點。
-------RocketMQ------
RocketMQ由四部分組成
1)、Name Server 可集群部署,節點之間無任何信息同步。提供輕量級的服務發現和路由
2)、Broker(消息中轉角色,負責存儲消息,轉發消息) 部署相對復雜,Broker 分為Master 與Slave,一 個Master 可以對應多個Slave,但是一個Slave 只能對應一個Master,
Master 與Slave 的對應關系通過 指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。 Master 也可以部署多個。 3)、Producer,生產者,擁有相同 Producer Group 的 Producer 組成一個集群, 與Name Server 集群 中的其中一個節點(隨機選擇)建立長連接,
定期從Name Server 取Topic 路由信息,並向提供Topic 服務的Master 建立長連接,且定時向Master 發送心跳。Producer 完全無狀態,可集群部署。 4)、Consumer,消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成 一個集群,與Name Server 集群中的其中一個節點(隨機選擇)建立長連接,
定期從Name Server 取 Topic 路由信息,並向提供Topic 服務的Master、Slave 建立長連接,且定時向Master、Slave 發送心跳。
Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息,訂閱規則由Broker 配置決定。
要使用rocketmq,至少需要啟動兩個進程,nameserver、broker,前者是各種topic注冊中心,后者是真正的broker。
其他一些概念:
NameServer可以部署多個,相互之間獨立,其他角色同時向多NameServer機器上報狀態信息,從而達到熱備份的目的。
NameServer本身是無狀態的,也就是說NameServer中的Broker、Topic等狀態信息不會持久存儲,都是由各個角色定時上報並 存儲到內存中的(NameServer支持配置參數的持久化,一般用不到)。
為何不用ZooKeeper?
ZooKeeper的功能很強大,包括自動Master選舉等,RocketMQ的架構設計決定了它不需要進行Master選舉, 用不到這些復雜的功能,只需要一個輕量級的元數據服務器就足夠了。
值得注意的是,NameServer並沒有提供類似Zookeeper的watcher機制, 而是采用了每30s心跳機制。
心跳機制
- 單個Broker跟所有Namesrv保持心跳請求,心跳間隔為30秒,心跳請求中包括當前Broker所有的Topic信息。Namesrv會反查Broer的心跳信息, 如果某個Broker在2分鍾之內都沒有心跳,則認為該Broker下線,調整Topic跟Broker的對應關系。但此時Namesrv不會主動通知Producer、Consumer有Broker宕機。
- Consumer跟Broker是長連接,會每隔30秒發心跳信息到Broker。Broker端每10秒檢查一次當前存活的Consumer,若發現某個Consumer 2分鍾內沒有心跳, 就斷開與該Consumer的連接,並且向該消費組的其他實例發送通知,觸發該消費者集群的負載均衡(rebalance)。
- 生產者每30秒從Namesrv獲取Topic跟Broker的映射關系,更新到本地內存中。再跟Topic涉及的所有Broker建立長連接,每隔30秒發一次心跳。 在Broker端也會每10秒掃描一次當前注冊的Producer,如果發現某個Producer超過2分鍾都沒有發心跳,則斷開連接.
原文鏈接:https://blog.csdn.net/javahongxi/article/details/84931747
RocketMQ天生對集群的支持非常友好
1)單Master
優點:除了配置簡單沒什么優點
缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用
2)多Master 優點:配置簡單,性能最高 缺點:可能會有少量消息丟失(配置相關),單台機器重啟或宕機期間,該機器下未被消費的消息在機 器恢復前不可訂閱,影響消息實時性 3)多Master多Slave,每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短暫消息延遲,毫秒級 優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預 缺點:Master宕機或磁盤損壞時會有少量消息丟失 4)多Master多Slave,每個Master配一個Slave,有多對Master-Slave,集群采用同步雙寫方式,主備都寫成功,向應用返回成功 優點:服務可用性與數據可用性非常高 缺點:性能比異步集群略低,當前版本主宕備不能自動切換為主
Broker特點及分區容錯、副本機制
--------Kafka--------
1. Broker 不維護數據消費狀態,只是負責數據的順序讀寫,功能單一,不需要創建對象及GC操作,效率高。
2. kafka各broker關系: 各broker之間關系平等, 只有具體topic下的partition才有主從關系,其中master節點負責client的讀寫,follower不負責備份(ISR機制)。
3. Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把 這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個 Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。【分區是為了性能,副本是為了容錯】
4. kafka包括一個默認的topic: consumer__*, 默認是50個分區,分區時機是(分區位置= groupid.hascode()%50)記錄每個ConsumerGroup的offset信息,這個之前是存儲在zk中的。后來考慮的ZK性能問題,改村到kafka自己的topic中
partition 分區副本
kafka 一個Topic下分成若干個partition(partition數量不得超過broker數量),每個partition可以有若干個副本。
好處是便於數據橫向擴展,提高相率,同時支持災難恢復。 副本集中只有一個leader,若干個follower, leader負責數據的讀寫,follower只負責數據備份。
幾個概念
leader副本:響應clients端讀寫請求的副本
follower副本:被動地備份leader副本中的數據,不能響應clients端讀寫請求。 ISR副本:(In Sync Relica)包含了leader副本和所有與leader副本保持同步的follower副本 LEO:即日志末端位移(log end offset),告訴producer,數據已經錄入 HW:即上面提到的水位值。HW值不會大於LEO值。用來對應consumer--數據可以消費了,
leader分區中 HW~LEO 之間的的表示leader已經存儲,但是還沒有同步到follower
producer寫入流程大體如下:
1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader 2. producer 將消息發送給該 leader 3. leader 將消息寫入本地 log 4. followers 從 leader pull 消息,寫入本地 log 后 leader 發送 ACK 5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 並向 producer 發送 ACK
ISR必須滿足如下條件:
leader leo與follower leo同步時間不能太長replica.lag.time.max.min,如果該follower在此時間間隔內一直沒有追 上過leader的所有消息,則該follower就會被剔除isr列表
副本Leader選舉
當partition leader宕機時,zk會啟動Leader選舉
a) 優先從isr列表中選出第一個作為leader副本,這個叫優先副本,理想情況下有限副本就是該分區的leader副本
b) 如果isr列表為空,則查看該topic的unclean.leader.election.enable配置。
為true則代表允許選用非isr列表的副本作為leader,那么此時就意味着數據可能丟失,為false的話,則表示不允許,直接拋出NoReplicaOnlineException異常,造成leader副本選舉失敗。
c) 如果上述配置為true,則從其他副本中選出一個作為leader副本,並且isr列表只包含該leader 副本。一旦選舉成功,則將選舉后的leader和isr和其他副本信息寫入到該分區的對應的zk路徑上。
-------RabbitMQ------
消息從交換機路由到隊列如果保證可靠性。
有兩種方式處理交換機無法路由隊列的問題,一種就是讓服務端重發給生產者,一種是讓 交換機路由到另一個備份的交換機。
第一種方式是消息回發,添加ReturnListener,同時Mandatory=false.
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("=========監聽器收到了無法路由,被返回的消息============"); } }); // 第三個參數是設置的mandatory,如果mandatory是false,消息也會被直接丟棄 channel.basicPublish("","gupaodirect",true, properties,"只為更好的你".getBytes());
第二種方式:
消息路由到備份交換機的方式:在創建交換機的時候,從屬性中指定備份交換機。
Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交換機的備份交換機 channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
-------RocketMQ------
消息類型
Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上。Producer的發送機制保證消息盡量平均分布到 所有隊列中,最終效果就是所有消息都平均落在每個Broker上。
1.普通消息
2.有序消息
有序消息就是按照一定的先后順序的消息類型。
- 全局有序消息:只有一個隊列一個消費者,效率受限
- 局部有序消息:配置算法,讓所有相關消息進入同一個隊列。
3.延時消息
延時消息,簡單來說就是當 producer 將消息發送到 broker 后,會延時一定時間后才投遞給 consumer 進行消費。
消息分發策略
--------Kafka--------
消息是kafka中最基本的數據單元,在kafka中,一條消息由key、value兩部分構成,
在發送一條消息時,我們可以指定這個key,那么producer會根據key和partition機制來判斷當前這條消息應該發送並存儲到哪個partition中。
我們可以根據需要進行擴展producer的partition機制。
一個topic下多個partition對應consumer group的分配策略包括三種
1. RangeAssignor(范圍分區): 分區按照序號進行排序,消費者按照字 母順序進行排序 2. RoundRobinAssignor(輪詢分區): 輪詢分區策略是把所有partition和所有consumer線程都列出來,然后按照hashcode進行排序。最后通過輪詢算法分配partition給消費線程 3. StrickyAssignor 分配策略 粘性策略的目的是兩個:分區的分配盡可能的均勻,分區的分配盡可能和上次分配保持相同
producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制為:
1. 指定了 patition,則直接使用; 2. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition 3. patition 和 key 都未指定,使用輪詢選出一個 patition。
什么時候會引起kafka consumer的rebalance
1. 同一個consumer group內新增了消費者
2. 消費者離開當前所屬的consumer group,比如主動停機或者宕機
3. topic新增了分區(也就是分區數量發生了變化)
小結:
消費者和分區的綁定關系是固定的,除非其他原因引起的Rebalance操作, 這樣最大的好處是不需要考慮leader選舉的情況,更不用開率並發的情況,一切為了效率。
-------RabbitMQ------
除了AMQP 之外,RabbitMQ 支持多種協議,STOMP、MQTT、HTTP and WebSockets。
特有組件,VHost,Exchange,Channel
VHost: VHOST 除了可以提高硬件資源的利用率之外,還可以實現資源的隔離和權限的控制,它的作用類似於編程語言中的 namespace 和 package,不同的 VHOST 中可以有 同名的 Exchange 和 Queue,它們是完全透明的。 Exchange: 交換機是一個綁定列表,用來查找匹配的綁定關系。隊列使用綁定鍵(Binding Key)跟交換機建立綁定關系。 生產者發送的消息需要攜帶路由鍵(Routing Key),
交換機收到消息時會根據它保存的綁定列表,決定將消息路由到哪些與它綁定的隊列上
Channel:
AMQP 里面引入了 Channel 的概念,它是一個虛擬的連接。我們就可以在保持的 TCP 長連接里面去創建和釋放 Channel,大大了減少了資源消耗.
Queue:
隊列是真正用來存儲消息的,是一個獨立運行的進程,有自己的數據庫(Mnesia)。
路由方式有三種:Direct,Topic,Fanout
Direct : Exchange+bingKey 精確綁定 Topic: Exchange+bingKey(綁定鍵中使用通配符) Fanout 只需要Exchange,不需要bingKey
-------RocketMQ------
消息過濾:
在 RocketMQ 中消費者是可以按照 Tag 對消息進行過濾。對於消息分類,我們可以選擇創建多個 Topic 來區分,也可以選擇在同一個 Topic 下創建多個 tag 來區分。這兩種方式都是可行的,但是一般情況下,不同的 Topic 之間的消息是沒有什么必然聯系的,使用 tag 來區分同一個 Topic 下相互關聯的消息則更加合適一些。
消息重試:
概念:就是當消費者消費消息失敗后,broker 會重新投遞該消息,直到消費成功,消息重試只針對集群消費模式,廣播消費沒有消息重試的特性。如果消費重試一直失敗,最終消息會進入死信隊列,我們還需要對死信隊列做單獨的補償機制。
消費重試來源:
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 返回 null 拋出異常
Producer => Broker可靠性投遞
--------Kafka--------
kafka對於消息的發送,可以支持同步和異步。
從本質上來說,kafka都是采用異步的方式來發送消息到broker,但是kafka並不是每次發送消息都會直接發送到broker上,而是把消息放到了一個發送隊列中,然后通過一個后台線程不斷從隊列取出消息進行發送,發送成功后會觸發callback。
kafka客戶端會積累一定量的消息統一組裝成一個批量消息發送出 去,觸發條件是前面提到的batch.size和linger.ms
batch.size和linger.ms這兩個參數是kafka性能優化的關鍵參數
生產者發送消息的可靠性
也就是我要保證我這個消息一定是到了broker並且完成了多副本的持久化,配置項為request.required.acks,如:properties.put("request.required.acks","-1");。
它有幾個可選項
1: 生產者把消息發送到leader副本,leader副本在成功寫入到本地日志之后就告訴生產者消息提交成功,但是如果isr集合中的follower副本還沒來得及同步leader副本的消息, leader掛了,就會造成消息丟失。
-1 :消息不僅僅寫入到leader副本,並且被ISR集合中所有副本同步完成之后才告訴生產者已 經提交成功,這個時候即使leader副本掛了也不會造成數據丟失。
0:表示producer不需要等待broker的消息確認。這個選項時延最小但同時風險最大(因為 當server宕機時,數據將會丟失)。
Kafka delivery guarantee(message傳送保證):
(1)At most once消息可能會丟,絕對不會重復傳輸;
(2)At least once 消息絕對不會丟,但是可能會重復傳輸; (3)Exactly once每條信息肯定會被傳輸一次且僅傳輸一次,這是用戶想要的。
說明:當 producer 向 broker 發送消息時,一旦這條消息被 commit,由於 replication 的存在,它就不會丟。但是如果 producer 發送數據給 broker 后,遇到網絡問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經 commit。雖然 Kafka 無法確定網絡故障期間發生了什么,但是 producer 可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了 Exactly once,但目前還並未實現。所以目前默認情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設置 producer 異步發送實現At most once。
-------RabbitMQ------
整個RabbitMQ的架構如下
在Producer發送消息到Broker整個流程中,Broker確認機制有兩種。第一種是 Transaction(事務)模式,第二種 Confirm(確認)模式。
1. 事務機制:
我們通過一個 channel.txSelect()開啟事務了,發送成功之后 channel.txCommit();否則使用channel.txRollback()
spring boot中設置rabbitTemplate.setChannelTransacted(true);
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.txCommit(); System.out.println("消息發送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已經回滾"); }
缺點:它是阻塞的,一條消息沒有發送完畢,不能發送下一條消息,它會榨干 RabbitMQ 服務器的性能。所以不建 議大家在生產環境使用。
2. 確認(Confirm)模式
有三種
1. 是普通確認模式。 channel.confirmSelect() ,缺點:發送 1 條確認 1 條
2. 批量確認模式:channel.waitForConfirmsOrDie();
3. 異步確認: 添加ConfirmListener , channel.addConfirmListener(new ConfirmListener())
demo:
channel.addConfirmListener(new ConfirmListener() {
public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Broker未確認消息,標識:" + deliveryTag); if (multiple) { // headSet表示后面參數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { confirmSet.remove(deliveryTag); } // 這里添加重發的方法 } public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果true表示批量執行了deliveryTag這個值以前(小於deliveryTag的)的所有消息,如果為false的話表示單條確認 System.out.println(String.format("Broker已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); if (multiple) { // headSet表示后面參數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { // 只移除一個元素 confirmSet.remove(deliveryTag); } System.out.println("未確認的消息:"+confirmSet); } });
-------RocketMQ------
SendResult中,有一個sendStatus狀態,表示消息的發送狀態。一共有四種狀態
1. FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設置成 SYNC_FLUSH 才會報這個錯誤) 。
2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設置成SYNC_MASTER 方式,沒有 在設定時間內完成主從同步。 3. SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方 式下,並且Broker 被設置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。 4. SEND OK :表示發送成功。
RocketMQ消息支持的模式
1.NormalProducer(普通) /消息同步發送
普通消息的發送和接收在前面已經演示過了,在上面的案例中是基於同步消息發送模式。也就是說消息 發送出去后,producer會等到broker回應后才能繼續發送下一個消息
2.消息異步發送
MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback)。
3.OneWay
單向(Oneway)發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答.效率最高
半消息(HalfMessage):
指的是發送方已經將消息發送給MQ服務器,但是服務器端未收到生產者對該消息的二次確認,此時消息就會被標記成 "暫不能投遞狀態" ,處於該狀態的消息即
半消息消息回查(MessageStatusCheck):
由於網絡閃斷,生產者應用重啟等原因,導致某條事務消息的二次確認丟失。MQ服務器通過掃描發現某條消息長時間處於 "半消息" 時,需要主動向消息生產者詢問該消息的最終狀態(Commit還是Rollback),該過程就是消息回查。
RocketMQ事務消息的三種返回狀態
1. ROLLBACK_MESSAGE:回滾事務
2. COMMIT_MESSAGE: 提交事務 3. UNKNOW: broker會定時的回查Producer消息狀態,直到徹底成功或失敗。
當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回 COMMIT_MESSAGE提交事務
當返回UNKNOW時,Broker會在一段時間之后回查checkLocalTransaction,根據 checkLocalTransaction返回狀態執行事務的操作(回滾或提交),
RocketMQ消息的事務架構設計
1.生產者執行本地事務,修改訂單支付狀態,並且提交事務
2.生產者發送事務消息到broker上,消息發送到broker上在沒有確認之前,消息對於consumer是不可見狀態
3.生產者確認事務消息,使得發送到broker上的事務消息對於消費者可見
4.消費者獲取到消息進行消費,消費完之后執行ack進行確認
5.這里可能會存在一個問題,生產者本地事務成功后,發送事務確認消息到broker上失敗了怎么辦?這個時候意味着消費者無法正常消費到這個消息。所以RocketMQ提供了消息回查機制,如果 事務消息一直處於中間狀態,broker會發起重試去查詢broker上這個事務的處理狀態。一旦發現事務處理成功,則把當前這條消息設置為可見。
Broker => Consumer 可靠性消費
--------Kafka--------
consumer group中所有consumer平均消費同一個topic下的消息,並且每個consumer只能消費一個分區。
coordinator(負載最小的broker)來執行對於consumer group的管理(Rebalance),以及確定整個consumer group的使用的分區策略
(每個消費者可以有自己的分區策略,但最終策略呦coordinator確定)
enable.auto.commit:默認為true,也就是自動提交offset,自動提交是批量執行的,但會帶來重復提交或者消息丟失的問題,
對於高可靠性要求的程序,要使用手動提交。 對於高可靠要求的應用來說,寧願重復消費也不應該因為消費異常而導致消息丟失
Properties props = new Properties(); props.put("bootstrap.servers", "xxxxxx");//服務器ip:端口號,集群用逗號分隔 props.put("group.id", "test"); //取消自動回復 props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("itsm-test")); //消費 while(true){ ConsumerRecords<String, String> records = consumer.poll(100); if (records.count() > 0) { for (ConsumerRecord<String, String> record : records) { String message = record.value(); System.out.println("從kafka接收到的消息是:" + message); } } //同步或者異步提交 consumer.commitSync() //異步 //consumer.commitAsync()
auto.offset.reset:這個參數是針對新的groupid中的消費者而言的,
auto.offset.reset=latest情況下,新的消費者將會從其他消費者最后消費的offset處開始消費Topic下的消息 auto.offset.reset= earliest情況下,新的消費者會從該topic最早的消息開始消費 auto.offset.reset=none情況下,新的消費者加入以后,由於之前不存在offset,則會直接拋出異常。
-------RabbitMQ------
RabbitMQ 提供了消費者的消息確認機制(message acknowledgement),消費 者可以自動或者手動地發送 ACK 給服務端。
消費者在訂閱隊列時,可以指定 autoAck 參數,當 autoAck 等於 false 時,RabbitMQ 會等待消費者顯式地回復確認信號后才從隊列中移去消息。
設置:
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
或者在spring 配置文件中設置
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
akg返回值有三種
NONE:自動 ACK MANUAL: 手動 ACK
AUTO:如果方法未拋出異常,則發送 ack。
當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕且不重新入隊。 當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會發送 ACK。 其他的異常,則消息會被拒絕,且 requeue = true 會重新入隊。
其他:
生產者如何確認消息已經被確認:
1) 消費者收到消息,處理完畢后,調用生產者的 API(思考:是否破壞解耦?)
2) 消費者收到消息,處理完畢后,發送一條響應消息給生產者
-------RocketMQ------
RocketMQ提供了兩種消息消費模型,一種是pull主動拉去,另一種是push,被動接收。但實際上 RocketMQ都是pull模式,只是push在pull模式上做了一層封裝,也就是pull到消息以后觸發業務消費 者注冊到這里的callback. RocketMQ是基於長輪訓來實現消息的pull。
存儲方式對比
從主流的幾種MQ消息隊列采用的存儲方式來看,主要會有三種
-
分布式KV存儲,比如ActiveMQ中采用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的情況下可以使用
-
文件系統存儲,常見的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統來做持久化,這種方案適合對於有高吞吐量要求的消息中間件,因為消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,否則一般是不會出現無法持久化的問題
-
關系型數據庫,比如ActiveMQ可以采用mysql作為消息存儲,關系型數據庫在單表數據量達到千 萬級的情況下IO性能會出現瓶頸,所以ActiveMQ並不適合於高吞吐量的消息隊列場景。
總的來說,對於存儲效率,文件系統要優於分布式KV存儲,分布式KV存儲要優於關系型數據庫
--------Kafka--------
Topic的多個partition在物理磁盤上的保存路徑為/tmp/kafka-logs/topic_partition,kafka是通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個 LogSegment對應磁盤上的一個日志文件和一個索引文件,其中日志文件是用來記錄消息的。
索引文件是稀疏索引(索引項中只對應主文件中的部分記錄),用來保存一段index的區間。
一個消息的查找算法:
1. 根據offset的值,查找segment段中的index索引文件。由於索引文件命名是以上一個文件的最后 一個offset進行命名的,所以,使用二分查找算法能夠根據offset快速定位到指定的索引文件。 2. 找到索引文件后,根據offset進行定位,找到索引文件中的符合范圍的索引。(kafka采用稀疏索 引的方式來提高查找性能) 3. 得到position以后,再到對應的log文件中,從position出開始查找offset對應的消息,將每條消息 的offset與目標offset進行比較,直到找到消息
因為Kafka是使用文件存儲,采用的是順序寫順序讀,針對這種情況,kafka的優化方案包括:
零拷貝:
在Linux中,是通過sendfile系 統調用來完成的。Java提供了訪問這個系統調用的方法:FileChannel.transferTo API
頁緩存:
Kafka中大量使用了頁緩存, 這是Kafka實現高吞吐的重要因素之一 。
包括同步刷盤及間斷性強制刷盤(fsync), 可以通過 log.flush.interval.messages 和 log.flush.interval.ms 參數來控制。
同步刷盤能夠保證消息的可靠性,避免因為宕機導致頁緩存數據還未完成同步時造成的數據丟失。但是實際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會帶來性能的影響。 刷盤的操作由操作系統去完成即可
-------RabbitMQ------
不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。持久化的消息在到達隊列時就被寫入到磁盤,並且如果可以,持久化的消息也會在內存中保存一個備份,這樣就可以提高一定的性能,當內存吃緊的時候會從內存中清除。非持久化的消息一般只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。這兩種類型的消息的落盤處理都在RabbitMQ的“持久層”中完成。
RabbitMQ的持久層只是一個邏輯上的概念,實際包含兩個部分:
- 隊列索引(rabbit_queue_index):負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否己被交付給消費者、是否己被消費者ack等。 每個隊列都有與之對應的一個rabbit_queue_index
- 消息存儲(rabbit_msg_store):以鍵值對的形式存儲消息,它被所有vhost中的隊列共享,在每個vhost中有且只有一個。rabbit_msg_store具體還可以分為 msg_store_persistent和msg_store_transient,msg_store_persistent負責持久化消息的持久化,重啟后消息不會丟失;msg_store_transient負責 非持久化消息的持久化,重啟后消息會丟失。
-------RocketMQ------
RocketMQ就是采用文件系統的方式來存儲消息,消息的存儲是由ConsumeQueue和CommitLog配合完成的。
CommitLog
CommitLog是用來存放消息的物理文件,有點類似於數 據庫的索引文件,每個broker上的commitLog本當前機器上的所有 consumerQueue共享,不做任何的區分。
CommitLog中的文件默認大小為1G,可以動態配置; 當一個文件寫滿以后,會生成一個新的 commitlog文件。所有的Topic數據是順序寫入在CommitLog文件中的。
文件名的長度為20位,左邊補0,剩余未起始偏移量,比如00000000000000000000 表示第一個文件, 文件大小為102410241024,當第一個文件寫滿之后,生成第二個文件
ConsumeQueue
consumeQueue表示消息消費的邏輯隊列,這里面包含MessageQueue在commitlog中的其實物理位 置偏移量offset,消息實體內容的大小和Message Tag的hash值。
對於實際物理存儲來說, consumeQueue對應每個topic和queueid下的文件,每個consumeQueue類型的文件也是有大小,每個文件默認大小約為600W個字節,如果文件滿了后會也會生成一個新的文件
- RocketMQ的高性能在於順序寫盤(CommitLog)、零拷貝和跳躍讀(盡量命中PageCache),高可靠性在於刷盤和Master/Slave,
- 另外NameServer 全部掛掉不影響已經運行的Broker,Producer,Consumer。
- 發送消息負載均衡,且發送消息線程安全(可滿足多個實例死循環發消息),集群消費模式下消費者端負載均衡,這些特性加上上述的高性能讀寫, 共同造就了RocketMQ的高並發讀寫能力。
- 刷盤和主從同步均為異步(默認)時,broker進程掛掉(例如重啟),消息依然不會丟失,因為broker shutdown時會執行persist。 當物理機器宕機時,才有消息丟失的風險。
- 另外,master掛掉后,消費者從slave消費消息,但slave不能寫消息。
與Kafka消息隊列的比較
kafka中Topic的Partition數量過多,隊列文件會過多,那么會給磁盤的IO讀寫造成比較大的壓力,也就造成了性能瓶頸。所以RocketMQ進行了優化,消息主題統一存儲在CommitLog中。
當然,這種設計並不是銀彈,它也有它的優缺點
優點在於:由於消息主題都是通過CommitLog來進行讀寫,ConsumerQueue中只存儲很少的數據, 所以隊列更加輕量化。對於磁盤的訪問是串行化從而避免了磁盤的競爭
缺點在於:消息寫入磁盤雖然是基於順序寫,但是讀的過程確是隨機的。讀取一條消息會先讀取 ConsumeQueue,再讀CommitLog,會降低消息讀的效率。
過期方式,消除策略對比
--------Kafka--------
日志的清理策略有兩個
1. 根據消息的保留時間,當消息在kafka中保存的時間超過了指定的時間,就會觸發清理過程
2. 根據topic存儲的數據大小,當topic所占的日志文件大小大於一定的閥值,則可以開始刪除最舊的消息。kafka會啟動一個后台線程,定期檢查是否存在可以刪除的消息
-------RabbitMQ------
數據消費完了,就會被移除,不能重復消費
-------RocketMQ------
1. 消息文件過期(默認72小時),且到達清理時點(默認是凌晨4點),刪除過期文件。
2. 消息文件過期(默認72小時),且磁盤空間達到了水位線(默認75%),刪除過期文件。
3. 磁盤已經達到必須釋放的上限(85%水位線)的時候,則開始批量清理文件(無論是否過期),直
到空間充足。 注:若磁盤空間達到危險水位線(默認90%),出於保護自身的目的,broker會拒絕寫入服務。
UI
--------Kafka--------
-------RabbitMQ------
RabbitMQ 可以通過命令(RabbitMQ CLI)、HTTP API 管理,也可以通過可視化 的界面去管理,這個網頁就是 managment 插件。
ul默認地址是15672
linux啟動插件。 cd /usr/lib/rabbitmq/bin ./rabbitmq-plugins enable rabbitmq_management
-------RocketMQ------
rocket官方提供了一個可視化控制台,地址:https://github.com/apache/rocketmq-externals
這個是rocketmq的擴展,里面不僅包含控制台的擴展,也包含對大數據flume、hbase等組件的對接和 擴展。
消息容錯-死信隊列
-------RabbitMQ------
TTL(Time To Live)
過期時間設置分兩種
1) 通過隊列屬性設置消息過期時間 所有隊列中的消息超過時間未被消費時,都會過期。
2)設置單條消息的過期時間 在發送消息的時候指定消息屬性。
//第一種 @Bean("ttlQueue") public Queue queue() { Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 11000); // 隊列中的消息未被消費 11 秒后過期 return new Queue("GP_TTL_QUEUE", true, false, false, map); } //第二種 MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("4000"); // 消息的過期屬性,單位 ms Message message = new Message("這條消息 4 秒后過期".getBytes(), messageProperties); rabbitTemplate.send("GP_TTL_EXCHANGE", "gupao.ttl", message);
什么情況下消息會變成死信?
1)消息被消費者拒絕並且未設置重回隊列:(NACK || Reject ) && requeue == false
2)消息過期
3)隊列達到最大長度,超過了 Max length(消息數)或者 Max length bytes (字節數),最先入隊的消息會被發送到 DLX。
如何使用?
在聲明正常隊列的時候配置好死信隊列(交換機)
//聲明隊列時,指定死信隊列 @Bean("oriUseQueue") public Queue queue() { Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 10000);
// 10 秒鍾后成為死信map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE");
// 隊列中的消息變成死信后,進入死信交換機,找到匹配死信隊列 return new Queue(“commonQueue”, true, false, false, map); }
擴展:
如何構造延遲隊列?總的來說有三種實現方案
1、 先存儲到數據庫,用定時任務掃描
2、 利用 RabbitMQ 的死信隊列(Dead Letter Queue)實現
3、 利用 rabbitmq-delayed-message-exchange 插件
流量控制
-------Kafka------
max.poll.records:
此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調 整此值,可以減少poll間隔
-------RabbitMQ------
服務端流控(Flow Control)
隊列有兩個控制長度的屬性:
x-max-length:隊列中最大存儲最大消息數,超過這個數量,隊頭的消息會被丟 棄。
x-max-length-bytes:隊列中存儲的最大消息容量(單位 bytes),超過這個容 量,隊頭的消息會被丟棄。
消費端限流
可以基於 Consumer 或者 channel 設置 prefetch count 的值,含義為 Consumer 端的最大的 unacked messages 數目。當超過這個數值的消息未被確認,RabbitMQ 會 停止投遞新的消息給該消費者。
channel.basicQos(2); // 如果超過 2 條消息沒有發送 ACK,當前消費者不再接受隊列消息 channel.basicConsume(QUEUE_NAME, false, consumer); //SimpleMessageListenerContainer 配置 container.setPrefetchCount(2); //Spring Boot 配置: spring.rabbitmq.listener.simple.prefetch=2
好文章參考:
https://www.cnblogs.com/cyfonly/p/5954614.html