/** * rabbitmq 工具類 * * @author yangxj * @date 2020-03-25 20:30 * * 基本概念: queue 通過 routeKey 綁定 exchange; producer 發送消息到 exchange, exchange 通過 routeKey 發送與之綁定的 queue * exchange 類型: direct, fanout, topic */ public class RabbitMQUtils { /** * 獲取rabbitMq 連接 * * @return * @throws Exception */ public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setUsername("guest"); return factory.newConnection(); }
}
-
事務機制
/** * 1. 通過事務機制( 缺點: 降低了mq 的吞吐量) */ public static void useTx() throws Exception { Connection connection = getConnection(); Channel channel = connection.createChannel(); // 聲明一個queue channel.queueDeclare("test-queue", false, false, false, null); // 聲明一個topic 類型的 exchange channel.exchangeDeclare("test-topic-exchange", BuiltinExchangeType.TOPIC); // queue 通過 routeKey 綁定 exchange channel.queueBind("test-queue", "test-topic-exchange", "test.*"); String message = "hello rabbit mq!"; channel.txSelect(); // 開啟事務 try { channel.basicPublish("test-topic-exchange", "test.message", null, message.getBytes()); // 消息發送 channel.txCommit(); System.out.println("message send success.."); } catch (Exception e) { channel.txRollback(); // 回滾 System.out.println("message send fail.."); } finally { channel.close(); connection.close(); } }
-
生產者confirm模式 (同步)
/** * 2. 生產者 confirm模式(串型) * 原理: 當信道channel 進入confrim模式, 所有在該信道發布的消息都會被指派一個唯一的id * 當消息被投遞到所有匹配的隊列后, broker就會發送一個確認給生產者(包含消息id); */ public void producterConfirmSync() throws Exception { Connection connection = getConnection(); // 獲取rabbitMq 連接 Channel channel = connection.createChannel(); channel.confirmSelect(); // 開啟confirm模式 String message = "hello rabbit mq!"; String message2 = "hello rabbit mq2!"; channel.basicPublish("", "", MessageProperties.BASIC, message.getBytes()); channel.basicPublish("", "", MessageProperties.BASIC, message2.getBytes()); if (channel.waitForConfirms()) { // 支持多條發送后再確認 System.out.println("message send success.."); } else { System.out.println("message send fail.."); } channel.close(); connection.close(); }
-
生產者confirm (異步)
/** * 3. 生產者 confirm模式(異步) */ public void producterConfirmAsync() throws Exception { Connection connection = getConnection(); // 獲取rabbitMq 連接 Channel channel = connection.createChannel(); channel.confirmSelect(); // 開啟confirm模式 // ack ids SortedSet<Long> ackIds = new TreeSet<>(); // not ack ids SortedSet<Long> nackIds = new TreeSet<>(); for (int i = 0; i < 100; i++) { String message = "hello rabbit mq: " + i; channel.basicPublish("", "", null, message.getBytes()); } channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 發送成功(確認) // multiple 單條確認 or多條確認 (為true, 表示到這個序列號之前的所有消息都已經得到了處理) ackIds.add(deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 發送失敗(未確認) nackIds.add(deliveryTag); } }); }
-
消費者confirm
/** * 消費者 confirm */ public void consumerConfirm() throws Exception { Connection connection = getConnection(); // 獲取rabbitMq 連接 Channel channel = connection.createChannel(); try { channel.basicConsume("test", false, // 關閉消費者自動ack new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //發布的每一條消息都會獲得一個唯一的deliveryTag,deliveryTag在channel范圍內是唯一的 long deliveryTag = envelope.getDeliveryTag(); //第二個參數是批量確認標志。如果值為true,則執行批量確認,此deliveryTag之前收到的消息全部進行確認; // 如果值為false,則只對當前收到的消息進行確認 channel.basicAck(deliveryTag, true); // 確認 } }); } catch (IOException e) { e.printStackTrace(); }finally { channel.close(); connection.close(); } }