在RabbitMQ的環境搭建成功后,創建SpringBoot項目,通過一個簡單的案例來詳細的說明下RabbitMQ
的生產者消費者的模式。下面結合SpringBoot項目,來具體的說明下這部分的具體應用。
一、pom引入RabbitMQ
創建項目成功后,我們需要在pom.xml的文件里面來引入rabbitmq的jar,pom.xml文件詳細的內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
二、生產者思路
RabbitMQ是MQ中核心的組件技術棧,生產者消費者的模型中也是非常重要的部分,在RabbitMQ中生產者
的應用程序並不關心隊列,生產者的任務只需要把MQ的消息發送到Exchange,而不關心並且也不知道Queue的
存在,至於Queue這部分是需要消費者來進行關心的。所以在操作RabbitMQ上,它的步驟具體總結如下:
- 先創建連接工廠的對象,也就是ConnectionFactory
- 然后配置連接MQ的地址,端口,賬戶以及密碼,和虛擬主機這部分
- 連接工廠接着創建連接對象,也就是Connection
- 接着使用連接對象來創建信道,也就是Channel
- 最后通過生產者的模式把MQ的消息發送到Exchange
生產者涉及到的源代碼具體如下:
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer
{
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置連接mq的地址信息
connectionFactory.setHost("101.43.158.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
//連接工廠創建連接
Connection connection = connectionFactory.newConnection();
//通過connection來創建Channel
Channel channel = connection.createChannel();
//通過channel來發送具體的數據信息
String msg = "Hello RabbitMQ";
for (int i = 0; i < 5; i++) {
channel.basicPublish("saas", "", null, msg.getBytes());
System.out.println("發送數據成功:"+i);
}
//發送消息成功后,關閉具體的連接
channel.close();
connection.close();
}
}
三、消費者思路
在消費者中,和生產者前面的代碼基本是一致的,但是這部分需要特別強調的是在消費者的設計中,
它不需要關心Exchange,而消費者核心需要關注的是Queue這部分,也就是消費者的應用程序主要是通過
Queue里面讀取到MQ的數據,這部分代碼具體如下:
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.*;
public class Consumer
{
//定義exchange
private static final String EXCHANGE = "saas";
//定義隊列
private static final String queueName="saas";
public static void main(String[] args) throws Exception
{
try{
//創建連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//配置連接mq的地址信息
connectionFactory.setHost("101.43.158.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
// connectionFactory.setVirtualHost("/");
//連接工廠創建連接
Connection connection=connectionFactory.newConnection();
//通過connection來創建Channel
Channel channel=connection.createChannel();
//設置exchange類型為fanout
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);
/*
定義一個隊列
* 一個隊列來接收數據后,消費端才可以從隊列里面來接收具體的數據
* param1:隊列名稱
* param2:是否持久化
* param3:隊列是否獨占此連接
* param4:隊列不再使用時是否自動刪除此隊列
* param5:隊列參數
* */
channel.queueDeclare(queueName,true,false,true,null);
channel.queueBind(queueName,EXCHANGE,"");
//創建一個消費者來消費數據
// QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
//2.0以后的版本修改為DefaultConsumer
DefaultConsumer consumer=new DefaultConsumer(channel)
{
@Override
public void handleDelivery(
String consumerTag,
com.rabbitmq.client.Envelope envelope,
AMQP.BasicProperties properties,
byte [] body) throws java.io.IOException
{
String message=new String(body);
System.out.println("接收到的消息為:"+message);
};
};
// 監聽隊列,從隊列中獲取數據
System.out.println("消費者程序啟動成功,准備接收生產者的數據:\n");
channel.basicConsume(queueName,consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
四、程序驗證
完成代碼成功后,下來執行具體的程序來進行驗證。這部分需要特別的強調下,在執行消費者程序前
不需要單獨的再創建exchange和queue,要不就會出現消費者的程序無法啟動,這點需要特別的注意,另外
一個就是被操作的賬戶必須就具備管理員的權限。如上代碼執行后,消費者以及生產者輸出結果信息如下:


如上,就可以看到消費者接收到生產者發送的數據。程序執行后,可以在RabbitMQ的WEB控制台可以看到具體
的數據,主要是Queue的部分,具體如下:


如上,可以看到控制台中消費者的數據。感謝您的閱讀,或許會持續更新RabbitMQ的技術棧的體系文章。
