在上一篇隨筆中我們認識並安裝了RabbitMQ,接下來我們來看下怎么在Spring Boot 應用中整合RabbitMQ。
先給出最終目錄結構:
搭建步驟如下:
- 新建maven工程amqp
- 修改pom文件,引入spring-boot-starter-amqp和spring-boot-starter-test
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sam</groupId> <artifactId>amqp</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> </parent> <properties> <javaVersion>1.8</javaVersion> </properties> <dependencies> <!-- 引入amqp依賴,它能很好的支持RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 引入test依賴,這次需要用到JUnit --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
- 新建application.properties配置文件,主要就是配置下連接RabbitMQ的信息:
spring.application.name=rabbitmq-hello #config rabbitmq info spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
- 新建啟動類,這里沒什么特殊的,就是普通的spring boot啟動類
/** * 這里沒什么特殊的地方,就是普通的spring boot 配置 * */ @SpringBootApplication public class RabbitMQApp { public static void main(String[] args) { SpringApplication.run(RabbitMQApp.class, args); } }
- 創建生產者類,通過AmqpTemplate實現消息的發送,AmqpTemplate接口定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來注入具體的實現。這里我們會產生一個字符串,並發送到名為hello的隊列中。
@Component public class Sender { @Autowired AmqpTemplate rabbitmqTemplate; /** * 發送消息 */ public void send() { String content = "Sender says:" + "'hello, I'm sender'"; System.out.println(content); rabbitmqTemplate.convertAndSend("hello", content); } }
- 創建消費者類,需要用到@RabbitListener來定義對hello隊列的監聽,並用@RabbitHandler注解來指定對消息處理的方法。我們這里實現了對hello隊列的消費。
/** * 通過@RabbitListener對hello隊列進行監聽 * */ @Component @RabbitListener(queues="hello") public class Receiver { /** * 通過@RabbitHandler聲明的方法,對hello隊列中的消息進行處理 */ @RabbitHandler public void receiver(String str) { System.out.println("Receiver says:[" + str + "]"); } }
- 編寫RabbitMQ的配置類,配置類可以配置隊列、交換器、路由等高級信息。我們這里為了簡單,只配置隊列,其他的采用默認配置。
/** * rabbitmq配置類, * 為了簡單,我們這里只配置了Queue * 至於exchanges、brokers等用的默認配置 * */ @Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } }
- 編寫測試類,用來調用消息生產者
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes=RabbitMQApp.class) public class HelloTest { @Autowired private Sender sender; /** * 調用生產者進行消息發送 */ @Test public void hello() throws Exception{ sender.send(); } }
- 運行啟動類,啟動后控制台會有下面的提示內容:
- 執行測試類,在測試類的控制台會打印我們打的log內容
切換到amqp應用的控制台,能看到打印:
在管理頁面中我們能看到Connections和Channels中包含了當前連接的條目:
在整個生產和消費的過程中,生產和消費是一個異步操作,這是分布式系統中要使用消息代理的重要原因。