准備
1.引入客戶端和配置文件依賴類
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
2.properties文件配置
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=liqiang spring.rabbitmq.password=liqiang
3.Test父類
@TestPropertySource("classpath:application.properties") public class BaseTest { ConnectionFactory connectionFactory; @Autowired RabbitMQConfig rabbitMQConfig; public BaseTest(){ } public void initConnectionFactory(){ if(connectionFactory==null) { connectionFactory = new ConnectionFactory(); connectionFactory.setUsername(rabbitMQConfig.getUsername()); connectionFactory.setPassword(rabbitMQConfig.getPassword()); connectionFactory.setHost(rabbitMQConfig.getHost()); connectionFactory.setPort(rabbitMQConfig.getPort()); } } public Connection newConnection() throws IOException, TimeoutException { initConnectionFactory(); return connectionFactory.newConnection(); } }
manadatory參數
說明
當次參數設置為true時 交換器無法根據自身類型和路由鍵找到符合條件的隊列name將通過Basic.Retrun命令將消息返回給生產者 為false則直接丟棄
例子
String exchangeName = "test"; String queueName = "testQueue"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個名字為test 非自動刪除的 direct類型的exchange 更多配置書37頁 channel.exchangeDeclare(exchangeName, "direct", true); //聲明一個持久化,非排他,非自動刪除的隊列 channel.queueDeclare(queueName, true, false, false, null); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); //mandatory設置為true 如果根據routing key找不到隊列則會回調通知 false則直接丟棄(這里將routing key設置為""字符串)運行則會觸發通知 channel.basicPublish(exchangeName, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好呀".getBytes()); //未名字路由的回調 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("回調通知" + new String(body)); } });
imanadatory參數
當immediate為true時如果隊列沒有消費者 則會通過Basic.Retrun返回 3.0已經移除
備份交換器
說明
路由不成功的時候 不是返回給生產者 而是存放到指定隊列
demo
public void backupsTest() throws IOException, TimeoutException { String exchangeName = "test"; //備份exchange String backupsExchange = "testBackup"; String queueName = "testQueue"; //備份隊列名字 String backupsQueueName = "backupsTestQueue"; String routingKey = "testRoutingKey"; //設置參數 Map<String, Object> args = new HashMap<>(); args.put("alternate-exchange", backupsExchange); Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個exchange並指定備份exchange 如果路由失敗則路由到備份exchange channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, args); //聲明備份exchage fanout類型 是因為 備份不需要路由key channel.exchangeDeclare(backupsExchange, BuiltinExchangeType.FANOUT, true, false, null); //聲明一個持久化,非排他,非自動刪除的隊列 channel.queueDeclare(queueName, true, false, false, null); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); //聲明備份隊列 channel.queueDeclare(backupsQueueName, true, false, false, null); //備份隊列交換器是fanout類型 所以不需要routingkey channel.queueBind(backupsQueueName, backupsExchange, ""); channel.basicPublish(exchangeName, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好呀".getBytes()); //不會觸發回調通知 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("回調通知" + new String(body)); } }); }
備份交換器幾種情況
- 如果設置備份交換器不存在 則消息會丟失 服務器不會報錯
- 如果備份交換器沒有綁定任何隊列客戶端和rabbitMq則客戶端和服務端都不會出現 消息會丟失
- 如果備份交換器沒有綁定任何隊列客戶端和rabbitMq都不會出現異常情況 消息會丟失
- 如果備份交換器和manadatory一起使用 則manadatory無效
過期消息
/** * mq消息過期測試 書60頁 * 通過直接給隊列設置 則消息到了過期日期則自動移除 * 通過每條消息單獨設置 消息過期不會馬上移除,而是消費的時候判斷是否過期 才移除 */ //@Test public void messageTTLTest() throws IOException, TimeoutException { String exchangeName = "test"; String queueName = "testQueue"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個名字為test 非自動刪除的 direct類型的exchange 更多配置書37頁 channel.exchangeDeclare(exchangeName, "direct", true); //設置參數 Map<String, Object> args = new HashMap<>(); //設置單位 毫秒 超時沒消費則會被丟棄 如果設置為0 則如果有消費者直接投遞 沒有消費者則丟棄 可以替代imanadatory
args.put("x-message-ttl", 6000); //聲明一個持久化,非排他,非自動刪除的隊列 並設置整個隊列消息的過期時間 channel.queueDeclare(queueName, true, false, false, args); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); //mandatory設置為true 如果根據routing key找不到隊列 則會回調通知 false則直接丟棄 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好呀".getBytes()); //針對單條消息過期時間設置 // AMQP.BasicProperties.Builder builder=new AMQP.BasicProperties.Builder(); // builder.deliveryMode(2);//持久化消息 // builder.expiration("6000");//設置ttl為6000 // AMQP.BasicProperties properties=builder.build(); // channel.basicPublish(exchangeName,routingKey,true,properties,"你好呀".getBytes()); }
死信隊列(DLX)
說明
當一個消息由一個交換器變成死信后他會重新發送到另外一個交換器(稱之為死信交換器)改交換器綁定的隊列就是死信隊列
消息變成死信的幾種情況
- 消息拒絕Basic/Reject 並設置requeue為false
- 消息過期
- 隊列達到最大長度
demo
/** * 死信隊列 DLX 書63頁 通過ttl加死信 可以實現延遲消息 如果訂單半小時沒支付關閉 * 說明:死信隊列本質也是隊列,綁定死信交換器的隊列叫死信隊列 * 以下三種情況將發送到死信交換器 * 消息被拒絕 並設置 requeue為false * 消息過期(這里以消息過期為例子) * 隊列達到最大長度 */ //@Test public void queueDLXTest() throws IOException, TimeoutException { String exchangeName = "test"; //死信交換器名字 String dlxExchangeName = "dlx_exchange"; String queueName = "testQueue"; //死信隊列名字 String dlxQueueName = "dlxQueueName"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個名字為test 非自動刪除的 direct類型的exchange 更多配置書37頁 channel.exchangeDeclare(exchangeName, "direct", true); //聲明一個交換器 用於死信隊列 channel.exchangeDeclare(dlxExchangeName, "direct", true); //為死信隊列綁定一個隊列 channel.queueDeclare(dlxQueueName, true, false, false, null); //將隊列與交換器綁定 channel.queueBind(dlxQueueName, dlxExchangeName, routingKey); //設置參數 Map<String, Object> args = new HashMap<>(); //設置單位 毫秒 超時沒消費則會被丟棄 如果設置為0 則如果有消費者直接投遞 沒有消費者則丟棄 args.put("x-message-ttl", 6000); //指定對應的死信交換器 args.put("x-dead-letter-exchange", dlxExchangeName); //可以為死信交換器指定路由key如果不指定 則默認使用原routingkey //args.put("x-dead-letter-routing-key","dlx-routing-key"); //聲明一個持久化,非排他,非自動刪除的隊列 並設置整個隊列消息的過期時間 channel.queueDeclare(queueName, true, false, false, args); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); //mandatory設置為true 如果根據routing key找不到隊列 則會回調通知 false則直接丟棄 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好呀".getBytes()); }
隊列過期
指定時間沒有被使用則自動移除
/** * 隊列過期測試 * 隊列指定時間沒有被使用則移除 */ //@Test public void queueTTLTest() throws IOException, TimeoutException { //設置參數 Map<String, Object> args = new HashMap<>(); //如果隊列6秒沒被使用則移除 args.put("x-expires", 6000); String exchangeName = "test"; String queueName = "queueTTl"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //隊列6秒沒被使用則移除 channel.queueDeclare(queueName, true, false, false, args); }
延遲隊列
說明
處理類似訂單30分鍾未支付自動關閉這種需求,或者延遲發短信 通過TTL+DLX實現
demo
/** * 死信隊列 DLX 書63頁 通過ttl加死信 可以實現延遲消息 如果訂單半小時沒支付關閉 * 說明:死信隊列本質也是隊列,綁定死信交換器的隊列叫死信隊列 * 以下三種情況將發送到死信交換器 * 消息被拒絕 並設置 requeue為false * 消息過期(這里以消息過期為例子) * 隊列達到最大長度 */ //@Test public void queueDLXTest() throws IOException, TimeoutException { String exchangeName = "test"; //死信交換器名字 String dlxExchangeName = "dlx_exchange"; String queueName = "testQueue"; //死信隊列名字 String dlxQueueName = "dlxQueueName"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個名字為test 非自動刪除的 direct類型的exchange 更多配置書37頁 channel.exchangeDeclare(exchangeName, "direct", true); //聲明一個交換器 用於死信隊列 channel.exchangeDeclare(dlxExchangeName, "direct", true); //為死信隊列綁定一個隊列 channel.queueDeclare(dlxQueueName, true, false, false, null); //將隊列與交換器綁定 channel.queueBind(dlxQueueName, dlxExchangeName, routingKey); //設置參數 Map<String, Object> args = new HashMap<>(); //設置單位 毫秒 超時沒消費則會被丟棄 如果設置為0 則如果有消費者直接投遞 沒有消費者則丟棄 args.put("x-message-ttl", 6000); //指定對應的死信交換器 args.put("x-dead-letter-exchange", dlxExchangeName); //可以為死信交換器指定路由key如果不指定 則默認使用原routingkey //args.put("x-dead-letter-routing-key","dlx-routing-key"); //聲明一個持久化,非排他,非自動刪除的隊列 並設置整個隊列消息的過期時間 channel.queueDeclare(queueName, true, false, false, args); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); //mandatory設置為true 如果根據routing key找不到隊列 則會回調通知 false則直接丟棄 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, "你好呀".getBytes()); }
優先級隊列
說明
消息里面有阻塞情況 保證消息的 優先級高的先執行
demo
/** * 只針對隊列里面有阻塞情況下 優先級 不然發送一條消費一條優先級就沒有意義 * 可以在管理頁面同時get10條看是否是有序的 */ @Test public void priorityTest() throws IOException, TimeoutException { String exchangeName = "test"; String queueName = "testQueue"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //設置參數 Map<String, Object> args = new HashMap<>(); //優先級最大標識 args.put("x-max-priority", 10); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個名字為test 非自動刪除的 direct類型的exchange 更多配置書37頁 channel.exchangeDeclare(exchangeName, "direct", true); //聲明一個持久化,非排他,非自動刪除的隊列 channel.queueDeclare(queueName, true, false, false, args); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); // for (int i = 0; i <= 10; i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); int priority = new Random().nextInt(10); builder.priority(priority);//用於優先級序列 AMQP.BasicProperties properties = builder.build(); //mandatory設置為true 如果根據routing key找不到隊列 則會回調通知 false則直接丟棄 channel.basicPublish(exchangeName, routingKey, true, properties, ("你好呀" + priority).getBytes()); channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("回調通知" + new String(body)); } }); } }
消息持久化
說明
- 交換器持久化(未設置持久化重啟 交換器會消息 會路由不到數據不會影響隊列和消息)
- 隊列持久化(未設置持久化話,重啟隊列會消失 就算消息設置了持久化)
- 消息持久化(如果隊列是持久化 消息也是持久化重啟后消息不會消失)
注意:如果將交換器/隊列/消息都設置持久化 會降低消息的吞吐量 需要在可靠性和吞吐量做權衡
場景1:autoack 消費者受到消息還沒來得及處理就掛掉了
場景2:服務發送方發送消息到mq 消息還沒來得及寫入磁盤就掛了(寫入磁盤和寫入內存是異步的)
解決方案
1:解決此功能是引入MQ的鏡像隊列 如果master掛了 快速且切到從(並不能完全保證 但是可靠性會高很多 生產環境一般都是鏡像隊列)
2.通過事物消息 但是事物消息性能很低因為發送消息比普通發送消息多了幾個步驟(書:75)
3.通過發送方ack確認的方式(需要保證消費端冪等性 因為網絡原因可能未能正確收到ack)
4.ack有2種 一種是同步一種是異步 異步性能優於同步(隊列和消息設置了持久化 name將在成功落盤后才能收到ack)
事物消息
可以保證消息不會丟失但是性能很低
public Channel createChannel() throws IOException, TimeoutException { String exchangeName = "test"; String queueName = "testQueue"; String routingKey = "testRoutingKey"; Connection connection = newConnection(); //聲明一個channel一個連接可以監聽多個channel 連接復用 Channel channel = connection.createChannel(); //聲明一個名字為test 非自動刪除的 direct類型的exchange 更多配置書37頁 channel.exchangeDeclare(exchangeName, "direct", true); //聲明一個持久化,非排他,非自動刪除的隊列 channel.queueDeclare(queueName, true, false, false, null); //將隊列與交換器綁定 channel.queueBind(queueName, exchangeName, routingKey); return channel; }
public void transactionCommit() throws IOException, TimeoutException { Channel channel = createChannel(); try { //向broker發送tx.select指令 將信道設置為事物模式 broker響應tx.select-ok表示設置成功 channel.txSelect(); //在事物信道執行發送消息指令 可以多個 channel.basicPublish("test","testRoutingKey",MessageProperties.PERSISTENT_TEXT_PLAIN,"滴滴".getBytes()); //向broker發送tx.commit執行 broker響應tx.commit-OK表示成功成功才會羅盤 channel.txCommit(); } catch (Exception e) { e.printStackTrace(); //向broker發送tx.rollback broker響應tx.rollback-ok表示成功 channel.txRollback(); } }
生產者ACK
原理
將信道設置為ack模式 所有在此信道上面發送的消息都會分配一個唯一id 當消息投遞到指定隊列后 將會在回傳的deliveryTag包含此消息
channel.basicAck(消費端ack)的 multiple參數表示這個序號之前的都已經確認進行批量確認
如果設置了持久化 將在寫入磁盤后ack通知
同步ack
/** * 同步ack * 注意事物信道和confirm信道不能共存 */ //@Test public void synchroAck() throws IOException, TimeoutException, InterruptedException { Channel channel = createChannel(); //發送tx.configSelect 將信道設置為publisher confirm模式 channel.confirmSelect(); //在confirm信道發送消息指令 如果多個 則將channel.basicPublish channel.waitForConfirms包在循環里面 channel.basicPublish("test","testRoutingKey",MessageProperties.PERSISTENT_TEXT_PLAIN,"滴滴".getBytes()); //同步等待ack如果非confirm模式 調用此方法會報錯 4個重載 書79頁 if(channel.waitForConfirms()){ System.out.println("發送消息成功"); } }
批量發送
/** * 批量發送 * 性能優於上面一種方式 * @throws IOException * @throws TimeoutException */ //@Test public void batchSynchroAck() throws IOException, TimeoutException { Channel channel=createChannel(); AMQP.Confirm.SelectOk selectOk=channel.confirmSelect(); try{ for(int i=0;i<10;i++){ channel.basicPublish("test","testRoutingKey",MessageProperties.PERSISTENT_TEXT_PLAIN,("滴滴"+i).getBytes()); } /** * 發送一條消息會生成一個ID(從1開始) mq會回傳ack或者nack 客戶端里面做了處理 basicpublish內部維護一個SortedSet 回傳一個ack則移除一個 實現批量監聽ack消息 * 其中有一條未確認就會拋異常 * 缺點是其中一條失敗 則全部要重發,所以批次不能太大 */ if(channel.waitForConfirms()){ System.out.println("發送消息成功"); } }catch (InterruptedException e){ /** * 重發邏輯 */ } }
異步ack
demo異步方式優於前面2種方式
public void AsynchroAck() throws IOException, TimeoutException, InterruptedException { Channel channel=createChannel(); channel.confirmSelect(); SortedSet confirmSet=new TreeSet(); /** * 注意 比如你發10條 不一定回調10次 因為id從1開始 如果回調一次10 表示簽名都被確認 */ channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("已確認"+deliveryTag); //為true表示批量確認 if(multiple){ //小於e 之前的元素不包括e SortedSet ackSet=confirmSet.headSet(deliveryTag+1); ackSet.clear(); }else{ //刪除deliveryTag對象 confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //為true表示批量NACK if(multiple){ //清除之前的 /** * confirmSet.headSet(deliveryTag+1)重發只需要獲得當前元素和之前的就行了 */ //小於e 之前的元素不包括e SortedSet ackSet=confirmSet.headSet(deliveryTag+1); ackSet.clear(); }else{ confirmSet.remove(deliveryTag); } //處理消息重發邏輯 } }); channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("returnListener"+replyCode); } }); for(int i=0;i<10;i++){ System.out.println("消息"+i+"正在發送"); channel.basicPublish("test","testRoutingKey",MessageProperties.PERSISTENT_BASIC,("滴滴"+i).getBytes()); confirmSet.add(channel.getNextPublishSeqNo());//獲得生成的序號 } Thread.sleep(10000); System.out.println("未確認"+confirmSet.size()); }
消息分發
/** * 消息分發 * 當一個隊列擁有多個消費者的時候.模式是通過輪詢的方式分發(有消費者n 當前第m條消息發送給 m%n的方式確認消費者) * 但是有個缺點,當某個消費者任務繁重來不及消費消息 則uack消息會堆疊再那里 導致整體消息處理吞吐量下降 *可以通過設置Qos 當uack消息到達一定限量后將不再給當前消息發送消息 每次ack后-1 才繼續發 * 此參數對拉模式的消費模式無效 */ // @Test public void basicQos() throws IOException, TimeoutException, InterruptedException { Channel channel=createChannel(); //內部會維護一個計數每推送一條消息+1 ack后-1 到達上限后將不推送 /** * 一個 channel可以定義多個消費者 重載可以通過global來確認是否是用於整個信道 * channel.basicQos(3,true); * channel.basicQos(5,false); * 如果設置了true和false呢 那么表示多個消費者最多收到3個 +起來不超過5 */ channel.basicQos(5);//默認0表示不限量 channel.basicConsume("testQueue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(new String(body)+"已消費,deliveryTag:"+envelope.getDeliveryTag()); //channel.basicAck(envelope.getDeliveryTag(),false); } }); //可以通過管理頁面發現unack是5 就沒有再接收消息了 Thread.sleep(5000); }