mq
消息隊列 先進先出
1.為什么要使用mq?
異步 削峰 解耦
1.流量削峰
使用消息隊列做一個緩沖
2.應用解耦
可以解決系統之間的調用問題。如果物流系統出現故障,需要幾分鍾修復,通過消息隊列作為中間件,在這幾分鍾內,物流系統要處理的內存被緩存在消息隊列中,用戶可以正常下單。
缺點
3.異步處理
A調用B 只需要監聽b處理完成的消息,B處理完成之后,會發送一條消息給MQ ,MQ會將這條消息轉給A服務。
mq的種類
ActiveMQ
單機吞吐量高 時效性ms級,可用性高,消息可靠性高
官方社區對其維護越來越少,高吞吐量場景較少使用
Kafka
大數據領域內的消息傳輸 百萬級別吞吐量
優點 吞吐量高 時效期ms級,分布式 少數機器宕機,不會導致不可用,消息有序,能保證所有消息被消費且只能消費一次 在日志領域比較成熟
主要用於大數據領域的實時計算以及日志采集
缺點:消息失敗不支持重試 單機超過64個分區,load(CPU)會發生明顯的飆高
采用短輪詢方式,實時性取決於輪詢間隔時間
一台代理宕機,會產生亂序
Rocketmq
訂單 交易 充值 日志流式處理
優點:單機吞吐量十萬級 可用性高 分布式 消息可以做到0丟失 擴展性好 支持大量數據的數據堆積
缺點;支持語言少 支持java和c++
Rabbitmq
由於erlang的高並發性,吞吐量到萬級,支持多種語言,開源,提供了管理頁面,社區活躍性高
缺點;商業版需要收費
mq的選擇
Kafka 大量數據的互聯網公司
Rocketmq 金融互聯網
Rabbitmq 中小型公司
Rabbitmq
接收 存儲 轉發消息
Rabbitmq
接收 存儲 轉發消息
生產者 交換機 隊列 消費者
六大模式
簡單模式 工作模式 發布訂閱模式 路由模式 主題模式 發布確認模式
Broker 接收和分發消息的應用 mq的服務器
-exchange
-quenue
Channel 信道
連接里面多個信道 減少建立連接的開銷
Broker 里面有多個virtual host 每個用戶在自己的vhost創建exchange/queue
簡單模式
一個消費者 mq 一個生產者
工作模式
工作隊列的主要思想是避免立即執行資源密集型任務,而不得不等待它完成,相反我們安排任務在之后完成。我們把任務封裝為消息並將其發送到隊列。在后台運行的工作進程將彈出任務並最終執行作業。當有多個工作線程,這些工作線程將一起處理這些任務。
生產者大量發消息給隊列,造成很多消息停留在隊列中,無法進行及時處理。通過多個工作線程,采用輪詢的方式來處理。
消費者-》多個工作線程。輪詢 競爭關系
一個消息只能被處理一次 不可以處理多次
消息應答
問題:
某個消費者處理一個長的工作任務並且僅完成了部分就突然掛掉了。rabbitmq一旦向消費者發送了某條消息,就立即將消息設置為刪除。這種情況下,我們將會丟失正在處理的消息,以及后續發送給該消費者的消息,它將無法接收。
消息應答:消費者接收到消息並處理完消息之后,告訴rabbitmq消息已經處理了,rabbitmq可以把消息刪除了。
自動應答
高吞吐量和數據傳輸安全要有保證
手動應答
手動應答的方法
basicAck 肯定確認(如果批量應答 是否批量 true)
basicNack 否定確認 比另一個多一個參數。是否批量
basicReject 否定確認
批量應答 最好別 multiple
消息的自動重新入隊
消息未發送ACK確認 會重新入隊 rabbitmq會安排另一個消費者處理
消息手動應答時是不丟失的 放回隊列中重新消費
//手動應答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
rabbitmq持久化
確保消息不丟失 隊列和消息持久化
1.隊列持久化 durable true
boolean durable=true;
channel.queueDeclare(normal_queue,durable,false,false,arguments);
隊列不是持久化的 需要把原來的隊列先刪除掉 或者重新創建一個持久化的隊列 不然會報錯
2.消息持久化
生產者發消息時通知mq消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",task_queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(“UTF-8"));
不公平分發
channel.basicQos(1);
消費者接收消息之前設置不公平分發
預取值
指定消費者分多少條消息
prefetchCount 5
prefetchCount 2
channel.basicQos(prefetchCount);
如果超過7條 按照不公平分發
發布確認原理
生產者
設置隊列必須持久化
設置要求隊列中的消息必須持久化
發布確認 :mq 把消息保存到磁盤上 ,保存成功后 通知生產者
1) 單個確認發布
發布速度特別慢 如果沒有確認發布的消息就會阻塞后續所有消息的發布
channel.confirmSelect();
channel.waitForConfirms() //
2) 批量確認發布
當發生故障導致發布出現問題時,不知道是哪個消息出現問題了。
3) 異步確認發布. 利用回調函數 保證是否投遞成功
如何處理異步確認中確認失敗的消息?把未確認的消息放到一個基於內存的能被發布線程訪問的隊列
//異步確認 public static void publishMessageAsync() throws Exception{ Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); //發布確認 //線程安全有序的一個哈希表 適用於高並發的情況下 /** * 1.輕松將序號和消息關聯 * 2。輕松批量刪除條目 只要給到序號 * 3。支持高並發 */ ConcurrentSkipListMap<Long,String> outstandingConfirms =new ConcurrentSkipListMap<>(); //消息確認成功 ConfirmCallback ackCallback=(deliveryTag, multiple) -> { //刪除掉已經確認成功的消息 if(multiple){ ConcurrentNavigableMap<Long, String> confimed= outstandingConfirms.headMap(deliveryTag); confimed.clear(); //批量 }else{ outstandingConfirms.remove(deliveryTag); //單個 } System.out.println("確認成功的消息"+deliveryTag); }; //消失確認失敗 ConfirmCallback nackCallback=(deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("確認失敗的消息"+message); }; channel.confirmSelect(); //准備消息的監聽器 channel.addConfirmListener(ackCallback,nackCallback); int batch =1000; long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message=i+""; channel.basicPublish("",queueName,null,message.getBytes()); //記錄下所有的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), message); } long end = System.currentTimeMillis(); System.out.println("發布1000個單獨確認需要時間:"+(end-start)); }
交換機
發布訂閱模式 一個消息想被多個消費者消費
生產者-消息- 交換機 -routingkey-隊列- 消息只能被消費一次-消費者
-routingkey-隊列-消息只能被消費一次 -消費者
生產者只能將消息發送到交換機,交換機一方面接收來自生產者的消息,另一方面將它們推入隊列,
exchange
直接direct 主題topic 標題headers(不常用) 扇出fanout
“”表示無名或者默認交換機
routingkey 綁定key 指定交換機
臨時隊列
不帶有持久化 名字是隨機的 隊列 一旦斷開了消息者的隊列,隊列將被自動刪除。
String queueName=Channel.queueDeclare().getQueue();
綁定 (交換機 queue 之間的橋梁)
通過routingkey進行綁定
通過routing key 區分不同的隊列
1)Fanout(廣播) 發布訂閱模式。 扇出
將接收到的所有的消息廣播到所有隊列中
綁定交換機和隊列
Channel.queueBind(queueName,Exchange_NAME,“”);//第三個參數 routingKey
兩個隊列的Routingkey相同 將都接收到消息
2)direct交換機 路由模式 routingkey模式
聲明隊列的時候 指明交換機是direct類型
生產者-消息- 交換機 -routingkey-隊列- 消息只能被消費一次-消費者
-routingkey-隊列-消息只能被消費一次 -消費者
routingkey相同是扇出交換機 不同是直接交換機
direct_logs 交換機->console 隊列 ->nfo routingkey
- >console 隊列->warming(多重綁定)
->disk 隊列->error
一個隊列,擁有多個routing key 多重綁定
誰能接收到消息 完全取決於rouingkey
routingkey->info console接收
routingkey-> warming console接收
routingkey->error disk接收
3)topic交換機
Routingkey不同,直接交換機只能給一個隊列發消息
主題交換機的routing key 必須是一個單詞列表 以點號分割
“quick,orange.rabbit” 不能超過255個字節
(*.orange.*)匹配三個單詞中間是orange
(lazy.#)#匹配多個單詞
當一個隊列綁定鍵是#,那么這個隊列將接收所有的數據,類似於fanout
當隊列綁定鍵中沒有#和*出現,那這個隊列綁定類型類似於direct
死信隊列
消息無法被消費
某些時候由於特定的原因導致queue中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信
應用場景:為了保證訂單業務的消息數據不丟失,需要用到死信隊列機制。
當消息發生異常,將消息投入死信隊列。
支付超時未付款訂單會自動失效
死信的來源
消息ttl過期(存活時間)
隊列達到最大長度
消息被拒絕
生產者 -普通交換機 type=direct->普通隊列—-》c1
|
消息TTL過期
隊列達到最大長度 成為死信
消息被拒絕
|
死信交換機 type=direct
|
死信隊列———》c2
死信消息
//通過參數轉發消息到死信隊列
HashMap<String, Object> arguments = new HashMap<>(); //過期時間. ms arguments.put("x-message-ttl",100000); //正常隊列設置死信交換機 arguments.put("x-dead-letter-exchange",dead_exchange); //死信routingkey arguments.put("x-dead-letter-routing-key","lisi"); channel.queueDeclare(normal_queue,false,false,false,arguments);ggu p
設置過期時間:
一種在普通隊列 設置過期時間
另一種在生產方發消息時設置過期時間。(比較靈活 可以隨意修改過期時間)
發消息時設置過期時間:props 設置死信消息的過期時間
AMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("1000").build();
設置死信隊列的長度
arguments.put("x-max-length",6);
超過部分會成為死信消息
消息被拒絕 指定某條消息被拒絕
需要開啟手動應答
if(message.equals(“info5")){
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);//(消息的標志,是否放回隊列)
}
延遲隊列(死信隊列中ttl過期)
隊列內部是有序的
在某個事件發生之前或者之后的指定時間完成某一項任務
訂單十分鍾內未支付則關閉
整合springboot:跳過
延遲隊列:延遲指定時間消費消息
優化
每新增一個時間需求,就要新增一個隊列
QA QB指定了過期時間。【QC不指定過期時間 沒設置ddl時間】
解決方法:發送消息的時候設置過期時間
rabbitTemplate.convertAndSend(“X”,”XC”,message,msg->{ msg.getMessageProperties().setExpiration(ttlTime); return msg; });
基於死信的延遲存在問題:
發送多條信息會排隊,rabbitmq只會檢查第一個隊列,如果第一個消息的延時時長很長,第二個消息的延時時長很短,第二條消息並不會得到優待。
基於延遲消息插件的延遲隊列:延遲交換機 x-delayed-message
生產者 -》延遲交換機 -〉隊列 -》消費者
聲明一個延遲交換機 基於插件的延時隊列
public CustomExchange delayedExchange(){ Map<String,Object> arguments =new HashMap<>(); arguments.put("x-delayed_type","direct"); return new CustomExchange(delayed_exchange_name,"x-delayed-message",true,false,arguments); }
延時隊列:使用rabbitmq實現延時隊列可以很好的利用rabbitmq的特性,消息的可靠發送,可靠投遞,利用死信隊列保證消息至少被消費一次 以及未被正確處理的消息不會被丟棄。
Rabbitmq集群的特性 可以解決單點故障的問題 不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。