/**
* rabbitmq 工具類
*
* @author yangxj
* @date 2020-03-25 20:30
*
* 基本概念: queue 通過 routeKey 綁定 exchange; producer 發送消息到 exchange, exchange 通過 routeKey 發送與之綁定的 queue
* exchange 類型: direct, fanout, topic
*/
public class RabbitMQUtils {
/**
* 獲取rabbitMq 連接
*
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setUsername("guest");
return factory.newConnection();
}
}
/**
* 1. 通過事務機制( 缺點: 降低了mq 的吞吐量)
*/
public static void useTx() throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
// 聲明一個queue
channel.queueDeclare("test-queue", false, false, false, null);
// 聲明一個topic 類型的 exchange
channel.exchangeDeclare("test-topic-exchange", BuiltinExchangeType.TOPIC);
// queue 通過 routeKey 綁定 exchange
channel.queueBind("test-queue", "test-topic-exchange", "test.*");
String message = "hello rabbit mq!";
channel.txSelect(); // 開啟事務
try {
channel.basicPublish("test-topic-exchange", "test.message", null, message.getBytes()); // 消息發送
channel.txCommit();
System.out.println("message send success..");
} catch (Exception e) {
channel.txRollback(); // 回滾
System.out.println("message send fail..");
} finally {
channel.close();
connection.close();
}
}
/**
* 2. 生產者 confirm模式(串型)
* 原理: 當信道channel 進入confrim模式, 所有在該信道發布的消息都會被指派一個唯一的id
* 當消息被投遞到所有匹配的隊列后, broker就會發送一個確認給生產者(包含消息id);
*/
public void producterConfirmSync() throws Exception {
Connection connection = getConnection(); // 獲取rabbitMq 連接
Channel channel = connection.createChannel();
channel.confirmSelect(); // 開啟confirm模式
String message = "hello rabbit mq!";
String message2 = "hello rabbit mq2!";
channel.basicPublish("", "", MessageProperties.BASIC, message.getBytes());
channel.basicPublish("", "", MessageProperties.BASIC, message2.getBytes());
if (channel.waitForConfirms()) { // 支持多條發送后再確認
System.out.println("message send success..");
} else {
System.out.println("message send fail..");
}
channel.close();
connection.close();
}
/**
* 3. 生產者 confirm模式(異步)
*/
public void producterConfirmAsync() throws Exception {
Connection connection = getConnection(); // 獲取rabbitMq 連接
Channel channel = connection.createChannel();
channel.confirmSelect(); // 開啟confirm模式
// ack ids
SortedSet<Long> ackIds = new TreeSet<>();
// not ack ids
SortedSet<Long> nackIds = new TreeSet<>();
for (int i = 0; i < 100; i++) {
String message = "hello rabbit mq: " + i;
channel.basicPublish("", "", null, message.getBytes());
}
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 發送成功(確認)
// multiple 單條確認 or多條確認 (為true, 表示到這個序列號之前的所有消息都已經得到了處理)
ackIds.add(deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 發送失敗(未確認)
nackIds.add(deliveryTag);
}
});
}
/**
* 消費者 confirm
*/
public void consumerConfirm() throws Exception {
Connection connection = getConnection(); // 獲取rabbitMq 連接
Channel channel = connection.createChannel();
try {
channel.basicConsume("test", false, // 關閉消費者自動ack
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//發布的每一條消息都會獲得一個唯一的deliveryTag,deliveryTag在channel范圍內是唯一的
long deliveryTag = envelope.getDeliveryTag();
//第二個參數是批量確認標志。如果值為true,則執行批量確認,此deliveryTag之前收到的消息全部進行確認;
// 如果值為false,則只對當前收到的消息進行確認
channel.basicAck(deliveryTag, true); // 確認
}
});
} catch (IOException e) {
e.printStackTrace();
}finally {
channel.close();
connection.close();
}
}