SpringBoot應用可以完成自動配置及依賴注入——可以通過Spring直接提供與MQ的連接對象
6.1 消息生產者
-
創建SpringBoot應用,添加依賴
- 配置application.yml
server:
port: 9001
spring:
application:
name: producer
rabbitmq:
host: 47.96.11.185
port: 5672
virtual-host: host1
username: ytao
password: admin123
發送消息
@Service public class TestService { @Resource private AmqpTemplate amqpTemplate; public void sendMsg(String msg){ //1. 發送消息到隊列 amqpTemplate.convertAndSend("queue1",msg); //2. 發送消息到交換機(訂閱交換機) amqpTemplate.convertAndSend("ex1","",msg); //3. 發送消息到交換機(路由交換機) amqpTemplate.convertAndSend("ex2","a",msg); } }
6.2 消息消費者
-
創建項目添加依賴
-
配置yml
-
接收消息
@Service //@RabbitListener(queues = {"queue1","queue2"}) @RabbitListener(queues = "queue1") public class ReceiveMsgService { @RabbitHandler public void receiveMsg(String msg){ System.out.println("接收MSG:"+msg); } //@RabbitHandler //public void receiveMsg(byte[] bs){ // //} }
-
傳遞的對象實現序列化接口
-
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息隊列可以發送 字符串、字節數組、序列化對象 amqpTemplate.convertAndSend("","queue1",goods); } }
消息消費者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(Goods goods){ System.out.println("Goods---"+goods); } }
-
傳遞的對象實現序列化接口
-
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息隊列可以發送 字符串、字節數組、序列化對象 byte[] bytes = SerializationUtils.serialize(goods); amqpTemplate.convertAndSend("","queue1",bytes); } }
消息消費者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(byte[] bs){ Goods goods = (Goods) SerializationUtils.deserialize(bs); System.out.println("byte[]---"+goods); } }
2.3 使用JSON字符串傳遞
要求:對象的屬性名一直
-
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods) throws JsonProcessingException { //消息隊列可以發送 字符串、字節數組、序列化對象 ObjectMapper objectMapper = new ObjectMapper(); String msg = objectMapper.writeValueAsString(goods); amqpTemplate.convertAndSend("","queue1",msg); } }
消息消費者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(String msg) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); Goods goods = objectMapper.readValue(msg,Goods.class); System.out.println("String---"+msg); } }
三、基於Java的交換機與隊列創建
我們使用消息隊列,消息隊列和交換機可以通過管理系統完成創建,也可以在應用程序中通過Java代碼來完成創建
3.1 普通Maven項目交換機及隊列創建
-
-
//1.定義隊列 (使用Java代碼在MQ中新建一個隊列) //參數1:定義的隊列名稱 //參數2:隊列中的數據是否持久化(如果選擇了持久化) //參數3: 是否排外(當前隊列是否為當前連接私有) //參數4:自動刪除(當此隊列的連接數為0時,此隊列會銷毀(無論隊列中是否還有數據)) //參數5:設置當前隊列的參數 channel.queueDeclare("queue7",false,false,false,null);
- 新建交換機
-
//定義一個“訂閱交換機” channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT); //定義一個“路由交換機” channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
- 綁定隊列到交換機
-
//綁定隊列 //參數1:隊列名稱 //參數2:目標交換機 //參數3:如果綁定訂閱交換機參數為"",如果綁定路由交換機則表示設置隊列的key channel.queueBind("queue7","ex4","k1"); channel.queueBind("queue8","ex4","k2");
@Configuration public class RabbitMQConfiguration { //聲明隊列 @Bean public Queue queue9(){ Queue queue9 = new Queue("queue9"); //設置隊列屬性 return queue9; } @Bean public Queue queue10(){ Queue queue10 = new Queue("queue10"); //設置隊列屬性 return queue10; } //聲明訂閱模式交換機 @Bean public FanoutExchange ex5(){ return new FanoutExchange("ex5"); } //聲明路由模式交換機 @Bean public DirectExchange ex6(){ return new DirectExchange("ex6"); } //綁定隊列 @Bean public Binding bindingQueue9(Queue queue9, DirectExchange ex6){ return BindingBuilder.bind(queue9).to(ex6).with("k1"); } @Bean public Binding bindingQueue10(Queue queue10, DirectExchange ex6){ return BindingBuilder.bind(queue10).to(ex6).with("k2"); } }
四、消息的可靠性
4.1 RabbitMQ事務
channel.txSelect(); //開啟事務 try{ channel.basicPublish("ex4", "k1", null, msg.getBytes()); System.out.println("發送:" + msg); channel.txCommit(); //提交事務 }catch (Exception e){ channel.txRollback(); //事務回滾 }
4.2.1 普通Maven項目的消息確認
-
普通confirm方式
-
//1.發送消息之前開啟消息確認 channel.confirmSelect(); channel.basicPublish("ex1", "a", null, msg.getBytes()); //2.接收消息確認 boolean b = channel.waitForConfirms(); System.out.println("發送:" +(b?"成功":"失敗"));
- 批量confirm方式
-
//1.發送消息之前開啟消息確認 channel.confirmSelect(); //2.批量發送消息 for (int i=0 ; i<10 ; i++){ channel.basicPublish("ex1", "a", null, msg.getBytes()); } //3.接收批量消息確認:發送的所有消息中,如果有一條是失敗的,則所有消息發送直接失敗,拋出IO異常 boolean b = channel.waitForConfirms();
- 異步confirm方式
-
//發送消息之前開啟消息確認 channel.confirmSelect(); //批量發送消息 for (int i=0 ; i<10 ; i++){ channel.basicPublish("ex1", "a", null, msg.getBytes()); } //假如發送消息需要10s,waitForConfirms會進入阻塞狀態 //boolean b = channel.waitForConfirms(); //使用監聽器異步confirm channel.addConfirmListener(new ConfirmListener() { //參數1: long l 返回消息的表示 //參數2: boolean b 是否為批量confirm public void handleAck(long l, boolean b) throws IOException { System.out.println("~~~~~消息成功發送到交換機"); } public void handleNack(long l, boolean b) throws IOException { System.out.println("~~~~~消息發送到交換機失敗"); } });
4.2.2 普通Maven項目的return機制
-
添加return監聽器
-
-
由於監聽器監聽是異步處理,所以在消息發送之后不能關閉channel
String msg = "Hello HuangDaoJun!"; Connection connection = ConnectionUtil.getConnection(); //相當於JDBC操作的數據庫連接 Channel channel = connection.createChannel(); //相當於JDBC操作的statement //return機制:監控交換機是否將消息分發到隊列 channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException { //如果交換機分發消息到隊列失敗,則會執行此方法(用來處理交換機分發消息到隊列失敗的情況) System.out.println("*****"+i); //標識 System.out.println("*****"+s); // System.out.println("*****"+s1); //交換機名 System.out.println("*****"+s2); //交換機對應的隊列的key System.out.println("*****"+new String(bytes)); //發送的消息 } }); //發送消息 //channel.basicPublish("ex2", "c", null, msg.getBytes()); channel.basicPublish("ex2", "c", true, null, msg.getBytes());
4.3 在SpringBoot應用實現消息確認與return監聽
spring:
rabbitmq:
publisher-confirm-type: simple ## 開啟消息確認模式
publisher-returns: true ##使用return監聽機制
4.3.2 創建confirm和return監聽
@Component public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class); @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { //此方法用於監聽消息確認結果(消息是否發送到交換機) if(b){ logger.info("-------消息成功發送到交換機"); }else{ logger.warn("-------消息發送到交換機失敗"); } } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { //此方法用於return監聽(當交換機分發消息到隊列失敗時執行) logger.warn("~~~~~~~交換機分發消息到隊列失敗"); } }
五、延遲機制
5.1 延遲隊列
-
延遲隊列——消息進入到隊列之后,延遲指定的時間才能被消費者消費
-
AMQP協議和RabbitMQ隊列本身是不支持延遲隊列功能的,但是可以通過TTL(Time To Live)特性模擬延遲隊列的功能
-
TTL就是消息的存活時間。RabbitMQ可以分別對隊列和消息設置存活時間
-
-
創建消息隊列沒有設置TTL,但是消息設置了TTL,那么當消息的存活時間結束,也會被移除;
-
5.2 使用延遲隊列實現訂單支付監控
5.2.2 創建交換機和隊列
2.創建消息隊列
3.創建死信隊列
4.隊列綁定