8、RabbitMQ-消息的確認機制(生產者)


RabbitMQ 之消息確認機制(事務+Confirm)

https://blog.csdn.net/u013256816/article/details/55515234

 

概述:

在 Rabbitmq 中我們可以通過持久化來解決因為服務器異常而導致丟失的問題
 
除此之外我們還會遇到一個問題: 生產者將消息發送出去之后,消息到底有沒有正
確到達 Rabbit 服務器呢?如果不錯得數處理,我們是不知道的,(即 Rabbit 服務器
不會反饋任何消息給生產者),也就是默認的情況下是不知道消息有沒有正確到達;

 

導致的問題:消息到達服務器之前丟失,那么持久化也不能解決此問題,因為消息根本就沒有到達 Rabbit 服務器!

 

RabbitMQ 為我們提供了兩種方式:
1. 通過 AMQP 事務機制實現,這也是 AMQP 協議層面提供的解決方案;
2. 通過將 channel 設置成 confirm 模式來實現

 

 

事務機制                                                                                                                                                                          RabbitMQ 中與事務機制有關的方法有三個:txSelect(), txCommit()以及 txRollback(), 
txSelect 用於將當前 channel 設置成 transaction 模式,txCommit 用於提交事務,
txRollback 用於回滾事務,在通過 txSelect 開啟事務之后,我們便可以發布消息
給 broker 代理服務器了,如果 txCommit 提交成功了,則消息一定到達了 broker 了,
如果在 txCommit執行之前 broker 異常崩潰或者由於其他原因拋出異常,這個時候
我們便可以捕獲異常通過 txRollback 回滾事務了。                                                                                                                                 
txSelect   txCommit  txRollback                                                                                                                                txSelect:用戶將當前channel設置成transation模式                                                                                                          txCommit :用於提交事務
txRollback :用戶回滾事務

 

生產者:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
     private static final String QUEUE_NAMW = "test_tx_queue";
     
     public static void main(String[] args) throws IOException,  TimeoutException {
         Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();
           
           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
           
           String msg = "tx";
           
           try {
                //開啟事務模式、  channel.txSelect();
                channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                //模擬事故
                int i = 1/0;
                //提交
 channel.txCommit();
           } catch (Exception e) {
          //進行事務回滾 channel.txRollback(); System.
out.println("TxRollback..."); } channel.close(); conn.close(); } }

 

 

  消費者:

 

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;
public class TxReceive {
     
     private static final String QUEUE_NAMW = "test_tx_queue";
public static void main(String[] args) throws IOException,  TimeoutException {
           
           Connection conn = ConnectionUtils.getConnection();
           
           Channel channel = conn.createChannel();
           
           //隊列聲明
           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
           
           channel.basicQos(1);
           
           //綁定隊列到交換機轉發器
           
           //channel.queueBind(QUEUE_NAMW, "", "");
           
                     //定義一個消費者
                     Consumer consumer = new  DefaultConsumer(channel){
                           //收到消息就會觸發這個方法
                           @Override
                           public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                     throws IOException {
                                String msg = new  String(body,"utf-8");
                                System.out.println("消費者1接收到的消息" + msg);
                                
                                try {
                                     Thread.sleep(1500);
                                } catch (InterruptedException e)  {
                                     e.printStackTrace();
                                }finally{
                                     System.out.println("消費者1處理完成!");
                                     //手動回執
                                     channel.basicAck(envelope.getDeliveryTag(), false);
                                }
                           }
                     };
                     //監聽隊列
                     //自動應答false
                     boolean autoAck = false;
                     channel.basicConsume(QUEUE_NAMW, autoAck,  consumer);
     }
}

 

 此時消費者不會接收到消息

 

此種模式還是很耗時的,采用這種方式 降低了 Rabbitmq 的消息吞吐量   

 

 

 Confirm模式 

概述 

上面我們介紹了 RabbitMQ 可能會遇到的一個問題,即生成者不知道消息是否真正到達 broker,隨
后通過 AMQP 協議層面為我們提供了事務機制解決了這個問題,但是采用事務機制實現會降低
RabbitMQ 的消息吞吐量,那么有沒有更加高效的解決方式呢?答案是采用 Confirm 模式。 

 

  producer 端 confirm 模式的實現原理 

 

   該模式最大的好處就是異步的!!!     

 

 

開啟 confirm 模式的方法                                                                                                                     已經在 transaction 事務模式的 channel 是不能再設置成 confirm 模式的,即這兩種模式是不能共存的。                生產者通過調用 channel 的 confirmSelect 方法將 channel 設置為 confirm 模式                                                            核心代碼:

//生產者通過調用channel的confirmSelect方法將channel設置為confirm模式
channel.confirmSelect();     

 

 編程模式

 

1. 普通 confirm 模式:每發送一條消息后,調用 waitForConfirms()方法,等待服務器端
    confirm。實際上是一種串行 confirm 了。
2. 批量 confirm 模式:每發送一批消息后,調用 waitForConfirms()方法,等待服務器端
    confirm。
3. 異步 confirm 模式:提供一個回調方法,服務端 confirm 了一條或者多條消息后 Client 端會回
    調這個方法。

 

 普通模式:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class confirm{
     private static final String QUEUE_NAMW =  "test_tx_confirm1";
     
     public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
         Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();
           
           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
           
           //生產者調用confirmSelect,將channel設置為confirm模式  channel.confirmSelect();
           String msg = "confirm";
           channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
           if(!channel.waitForConfirms()){ System.out.println("send failed"); }else{ System.out.println("send ok"); }
           channel.close();
           conn.close();
     }
}

 

 

 

批量模式
批量 confirm 模式稍微復雜一點,客戶端程序需要定期(每隔多少秒)
或者定量(達到多少條)或者兩則結合起來publish 消息,然后等待
服務器端 confirm, 相比普通 confirm 模式,批量極大提升 confirm
 效率,但是問題在於一旦出現 confirm 返回 false 或者超時的情
況時,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息數
量,並且,當消息經常丟失時,批量 confirm 性能應該是不升反降的。

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
private static final String QUEUE_NAMW = "test_tx_confirm1";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
      Connection conn = ConnectionUtils.getConnection();
      Channel channel = conn.createChannel();

      channel.queueDeclare(QUEUE_NAMW, false, false, false, null);

      //1
      //生產者調用confirmSelect,將channel設置為confirm模式
      channel.confirmSelect();

      //2
      String msg = "confirm";
      //批量發送
      for(int i=1;i<=10;i++){
        channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes());
      }

      //3
      //確認
      if(!channel.waitForConfirms()){
        System.out.println("send failed");
      }else{
        System.out.println("send ok");
      }

      channel.close();
      conn.close();
}
}

 

 

異步模式
Channel 對象提供的 ConfirmListener()回調方法只包含 deliveryTag
(當前 Chanel 發出的消息序號),我們 需要自己為每一個 Channel 
維護一個 unconfirm 的消息序號集合,每 publish 一條數據,集合中
元素加 1, 每回調一次 handleAck方法,unconfirm 集合刪掉相應的
一條(multiple=false)或多條(multiple=true)記錄。從程序運行
效率上看,這個unconfirm 集合最好采用有序集合 SortedSet 存儲結構。
實際上,SDK 中的 waitForConfirms()方法也是通過 SortedSet維護消息序號的。
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
     private static final String QUEUE_NAMW =  "test_tx_confirm3";
     
     public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
         Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();
           
           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
           
           //生產者調用confirmSelect,將channel設置為confirm模式
 channel.confirmSelect(); //未確認的消息標識
 final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //頻道加一個監聽
           channel.addConfirmListener(new ConfirmListener() { //回調/重發重試 可以1s之后再發 10s之后再發 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleNack-----multiple =1"); confirmSet.headSet(deliveryTag+1).clear();; }else{ System.out.println("handleNack-----multiple =0"); confirmSet.remove(deliveryTag); } } //沒問題的handleAck @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleAck-----multiple =1"); confirmSet.headSet(deliveryTag+1).clear();; }else{ System.out.println("handleAck-----multiple =0"); confirmSet.remove(deliveryTag); } } });
           
           
           String msg = "confirm";
           //模擬插入數據 while(true){ long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); confirmSet.add(seqNo); }
     }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM