RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
一.下载并安装
安装RabbitMQ之前,要先去下载安装Erlang。
下载RabbitMQ并安装,安装过程直接默认下一步。安装好后创建用户绑定。
使用浏览器打开 http://localhost:15672 访问Rabbit Mq的管理控制台,使用刚才创建的账号登陆系统
二.Spring Boot 集成 RabbitMQ
1.配置pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.application.properties配置文件
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=xpc
spring.rabbitmq.password=123456
3.配置队列
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
4.生产者
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
5.消费者
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
6.测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}
准备见证奇迹,然而,这时候出现了问题报错如下:
2019-03-20 21:08:44.976 INFO 12948 --- [ main] com.neo.rabbitmq.HelloTest : Starting HelloTest on DESKTOP-6P3OGVF with PID 12948 (started by 79235 in D:\workspace\spring-boot-rabbitmq)
2019-03-20 21:08:44.977 INFO 12948 --- [ main] com.neo.rabbitmq.HelloTest : No active profile set, falling back to default profiles: default
2019-03-20 21:08:45.306 INFO 12948 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$a50a910] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-03-20 21:08:45.781 INFO 12948 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-20 21:08:45.816 WARN 12948 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-03-20 21:08:45.816 INFO 12948 --- [ main] o.s.a.r.l.SimpleMessageListenerContainer : Broker not available; cannot force queue declarations during start
2019-03-20 21:08:45.820 INFO 12948 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-20 21:08:45.827 ERROR 12948 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
java.net.SocketException: socket closed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_131]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_131]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_131]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_131]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_131]
at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_131]
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_131]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:596) ~[amqp-client-5.4.3.jar:5.4.3]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
后来发现市因为之前创建的用户没有授权,打开RabbitMQ web管理界面点击admin——》set permission就可以了
再次测试结果
:: Spring Boot :: (v2.1.0.RELEASE)
2019-03-20 21:12:44.699 INFO 11472 --- [ main] com.neo.rabbitmq.HelloTest : Starting HelloTest on DESKTOP-6P3OGVF with PID 11472 (started by 79235 in D:\workspace\spring-boot-rabbitmq)
2019-03-20 21:12:44.700 INFO 11472 --- [ main] com.neo.rabbitmq.HelloTest : No active profile set, falling back to default profiles: default
2019-03-20 21:12:45.024 INFO 11472 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$a50a910] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-03-20 21:12:45.479 INFO 11472 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-20 21:12:45.523 INFO 11472 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#24d4d7c9:0/SimpleConnection@302fec27 [delegate=amqp://xpc@127.0.0.1:5672/, localPort= 64890]
2019-03-20 21:12:45.615 INFO 11472 --- [ main] com.neo.rabbitmq.HelloTest : Started HelloTest in 1.062 seconds (JVM running for 1.634)
Sender : hello Wed Mar 20 21:12:45 CST 2019
2019-03-20 21:12:45.788 INFO 11472 --- [ Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
Receiver : hello Wed Mar 20 21:12:45 CST 2019
2019-03-20 21:12:46.600 INFO 11472 --- [ Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.