SpringBoot结合RabbitMq
一、SpringBoot 框架部署
1.创建Maven工程(我用的IDEA)
File[文件] -> New[新建] -> Project[工程] -> Maven[选择Maven] -> Next[直接下一步] -> Name[输入项目名称] —> Finish[完成]
2.在项目里创建两个子工程
Producer 消息生产者
项目名称位置右键 -> New[新建] -> Module[组件] -> Maven[选择Maven] -> Next[下一步] -> Name[输入Producer] —> Finish[完成]
Consumer 消息消费者
项目名称位置右键 -> New[新建] -> Module[组件] -> Maven[选择Maven] -> Next[下一步] -> Name[输入Consumer] —> Finish[完成]
3.在主项目工程的pom文件里填写依赖(注意是主项目 两个子项目会继承父项目的依赖)
<!--Spring Boot依赖-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
</parent>
<dependencies>
<!--spring-boot-starter-amqp依赖 [重要]-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
<!--下面三个依赖是为了方便控制台输出Log [一般]-->
<!--junit:junit 单元测试框架 用了都说好-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<!--org.projectlombok:lombok 整合注解-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!--ch.qos.logback:logback-classic 日志框架-->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!--spring-boot-starter-test SpringBootTest-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
</dependencies>
4.分别在两个子项目中创建application.yml文件
注意.yml文件需下载 YAML 插件
resource文件夹右键 -> New[新建] -> File[文件] -> Name[输入application.yml] —> Finish[完成]
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: username 改成你自己的账号
password: password 改成你自己的密码
server:
port: 服务器端口号 两个子工程不能一致
5.Producer生产者创建启动类
Java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.producer.ProviderRabbitApplication.java] —> Finish[完成]
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 生产者启动类
*/
@SpringBootApplication
public class ProviderRabbitApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderRabbitApplication.class,args);
}
}
6.Consumer生产者创建启动类
Java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.consumer.ConsumerRabbitMqApplication.java] —> Finish[完成]
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 消费者启动类
*/
@SpringBootApplication
public class ConsumerRabbitMqApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerRabbitMqApplication.class,args);
}
}
二、HelloWorld 简单模式
1.创建RabbitMqConfig.java文件 绑定交换机和队列
Java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.producer.config.RabbitMqConfig.java] —> Finish[完成]
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq 配置类
*/
@Configuration
public class RabbitMqConfig {
public static final String SIMPLE_EXCHANGE_NAME = "simple_exchange";
public static final String SIMPLE_QUEUE_NAME = "simple_queue";
/**
* 创建 交换机
* @return
*/
@Bean
public Exchange simpleExchange(){
return ExchangeBuilder.topicExchange(SIMPLE_EXCHANGE_NAME).build();
}
/**
* 创建 队列
* @return
*/
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable(SIMPLE_QUEUE_NAME).build();
}
/**
* 绑定 交换机与队列
* @param exchange
* @param queue
* @return
*/
@Bean
public Binding itemQueueExchange(@Qualifier("simpleExchange") Exchange exchange, @Qualifier("simplQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
2.创建ConsumerListener.java监听消息文件
Java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.cosnumer.listener.ConsumerListener.java] —> Finish[完成]
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class ConsumerListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queuesToDeclare = "simple_queue")
public void myListener(String message){
//不用在手动转UTF-8 Spring自动转好了
log.debug("消费者接收到的消息为:{}", message);
}
}
3.创建ProducerTest.java测试文件
test->java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.producer.test.ProducerTest.java] —> Finish[完成]
import lombok.extern.slf4j.Slf4j;
import com.itheima.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RabbitMQTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void sendTest(){
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE_NAME,"","测试 SpringBoot整合RabbitMq的普通模式");
}
}
4.测试
首先运行一次ProducerTest.java测试类 创建交换机和队列
然后在启动消费者监听器
三、Topic 通配符模式
1.创建RabbitMqConfig.java文件 绑定交换机和队列
Java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.producer.config.RabbitMqConfig.java] —> Finish[完成]
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq 配置类
*/
@Configuration
public class RabbitMqConfig {
private static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
private static final String TOPIC_QUEUE_NAME = "topic_queue";
/**
* 创建 交换机
* @return
*/
@Bean
public Exchange itemTopicExchange(){
return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build();
}
/**
* 创建 队列
* @return
*/
@Bean
public Queue itemQueue(){
return QueueBuilder.durable(TOPIC_QUEUE_NAME).build();
}
/**
* 绑定 交换机与队列
* @param exchange
* @param queue
* @return
*/
@Bean
public Binding itemQueueExchange(@Qualifier("itemTopicExchange") Exchange exchange, @Qualifier("itemQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
2.创建ConsumerListener.java监听消息文件
Java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.cosnumer.listener.ConsumerListener01.java] —> Finish[完成]
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class ConsumerListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queuesToDeclare = "topic_queue")
public void myListener(String message){
//不用在手动转UTF-8 Spring自动转好了
log.debug("消费者接收到的消息为:{}", message);
}
}
3.创建ProducerTest.java测试文件
test->java文件夹右键 -> New[新建] -> Java Class[类文件] -> Name[com.rabbit.producer.test.ProducerTest.java] —> Finish[完成]
import lombok.extern.slf4j.Slf4j;
import com.itheima.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RabbitMQTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void sendTest(){
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE_NAME,"item.insert","topic通配符模式,RoutingKey:item.insert");
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE_NAME,"item.delete.yes","topic通配符模式,RoutingKey:item.delete.yes");
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE_NAME,"null.null","topic通配符模式,RoutingKey:null.null");
}
}
4.测试
首先运行一次ProducerTest.java测试类 创建交换机和队列
然后在启动消费者监听器