當前社區活躍度最好的消息中間件就是kafka和rabbitmq了,前面對kafaka的基礎使用做了一些總結,最近開始研究rabbitmq,查看了很多資料,自己仿着寫了一些demo,在博客園記錄一下。
rabbitmq基礎知識
關於rabbitmq基礎知識,可以看這篇博客,介紹的很詳細了:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html,這里分享一張核心概念圖
rabbitmq安裝
rabbitmq的安裝很簡單,我們可以根據自己的系統去網上找對應的安裝說明,這里我為了方便,采用docker鏡像的方式,我的虛擬機裝的是centos7。步驟如下:
1、啟動docker,關閉防火牆
2、拉取鏡像:docker pull rabbitmq,如需要管理界面:docker pull rabbitmq:management
3、執行指令啟動RabbitMQ
無管理界面:
docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 rabbitmq
有管理界面:
docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 -p 15672:15672 rabbitmq:management
4、啟動后輸入你的虛擬機地址+端口號15672,即可訪問到rabbitmq登錄界面,默認用戶名和密碼都是guest。
springboot與rabbitmq整合
IDE:STS,這是spring官方推薦的開發工具,構建springboot項目非常方便。JDK:1.8
1、pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>powerx.io</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>springboot-rabbitmq</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2、定義常量
package com.example.demo.constant; public interface QueueConstants { // 消息交換 String MESSAGE_EXCHANGE = "message.direct.myexchange"; // 消息隊列名稱 String MESSAGE_QUEUE_NAME = "message.myqueue"; // 消息路由鍵 String MESSAGE_ROUTE_KEY = "message.myroute"; // 死信消息交換 String MESSAGE_EXCHANGE_DL = "message.direct.dlexchange"; // 死信消息隊列名稱 String MESSAGE_QUEUE_NAME_DL = "message.dlqueue"; // 死信消息路由鍵 String MESSAGE_ROUTE_KEY_DL = "message.dlroute"; }
3、rabbitmq配置
package com.example.demo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.example.demo.constant.QueueConstants; @Configuration public class MyRabbitMqConfiguration { /** * 交換配置 * * @return */ @Bean public DirectExchange messageDirectExchange() { return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE) .durable(true) .build(); } /** * 消息隊列聲明 * * @return */ @Bean public Queue messageQueue() { return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME) .build(); } /** * 消息綁定 * * @return */ @Bean public Binding messageBinding() { return BindingBuilder.bind(messageQueue()) .to(messageDirectExchange()) .with(QueueConstants.MESSAGE_ROUTE_KEY); } }
4、生產者
package com.example.demo.producer; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.example.demo.constant.QueueConstants; @Component public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String str) { rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE, QueueConstants.MESSAGE_ROUTE_KEY, str); } }
5、消費者
package com.example.demo.consumer; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.example.demo.constant.QueueConstants; import com.rabbitmq.client.Channel; @Component public class MessageConsumer { @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME) public void processMessage(Channel channel,Message message) { System.out.println("MessageConsumer收到消息:"+new String(message.getBody())); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { } } }
6、控制器類
package com.example.demo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.example.demo.producer.MessageProducer; @RestController public class TestController { @Autowired private MessageProducer messageProducer; @RequestMapping(value = "/index") public String index(String str) { // 將實體實例寫入消息隊列 messageProducer.sendMessage(str); return "Success"; } }
7、application.properties
#用戶名 spring.rabbitmq.username=guest #密碼 spring.rabbitmq.password=guest #服務器ip spring.rabbitmq.host=192.168.1.124 #虛擬空間地址 spring.rabbitmq.virtual-host=/ #端口號 spring.rabbitmq.port=5672
至此,springboot整合rabbitmq基本demo完畢,這里不再貼出演示截圖。
消息序列化
涉及網絡傳輸的應用序列化不可避免,發送端以某種規則將消息轉成 byte 數組進行發送,接收端則以約定的規則進行 byte[] 數組的解析,RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸的內容,RabbitMQ 抽象出一個 MessageConvert 接口處理消息的序列化,其實現有 SimpleMessageConverter(默認)、Jackson2JsonMessageConverter。
SimpleMessageConverter 對於要發送的消息體 body 為 byte[] 時不進行處理,如果是 String 則轉成字節數組,如果是 Java 對象,則使用 jdk 序列化將消息轉成字節數組,轉出來的結果較大,含class類名,類相應方法等信息。因此性能較差;當使用 RabbitMQ 作為中間件時,數據量比較大,此時就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能。
Jackson2JsonMessageConverter配置如下:
1、消息發送者設置序列化方式 :rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
2、消息消費者也應配置 MessageConverter 為 Jackson2JsonMessageConverter
@Configuration public class RabbitMQConfig { @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
3、消費消息
@Component @RabbitListener(queues = "consumer_queue") public class Receiver { @RabbitHandler public void processMessage1(@Payload User user) { System.out.println(user.getName()); } }
消息確認
RabbitMQ的消息確認有兩種。
一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。
消息發送確認
1、ConfirmCallback
確認消息發送成功,通過實現ConfirmCallBack接口,消息發送到交換器Exchange后觸發回調,使用該功能需要開啟確認,spring-boot中配置如下:
spring.rabbitmq.publisher-confirms = true
在MessageProducer.java加入如下代碼:
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> { System.out.println("消息唯一標識" + correlationData); System.out.println("消息確認結果" + ack); System.out.println("失敗原因" + cause); });
2、ReturnCallback
通過實現ReturnCallback接口,如果消息從交換器發送到對應隊列失敗時觸發(比如根據發送消息時指定的routingKey找不到隊列時會觸發),使用該功能需要開啟確認,spring-boot中配置如下:spring.rabbitmq.publisher-returns = true
在MessageProducer.java加入如下代碼:
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) ->{ System.out.println("消息主體message" + message); System.out.println("消息replyCode" + replyCode); System.out.println("消息replyText" + replyText); System.out.println("消息使用的交換器" + exchange); System.out.println("消息使用的路由鍵" + routingKey); });
消息消費確認
消費確認模式有三種:NONE、AUTO、MANUAL。
開啟手動確認需要在配置中加入:spring.rabbitmq.listener.direct.acknowledge-mode=manual
消息在處理失敗后將再次返回隊列,重新嘗試消費,如果再次失敗則直接拒絕。
實例代碼如下:
package com.example.demo.consumer; import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.example.demo.constant.QueueConstants; import com.rabbitmq.client.Channel; @Component public class MessageConsumer { @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME) public void processMessage(Channel channel, Message message) { System.out.println("MessageConsumer收到消息:" + new String(message.getBody())); try { //模擬消息處理失敗 int a = 3 / 0; // false只確認當前一個消息收到,true確認所有consumer獲得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重復處理失敗,拒絕再次接收..."); try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue為false,拒絕 } catch (IOException e1) { } } else { System.out.println("消息即將再次返回隊列處理..."); try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue為true重新回到隊列 } catch (IOException e1) { } } } } }
死信隊列
DLX, Dead-Letter-Exchange。利用DLX, 當消息在一個隊列中變成死信(dead message)之后,它能被重新publish到另一個Exchange,這樣我們就可以重新去處理這個消息。DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列,可以監聽這個隊列中消息做相應的處理 。消息變成死信一向有一下幾種情況:
- 消息被拒絕(basic.reject/ basic.nack)並且requeue=false
- 消息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間))
- 隊列達到最大長度
利用DLX,我們可以實現消息的延遲消費,可參考:https://www.jianshu.com/p/b74a14c7f31d,還可以像我的demo那樣,對於有問題的消息進行重新處理,實例代碼如下
首先在MyRabbitMqConfiguration上加入如下配置:
@Bean DirectExchange messagedlDirect() { return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE_DL).durable(true) .build(); } @Bean Queue messagedlQueue() { return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME_DL) // 配置到期后轉發的交換 .withArgument("x-dead-letter-exchange", QueueConstants.MESSAGE_EXCHANGE) // 配置到期后轉發的路由鍵 .withArgument("x-dead-letter-routing-key", QueueConstants.MESSAGE_ROUTE_KEY).build(); } @Bean public Binding messageTtlBinding(Queue messagedlQueue, DirectExchange messagedlDirect) { return BindingBuilder.bind(messagedlQueue).to(messagedlDirect).with(QueueConstants.MESSAGE_ROUTE_KEY_DL); }
其次,修改我們的消息發送者,發送消息到我們新加入的交換器和路由鍵上,如下:
rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE_DL, QueueConstants.MESSAGE_ROUTE_KEY_DL, str);
新添加一個消費者,同時將原來的消費者的監聽隊列換成新加入的
package com.example.demo.consumer; import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.example.demo.constant.QueueConstants; import com.rabbitmq.client.Channel; @Component public class MessageConsumer { @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME_DL) public void processMessage(Channel channel, Message message) { System.out.println("MessageConsumer收到消息:" + new String(message.getBody())); try { //模擬消息處理失敗 int a = 3 / 0; // false只確認當前一個消息收到,true確認所有consumer獲得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重復處理失敗,拒絕再次接收..."); try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue為false,拒絕 } catch (IOException e1) { } } else { System.out.println("消息即將再次返回隊列處理..."); try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue為true重新回到隊列 } catch (IOException e1) { } } } } }
package com.example.demo.consumer; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.example.demo.constant.QueueConstants; import com.rabbitmq.client.Channel; @Component public class MessageConsumer2 { @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME) public void processMessage(Channel channel,Message message) { System.out.println("MessageConsumer2收到消息:"+new String(message.getBody())); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { } } }
啟動項目,發送請求http://localhost:8082/index?str=asdfgh,可以看到后台日志如下:
rabbitmq支持四種交換器,同時還支持很多種插件,功能非常強大,這里我自己還沒親手用過,所以不再展開。