1、RabbitMQ交換機的作用:
生產者發送消息不會像傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列再將消息以推送或者拉取方式給消費者進行消費。交換機的作用根據具體的路由策略分發到不同的隊列中。
2、RabbitMQ的Exchange(交換器)分為四種類型:
direct(默認)、headers、fanout、topic。其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以本文也不做講解。fanout、topic交換器是沒有歷史數據的,也就是說對於中途創建的隊列,獲取不到之前的消息。
2.1 Direct Exchange(直連交換器):
1)也是默認的交換機,是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的
2)直連交換機的特性:
a.公平調度:當接收端訂閱者有多個的時候,direct會輪詢公平的分發給每個訂閱者(訂閱者消息確認正常);
b.消息的發后既忘特性:發后既忘模式是指接受者不知道消息的來源,如果想要指定消息的發送者,需要包含在發送內容里面,這點就像我們在信件里面注明自己的姓名一樣,只有這樣才能知道發送者是誰;
c.消息確認:消息接收到之后必須使用channel.basicAck()方法手動確認(非自動確認刪除模式下);
c.1如果應用程序接收了消息,缺忘記確認接收的話(可能因為bug),消息在隊列的狀態會從“Ready”變為“Unacked”;
c.2如果消息收到卻未確認,Rabbit將不會再給這個應用程序發送更多的消息了,這是因為Rabbit認為你沒有准備好接收下一條消息。此條消息會一直保持Unacked的狀態,直到你確認了消息,或者斷開與Rabbit的連接,Rabbit會自動把消息改回Ready狀態,分發給其他訂閱者。當然你可以利用這一點,讓你的程序延遲確認該消息,直到你的程序處理完相應的業務邏輯,這樣可以有效的防治Rabbit給你過多的消息,導致程序崩潰。
d.消息拒絕:消息在確認之前,可以有兩個選擇:
d.1 斷開與Rabbit的連接,這樣Rabbit會重新把消息分派給另一個消費者;
d.2 拒絕Rabbit發送的消息使用channel.basicReject(long deliveryTag, boolean requeue)
參數說明:【參數1:消息的id;
參數2:處理消息的方式;
如果是true,Rabbib會重新分配這個消息給其他訂閱者,如果設置成false的話,Rabbit會把消息發送到一個特殊的“死信”隊列,用來存放被拒絕而不重新放入隊列的消息;
】
2.2 Fanout Exchange(廣播交換器):Fanout 中文意思為 扇出
1)fanout有別於direct交換器,fanout是一種發布/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到所有附加到這個交換器的隊列上。
比如用戶上傳了自己的頭像,這個時候圖片需要清除緩存,同時用戶應該得到積分獎勵,你可以把這兩個隊列綁定到圖片上傳的交換器上,這樣當有第三個、第四個上傳完圖片需要處理的需求的時候,原來的代碼可以不變,只需要添加一個訂閱消息即可,這樣發送方和消費者的代碼完全解耦,並可以輕而易舉的添加新功能了。
2)廣播交換機的特點(也可以說是與直連交換器的區別)
a.在發送消息時需新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器;
b.在接受消息時需要聲明fanout路由器,並且fanout需要綁定隊列到對應的交換器用於訂閱消息;
channel.queueDeclare().getQueue()為隨機隊列,Rabbit會隨機生成隊列名稱,一旦消費者斷開連接,該隊列會自動刪除。
注意:對於fanout交換器來說routingKey(路由鍵)是無效的,這個參數是被忽略的。
2.3 Topic Exchange(主題交換器):是一種匹配訂閱模式
1)topic交換器運行和fanout類似,但是可以更靈活的匹配自己想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。
2)假設我們現在有一個日志系統,會把所有日志級別的日志發送到交換器,warning、log、error、fatal,但我們只想處理error以上的日志,要怎么處理?這就需要使用topic路由器了。topic路由器的關鍵在於定義路由鍵,定義routingKey名稱不能超過255字節。主題交換器使用“.”作為分隔符;"*"匹配一個分段(用“.”分割)的內容; "#"匹配0和多個字符;
示列:例如發布了一個“com.mq.rabbit.error”的消息:
能匹配上的路由鍵:
cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#
不能匹配上的路由鍵:
cn.mq.*
*.error
*
所以如果想要訂閱所有消息,可以使用“#”匹配。
注意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途創建的隊列,獲取不到之前的消息。
3、RabbitMQ 直連交換機、廣播交換機、主題交換機使用案例:
3.1 依賴引入:
<!--rabbitmq依賴引用-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
mq.properties文件配置:
rabbitmq.address=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
3.2 代碼示列:
1)生產和消費請求:RabbitMqController

package com.zj.weblearn.controller.rabbitmq; import com.zj.weblearn.serviceImpl.rabbitmq.DirectExchangeServiceImpl; import com.zj.weblearn.serviceImpl.rabbitmq.FanoutExchangeServiceImpl; import com.zj.weblearn.serviceImpl.rabbitmq.TopicExchangeServiceImpl; import com.zj.weblearn.utils.ResponseDTO; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.servlet.http.HttpServletRequest; import java.util.Arrays; import java.util.List; /* * @Copyright (C), 2002-2020, * @ClassName: MqController * @Author: * @Date: 2020/9/23 9:49 * @Description: * @History: * @Version:1.0 */ @Controller @RequestMapping("/mq/") public class RabbitMqController { @Autowired DirectExchangeServiceImpl directExchangeServiceImpl; @Autowired FanoutExchangeServiceImpl fanoutExchangeServiceImpl; @Autowired TopicExchangeServiceImpl topicExchangeServiceImpl; private final static String CANCLED_ORDER_QUEUE = "cancled_order_queue"; private final static String CANCLED_ORDER_FAIR_QUEUE = "cancled_order_queue"; /** * @Method: * @Author: * @Description: 將退貨的訂單通過直連交換器(Direct Exchange)讓入消費隊里中 * 訪問路徑:http://localhost:8080/mq/sendCancleOrderToRabbitMqQueue.do?orderNos=1111111,2222222 * param: * @Return: * @Exception: * @Date: 2020/12/8 14:18 */ @RequestMapping("/sendCancleOrderToRabbitMqQueue") @ResponseBody public Object saveOrderInfoToQueue(HttpServletRequest request) { String orderNos = request.getParameter("orderNos"); List orderList = null; if (StringUtils.isNotEmpty(orderNos)) { orderList = Arrays.asList(orderNos.split(",")); } return directExchangeServiceImpl.productMessageByDirectExchange(orderList, CANCLED_ORDER_QUEUE); } /** * @Method: * @Author: * @Description: 通過直連交換器(Direct Exchange)消費 隊列 cancle_order_queue 消費消息 * http://localhost:8080/mq/consumerQueueInfo.do * param: * @Return: * @Exception: * @Date: 2020/12/8 15:07 */ @RequestMapping("/consumerQueueOnNoFairWay") @ResponseBody public Object toConsumeQueueInfo() { return directExchangeServiceImpl.consumeMessageByDirectExchange(CANCLED_ORDER_QUEUE); } //http://localhost:8080/mq/saveOrderToQueueFairForward.do /** * @Method: * @Author: * @Description: 通過直連交換器(Direct Exchange)創建消費隊列,但是采用 RabbitMQ的公平轉發進行消費 * * param: * @Return: * @Exception: * @Date: 2020/12/8 15:32 */ @RequestMapping("/sendCancleOrderToRabbitMqFairQueue") @ResponseBody public Object saveOrderToQueueFairForward(HttpServletRequest request) { String orderNos = request.getParameter("orderNos"); List orderList = null; if (StringUtils.isNotEmpty(orderNos)) { orderList = Arrays.asList(orderNos.split(",")); } return directExchangeServiceImpl.productMessQueueOnFairForward(CANCLED_ORDER_FAIR_QUEUE,orderList); } @RequestMapping("/consumerQueueOnFairWay") @ResponseBody public Object consumerQueueOnFairWay() { ResponseDTO responseDTO= directExchangeServiceImpl.consumerMessageOnFariWay(CANCLED_ORDER_FAIR_QUEUE); ResponseDTO responseDTO2= directExchangeServiceImpl.consumerMessageOnFariWay(CANCLED_ORDER_FAIR_QUEUE); if(!responseDTO.isSuccess()){ return responseDTO; }else if(!responseDTO2.isSuccess()){ return responseDTO2; } return responseDTO2; } }
2)創建連接工具方法:RabbitMqConnectionUtils

package com.zj.weblearn.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /* * @Copyright (C), 2002-2020, * @ClassName: RabbitMqConnectionUtils * @Author: * @Date: 2020/9/23 9:42 * @Description: * @History: * @Version:1.0 */ public class RabbitMqConnectionUtils { public static Connection getConnection() { //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器地址 factory.setHost(ReadPropertiesUtils1.getValue("rabbitmq.address")); //設置端口號 factory.setPort(Integer.parseInt(ReadPropertiesUtils1.getValue("rabbitmq.port"))); //設置用戶名 factory.setUsername(ReadPropertiesUtils1.getValue("rabbitmq.username")); //設置密碼 factory.setPassword(ReadPropertiesUtils1.getValue("rabbitmq.password")); //設置vhost factory.setVirtualHost("/"); try { //創建連接 return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
3)通過直連交換器生產和消費消息:DirectExchangeServiceImpl

package com.zj.weblearn.serviceImpl.rabbitmq; import com.rabbitmq.client.*; import com.zj.weblearn.enums.ErrorCodeEnum; import com.zj.weblearn.utils.RabbitMqConnectionUtils; import com.zj.weblearn.utils.ResponseDTO; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeoutException; /* * @Copyright (C), 2002-2020, * @ClassName: DirectExchangeServiceImpl * @Author: * @Date: 2020/9/23 20:39 * @Description: * @History: * @Version:1.0 */ @Service public class DirectExchangeServiceImpl { /** * @Method: * @Author: * @Description: 通過直連交換器(Direct Exchange)向 queueName 中生產消息 * param: * @Return: * @Exception: * @Date: 2020/9/23 20:43 */ public ResponseDTO productMessageByDirectExchange(List<String> orderNos, String queueName) { ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK); //1、獲取連接 Connection connection = RabbitMqConnectionUtils.getConnection(); Channel channel = null; try { //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務 channel = connection.createChannel(); //3、聲明隊列 如果Rabbit中沒有此隊列將自動創建。 //【方法入參: // 參數1:隊列的名稱; // 參數2:是否持久化,代表隊列在服務器重啟后是否還存在; // 參數3:是否獨占此鏈接,是否是排他性隊列。排他性隊列只能在聲明它的 Connection中使用(可以在同一個 Connection 的不同的 channel 中使用),連接斷開時自動刪除; // 參數4:隊列不再使用時是否自動刪除; // 參數5:隊列的其他屬性 Map<String, Object> arguments // 】 channel.queueDeclare(queueName, false, false, false, null); //4、發送消息: // 【方法入參: // 參數1: Exchange的名稱,如果沒有指定,則使用Default Exchange; // 參數2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列; // 參數3:消息包含的屬性; // 參數4:消息體 // 】 //這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定哪個默認的交換機,但是不能顯示綁定或解除綁定認的交換機,routingKey(路由鍵)等於隊列名稱 for (String orderNo : orderNos) { channel.basicPublish("", queueName, null, orderNo.getBytes()); } System.out.println("message send body orderNos:" + orderNos); channel.close(); connection.close(); } catch (IOException e) { responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501); responseDTO.setErrorMsg(e.toString()); } catch (TimeoutException e) { responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501); responseDTO.setErrorMsg(e.toString()); } return responseDTO; } /** * @Method: * @Author: * @Description: 通過直連交換器(Direct Exchange)消費 隊列queueName 消費消息 * param: * @Return: * @Exception: * @Date: 2020/9/23 20:43 */ public ResponseDTO consumeMessageByDirectExchange(String queueName) { ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK); //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); Channel channel = null; try { //2、創建一個通道 channel = connection.createChannel(); //3、定義消費方法 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息內容 String message = new String(body, "utf-8"); System.out.println("Consumer consumption news>>" + message); } }; //4、監聽隊列 // 【方法入參: // 參數1: 隊列名稱; // 參數2: 是否自動確認收到 // 設置為true表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息, // 設置為false則需要手動回復; // 參數3: 消費消息的方法,消費者接收到消息后調用此方法 // 】 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501); responseDTO.setData(e.toString()); } return responseDTO; } /* * 目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。 為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,只有在消費者空閑的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完畢並自己對剛剛處理的消息進行確認之后,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去清閑。通過 設置channel.basicQos(1); * */ public ResponseDTO productMessQueueOnFairForward(String queueName, List<String> orderNos) { ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK); //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務 Channel channel = null; try { channel = connection.createChannel(); //3、聲明隊列 如果Rabbit中沒有此隊列將自動創建 channel.queueDeclare(queueName, false, false, false, null); channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息 for (String orderNo : orderNos) { //4、發送消息【參數說明:參數一:交換機名稱如果沒有指定,則使用Default Exchange;參數二:隊列名稱,參數三:消息的其他屬性-路由的headers信息;參數四:消息主體】 channel.basicPublish("", queueName, null, orderNo.getBytes()); } System.out.println("message send over :" + orderNos); channel.close(); connection.close(); } catch (IOException e) { responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501); responseDTO.setData(e.toString()); } catch (TimeoutException e) { responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501); responseDTO.setData(e.toString()); } return responseDTO; } /** * @Method: * @Author: * @Description: param: * @Return: * @Exception: * @Date: 2020/12/8 14:26 */ public ResponseDTO consumerMessageOnFariWay(String queueName) { ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK); //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); //2、創建一個通道 Channel channel = null; try { channel = connection.createChannel(); //3、聲明隊列 channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息 //4、定義消費方法 Channel finalChannel = channel; DefaultConsumer consumer = new DefaultConsumer(finalChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息內容 String message = new String(body, "utf-8"); System.out.println("消費者消費:" + message); try { //睡眠1s Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 手動回執消息 finalChannel.basicAck(envelope.getDeliveryTag(), false); } } }; //5、監聽隊列(入參依次為:隊列名稱;設置為true表示消息接收到自動向mq回復接收到了,設置為false則需要手動回復;消費消息的方法,消費者接收到消息后調用此方法;) channel.basicConsume(queueName, false, consumer); } catch (IOException e) { responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501); responseDTO.setData(e.toString()); } return responseDTO; } }
4)通過廣播交換器生產和消費消息:FanoutExchangeServiceImpl

package com.zj.weblearn.serviceImpl.rabbitmq; import com.rabbitmq.client.*; import com.zj.weblearn.utils.RabbitMqConnectionUtils; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.concurrent.TimeoutException; /* * @Copyright (C), 2002-2020, * @ClassName: FanoutExchangeServiceImpl * @Author: * @Date: 2020/9/23 20:26 * @Description: * @History: * @Version:1.0 */ @Service public class FanoutExchangeServiceImpl { final String fanoutExchangeName = "fanoutExchange"; //交換器名稱(廣播交換器) /** * @Method: * @Author: * @Description: 通過廣播交換器(Fanout Exchange)生產消息 * param: * @Return: * @Exception: * @Date: 2020/9/23 20:27 */ public boolean byFanoutExchangeProductMessage(String orderMsg, String queueName) { boolean productResult = false; //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); Channel channel = null; try { //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務 channel = connection.createChannel(); //3、聲明交換器為廣播交換器(是一種發布訂閱交換器) channel.exchangeDeclare(fanoutExchangeName, "fanout"); //4、聲明隊列 如果Rabbit中沒有此隊列將自動創建 channel.queueDeclare(queueName, false, false, false, null); //5、發送消息 channel.basicPublish(fanoutExchangeName, queueName, null, orderMsg.getBytes()); System.out.println("message send body:" + orderMsg); try { channel.close(); } catch (TimeoutException e) { e.printStackTrace(); } connection.close(); productResult = true; } catch (IOException e) { e.printStackTrace(); } return productResult; } /** * @Method: * @Author: * @Description: 通過廣播交換器(Fanout Exchange)消費消息 * param: * @Return: * @Exception: * @Date: 2020/9/23 20:30 */ public void byFanoutExchangeConsumeMessage() { //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); Channel channel = null; try { //2、創建一個通道 channel = connection.createChannel(); //3、聲明fanout交換器 channel.exchangeDeclare(fanoutExchangeName, "fanout"); //4、聲明隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, fanoutExchangeName, ""); //5、定義消費方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String message = new String(body, "UTF-8"); } }; //6、監聽隊列(入參依次為:隊列名稱;設置為true表示消息接收到自動向mq回復接收到了,設置為false則需要手動回復;消費消息的方法,消費者接收到消息后調用此方法;) channel.basicConsume(queueName, true, consumer);// } catch (IOException e) { e.printStackTrace(); } } }
5)通過主題交換器生產和消費消息:TopicExchangeServiceImpl

package com.zj.weblearn.serviceImpl.rabbitmq; import com.rabbitmq.client.*; import com.zj.weblearn.utils.RabbitMqConnectionUtils; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.concurrent.TimeoutException; /* * @Copyright (C), 2002-2020, * @ClassName: TopicExchangeServiceImpl * @Author: * @Date: 2020/9/23 20:33 * @Description: * @History: * @Version:1.0 */ @Service public class TopicExchangeServiceImpl { final String topicExchangeName = "topicExchange"; //主題交換器名稱 /** * @Method: * @Author: * @Description: 通過主題交換器(Topic Exchange)生產消息 * param: * @Return: * @Exception: * @Date: 2020/9/23 20:27 */ public boolean byTopicExchangeProductMessage(String orderMsg, String queueName) { boolean productResult = false; //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); Channel channel = null; try { //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務 channel = connection.createChannel(); //3、聲明交換器為主題交換器(是一種匹配訂閱模式) channel.exchangeDeclare(topicExchangeName, "topic"); //4、聲明隊列 如果Rabbit中沒有此隊列將自動創建 channel.queueDeclare(queueName, false, false, false, null); //5、發送消息 channel.basicPublish(topicExchangeName, queueName, null, orderMsg.getBytes()); System.out.println("message send body:" + orderMsg); try { channel.close(); } catch (TimeoutException e) { e.printStackTrace(); } connection.close(); productResult = true; } catch (IOException e) { e.printStackTrace(); } return productResult; } /** * @Method: * @Author: * @Description: 通過主題交換器(Topic Exchange)消費消息 * param: * @Return: * @Exception: * @Date: 2020/9/23 20:27 */ public void byTopicExchangeConsumeMessage(){ //1、得到連接 Connection connection = RabbitMqConnectionUtils.getConnection(); Channel channel = null; try { //2、創建一個通道 channel = connection.createChannel(); //3、聲明topic交換器 channel.exchangeDeclare(topicExchangeName, "topic"); //4、聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "#.error"; channel.queueBind(queueName, topicExchangeName, routingKey); //5、定義消費方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(routingKey + "|接收消息 => " + message); } }; //6、監聽隊列(入參依次為:隊列名稱;設置為true表示消息接收到自動向mq回復接收到了,設置為false則需要手動回復;消費消息的方法,消費者接收到消息后調用此方法;) channel.basicConsume(queueName, true, consumer);// } catch (IOException e) { e.printStackTrace(); } } }
6)相關的實體類:

import com.zj.weblearn.enums.BaseEnum; import com.zj.weblearn.enums.ErrorCodeEnum; import java.io.Serializable; public class ResponseDTO<T> implements Serializable { private boolean success; private String errorCode; /** * 原因 */ private String errorMsg; /** * 返回數據值 */ private T data; public ResponseDTO() { } public ResponseDTO(ErrorCodeEnum errorCode) { this.errorCode = errorCode.getErrorCode(); this.errorMsg = errorCode.getErrorMsg(); if(!"0".equals(errorCode.getErrorCode())){ success=false; }else{ success=true; } } public ResponseDTO(String errorCode, String errorMsg) { this.errorCode = errorCode; this.errorMsg = errorMsg; if(!"0".equals(errorCode)){ success=false; }else{ success=true; } } public void setErrorCodeEnum(BaseEnum errorCode) { this.errorCode = errorCode.getErrorCode(); this.errorMsg = errorCode.getErrorMsg(); if(!"0".equals(errorCode.getErrorCode())){ success=false; }else{ success=true; } } public String getErrorCode() { return errorCode; } public void setErrorCode(String errorCode) { this.errorCode = errorCode; } public String getErrorMsg() { return errorMsg; } public void setErrorMsg(String errorMsg) { this.errorMsg = errorMsg; } public T getData() { return data; } public void setData(T data) { this.data = data; } public boolean isSuccess() { return success; } public void setSuccess(boolean success) { this.success = success; } @Override public String toString() { return "ResponseDTO{" + "errorCode='" + errorCode + '\'' + ", errorMsg='" + errorMsg + '\'' + ", data=" + data + '}'; } } public enum ErrorCodeEnum implements BaseEnum{ OK("0", "成功"), FAIL_500("500", "系統開小差了,請稍后再試!"), FAIL_501("501", "服務異常,請聯系管理員處理!"), PARAM_ERROR("502", "入參異常,請檢查后重試!"); private String errorCode; private String errorMsg; ErrorCodeEnum(String errorCode, String errorMsg) { this.errorCode = errorCode; this.errorMsg = errorMsg; } ErrorCodeEnum(BaseEnum errorCodeEnum) { this.errorCode = errorCodeEnum.getErrorCode(); this.errorMsg = errorCodeEnum.getErrorMsg(); } public String getErrorCode() { return errorCode; } public String getErrorMsg() { return errorMsg; } }
4、RabbitMQ的五種形式隊列
1)簡單隊列(點對點):一個生產者P發送消息到隊列Q,一個消費者C接收。
點對點模式(一對一模式):一個生產者投遞消息給隊列,只能允許有一個消費者進行消費。如果是消費集群的話,會進行均攤消費。
推(由隊列指向消費者):消費者已經啟動了,建立長連接,一旦生產者向隊列投遞消息會立馬推送給消費者;
取(由消費者指向隊列):生產者先投遞消息到隊列進行緩存,這時候消費者在啟動的時候就向隊列中獲取消息
隊列:以先進先出原則存放消息集合;
2)工作(公平性)隊列模式
work queues與簡單隊列相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。
應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
公平隊列原理:隊列服務器向消費者發送消息的時候,消費者采用手動應答模式,隊列服務器必須要收到消費者發送ack結果通知,才會發送下一個消息。
默認消費者集群為均攤消費。假設生產者向隊列發送10個消息,消息1和2都各自消費5個,保證消費唯一。
思考:均攤消費弊端-如果每個消費者處理消息的業務時間情況不相同,可能對消息處理比較慢的消費者不公平。應該能這多勞,誰消費快,就讓其多消費消息。
3)發布/訂閱模式(Publish/Subscribe):這個可能是消息隊列中最重要的隊列了,其他的都是在它的基礎上進行了擴展。發布訂閱模式說明:
(1)一個生產者,多個消費者
(2)每一個消費者都有自己的一個隊列,並對其進行監聽;
(3)生產者沒有直接發消息到隊列中,而是發送到交換機;
(4)每個消費者的隊列都綁定到交換機上;
(5)消息通過交換機到達每個消費者的隊列 該模式就是Fanout Exchange(廣播交換機)將消息路由給綁定到它身上的所有隊列 以用戶發郵件案例講解
注意:交換機沒有存儲消息功能,如果消息發送到沒有綁定消費隊列的交換機,消息則丟失。生產者將消息發給broker(消息隊列服務),由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息;
4)路由模式Routing:
(1)每個消費者監聽自己的隊列,並且設置routingkey。
(2)生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要指定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
5)通配符模式Topics
(1)每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。
(2)生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配,由交換機根據routingkey來轉發消息到指定的隊列。
符號#:匹配一個或者多個詞error.# 可以匹配error.order或者error.order.cancle
符號*:只能匹配一個詞error.* 可以匹配error.order或者error.ceshi
參看文章:https://www.cnblogs.com/vipstone/p/9295625.html