RabbitMQ 之消息確認機制(事務+Confirm)
https://blog.csdn.net/u013256816/article/details/55515234
概述:
生產者:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class TXsend { private static final String QUEUE_NAMW = "test_tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); String msg = "tx"; try { //開啟事務模式、 channel.txSelect(); channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); //模擬事故 int i = 1/0; //提交 channel.txCommit(); } catch (Exception e) {
//進行事務回滾 channel.txRollback(); System.out.println("TxRollback..."); } channel.close(); conn.close(); } }
消費者:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.util.ConnectionUtils; public class TxReceive { private static final String QUEUE_NAMW = "test_tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //隊列聲明 channel.queueDeclare(QUEUE_NAMW, false, false, false, null); channel.basicQos(1); //綁定隊列到交換機轉發器 //channel.queueBind(QUEUE_NAMW, "", ""); //定義一個消費者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就會觸發這個方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消費者1接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消費者1處理完成!"); //手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列 //自動應答false boolean autoAck = false; channel.basicConsume(QUEUE_NAMW, autoAck, consumer); } }
由於事務已回滾,此時消費者不會接收到消息。
此種模式還是很耗時的,採用這種方式會降低 RabbitMQ 的消息吞吐量。
Confirm模式
概述
producer 端 confirm 模式的實現原理
開啟 confirm 模式的方法:已經處於 transaction 事務模式的 channel 是不能再設置成 confirm 模式的,即這兩種模式不能共存。生產者通過調用 channel 的 confirmSelect 方法將 channel 設置為 confirm 模式。核心代碼:
編程模式
普通模式:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class confirm{ private static final String QUEUE_NAMW = "test_tx_confirm1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); //生產者調用confirmSelect,將channel設置為confirm模式 channel.confirmSelect(); String msg = "confirm"; channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); if(!channel.waitForConfirms()){ System.out.println("send failed"); }else{ System.out.println("send ok"); } channel.close(); conn.close(); } }
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
/**
 * Producer demonstrating publisher confirms in batch style: publish several
 * messages, then wait once for the broker to confirm all of them.
 */
public class TXsend {

    /** Queue the demo messages are published to (via the default exchange). */
    private static final String QUEUE_NAME = "test_tx_confirm1";

    /**
     * Publishes ten messages in confirm mode and waits for a single combined confirmation.
     *
     * @param args unused
     * @throws IOException          if a channel operation fails
     * @throws TimeoutException     declared for the connection/channel calls
     * @throws InterruptedException if interrupted while waiting for the confirms
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection conn = ConnectionUtils.getConnection();
        try {
            Channel channel = conn.createChannel();
            try {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                // 1. The producer calls confirmSelect to put the channel into confirm mode.
                channel.confirmSelect();

                // 2. Encode the payload once, explicitly as UTF-8 rather than the platform
                //    default charset (fully qualified so no extra import is required).
                String msg = "confirm";
                byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);

                // Send the batch.
                for (int i = 1; i <= 10; i++) {
                    channel.basicPublish("", QUEUE_NAME, null, payload);
                }

                // 3. Wait for the confirmation of everything published so far.
                if (channel.waitForConfirms()) {
                    System.out.println("send ok");
                } else {
                    System.out.println("send failed");
                }
            } finally {
                // Release the channel even when publishing or confirming failed.
                channel.close();
            }
        } finally {
            conn.close();
        }
    }
}
import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class TXsend { private static final String QUEUE_NAMW = "test_tx_confirm3"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); //生產者調用confirmSelect,將channel設置為confirm模式 channel.confirmSelect(); //未確認的消息標識 final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //頻道加一個監聽 channel.addConfirmListener(new ConfirmListener() { //回調/重發重試 可以1s之后再發 10s之后再發 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleNack-----multiple =1"); confirmSet.headSet(deliveryTag+1).clear();; }else{ System.out.println("handleNack-----multiple =0"); confirmSet.remove(deliveryTag); } } //沒問題的handleAck @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleAck-----multiple =1"); confirmSet.headSet(deliveryTag+1).clear();; }else{ System.out.println("handleAck-----multiple =0"); confirmSet.remove(deliveryTag); } } }); String msg = "confirm"; //模擬插入數據 while(true){ long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); confirmSet.add(seqNo); } } }