前面的學習都是基於原生的api,下面我們使用spingboot來整合rabbitmq
springboot對rabbitmq提供了友好支持,極大的簡化了開發流程
引入maven
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
配置yml
rabbitmq: host: 47.102.103.232 port: 5672 username: admin password: admin virtual-host: /test publisher-confirms: true publisher-returns: true cache: channel: size: 10 listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 3 retry: enabled: true
這是基礎的配置,看不懂的配置后面會介紹
更詳細的配置參考官方https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-rabbitmq(搜索rabbit往下拉即可)
代碼實現
配置類
@Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("helloQueue"); }
//創建topic交換機 @Bean public TopicExchange helloExchange() { return new TopicExchange("helloExchange"); } @Bean public Binding bindingPaymentExchange(Queue helloQueue, TopicExchange helloExchange) { return BindingBuilder.bind(helloQueue).to(helloExchange).with("hello.#"); } /** * 定制化amqp模版
* connectionFactory:包含了yml文件配置參數 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 必須設置為 true,不然當 發送到交換器成功,但是沒有匹配的隊列,不會觸發 ReturnCallback 回調 // 而且 ReturnCallback 比 ConfirmCallback 先回調,意思就是 ReturnCallback 執行完了才會執行 ConfirmCallback rabbitTemplate.setMandatory(true); // 設置 ConfirmCallback 回調 yml需要配置 publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { // 如果發送到交換器都沒有成功(比如說刪除了交換器),ack 返回值為 false // 如果發送到交換器成功,但是沒有匹配的隊列(比如說取消了綁定),ack 返回值為還是 true (這是一個坑,需要注意) if (ack) { String messageId = correlationData.getId(); System.out.println("confirm:"+messageId); } }); // 設置 ReturnCallback 回調 yml需要配置 publisher-returns: true // 如果發送到交換器成功,但是沒有匹配的隊列,就會觸發這個回調 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String messageId = message.getMessageProperties().getMessageId(); System.out.println("return:"+messageId); }); return rabbitTemplate; } }
回調機制
- 消息不管是否投遞到交換機都進行ConfirmCallback回調,投遞成功ack=true,否則為false
- 交換機匹配到隊列成功則不進行ReturnCallback回調,否則先進行ReturnCallback回調再進行ConfirmCallback回調
- 如果消息成功投遞到交換機,但沒匹配到隊列,則ConfirmCallback回調ack仍為true
生產者
@Component public class RbProducer { //注意一定要使用RabbitTemplate!! //雖然RabbitTemplate實現了AmqpTemplate 但是AmqpTemplate里並沒有能發送correlationData的方法 @Resource private RabbitTemplate rbtemplate; public void send1(String msg){ //CorrelationData用於confirm機制里的回調確認 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rbtemplate.convertAndSend("helloExchange", "hello.yj", msg,correlationData); } public void send2(User user){ CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rbtemplate.convertAndSend("helloExchange", "hello.yj", user,correlationData); } }
消費者
@Component @RabbitListener(queues = "helloQueue") public class RbConsumer { @RabbitLister(queues = "helloQueue") public void receive0(Message msg, Channel channel) throws IOException { System.out.println("consumer receive message0: " + msg); channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } @RabbitHandler public void receive1(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("consumer receive message1: " + msg); channel.basicAck(deliveryTag, false); } @RabbitHandler public void receive2(User user, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("consumer receive message2: "+user); //如果發生以下情況投遞消息所有的通道或連接被突然關閉(包括消費者端丟失TCP連接、消費者應用程序(進程)掛掉、通道級別的協議異常)任何已經投遞的消息但是沒有被消費者端確認的消息會自動重新排隊。 //請注意,連接檢測不可用客戶端需要一段時間才會發現,所以會有一段時間內的所有消息會重新投遞 //因為消息的可能重新投遞,所有必須保證消費者端的接口的冪等。 //在RabbitMQ中影響吞吐量最大的參數是:消息確認模式和Qos預取值 //自動消息確認模式或設置Qos預取值為無限雖然可以最大的提高消息的投遞速度,但是在消費者端未及時處理的消息的數量也將增加,從而增加消費者RAM消耗,使用消費者端奔潰。所以以上兩種情況需要謹慎使用。 //RabbitMQ官方推薦Qos預取值設置在 100到300范圍內的值通常提供最佳的吞吐量,並且不會有使消費者奔潰的問題 channel.basicAck(deliveryTag, false); channel.basicQos(100); // 代表消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息重新入隊 // channel.basicNack(deliveryTag, false, false); // 代表消費者拒絕當前消息,第二個參數表示是否把當前消息重新入隊 // channel.basicReject(deliveryTag,false); } }
@RabbitListener+@RabbitHandler:消費者監聽
使用@RabbitListener+@RabbitHandler組合進行監聽,監聽器會根據隊列發來的消息類型自動選擇處理方法
channel.basicAck(deliveryTag, false):手動確認機制
deliverTag:該消息的標識,每來一個消息該標識+1
multiple:第二個參數標識書否批量確認
requeue:被拒絕的是否重新入隊
channel.basicQos(100):最多未確認的消息數量為100,超過100隊列將停止給該消費者投遞消息
更多參數詳解參考https://www.cnblogs.com/piaolingzxh/p/5448927.html
測試
@RunWith(SpringRunner.class) @SpringBootTest(classes = TestBoot.class) public class TestRabbit { @Resource private RbProducer producer; @Test public void send1() { producer.send1("hello,im a string"); } @Test public void send2() { User user = new User(); user.setNickname("hello,im a object"); producer.send2(user); } }
成功消費


完結
下篇博客我們討論下在擁有了手動ack機制、confirm機制、return機制后,是否真的可靠~
