SpringBoot中使用消息队列RabbitMQ


小提示:

RabbitMQ由Erlang语言开发,所以安装RabbitMQ之前要先安装Erlang运行环境


 

服务都安装好后就可以进行如下的使用了

如果启用了 Web 管理插件(rabbitmq_management),可在浏览器中访问:http://localhost:15672/#/users

默认用户名和密码都是:guest(该账号默认只允许从 localhost 登录)

 

java中使用示例

pom.xml

1 <!-- RabbitMQ -->
2 <dependency>
3     <groupId>org.springframework.boot</groupId>
4     <artifactId>spring-boot-starter-amqp</artifactId>
5 </dependency>

application.yml

1 spring:
2   application:
3     name: test-rabbitmq-producer
4   rabbitmq:
5     host: 127.0.0.1
6     port: 5672
7     username: guest
8     password: guest
9     virtualHost: /

 

配置类(RabbitmqConfig)

 1 import org.springframework.amqp.core.Binding;
 2 import org.springframework.amqp.core.BindingBuilder;
 3 import org.springframework.amqp.core.Exchange;
 4 import org.springframework.amqp.core.ExchangeBuilder;
 5 import org.springframework.amqp.core.Queue;
 6 import org.springframework.beans.factory.annotation.Qualifier;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 
10 /**
11  * Rabbitmq配置类
12  */
13 @Configuration
14 public class RabbitmqConfig {
15     public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
16     public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
17     public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
18     public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
19     public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
20 
21     /**
22      *  交换机配置
23      *  ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
24      *  @return the exchange
25      */
26     @Bean(EXCHANGE_TOPICS_INFORM)
27     public Exchange EXCHANGE_TOPICS_INFORM() {
28         //durable(true)持久化,消息队列重启后交换机仍然存在
29         return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
30     }
31 
32     /**
33      * 声明队列:SMS
34      * @return
35      */
36     @Bean(QUEUE_INFORM_SMS)
37     public Queue QUEUE_INFORM_SMS() {
38         Queue queue = new Queue(QUEUE_INFORM_SMS);
39         return queue;
40     }
41 
42     /**
43      * 声明队列:EMAIL
44      * @return
45      */
46     @Bean(QUEUE_INFORM_EMAIL)
47     public Queue QUEUE_INFORM_EMAIL() {
48         Queue queue = new Queue(QUEUE_INFORM_EMAIL);
49         return queue;
50     }
51 
52     /**  channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
53      *  绑定队列到交换机
54      *  @param queue the queue
55      *  @param exchange the exchange
56      *  @return the binding
57      */
58     @Bean
59     public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
60                                               @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
61         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
62     }
63 
64     @Bean
65     public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
66                                             @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
67         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
68     }
69 }

 

发送消息

1 @Autowired
2 RabbitTemplate rabbitTemplate;//先在要发消息的类中注入rabbitTemplate
3 
4 //在要发消息的地方直接写上以下代码
5 //要发送实体类的话可先转为json
6 String message = "sms and email inform";
7 //routingKey为inform.sms.email可同时发到sms和email中,inform.sms就只发到sms中
8 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message);
9 System.out.println("Send Message is:" + message);

 

监听消息

可以像下面这样单独建一个类并交给Spring管理;也可以不单独建类,把监听方法写在已有的类中,但带有@RabbitListener注解的方法所在的类必须是被Spring管理的Bean

 1 import com.rabbitmq.client.Channel;
 2 import org.springframework.amqp.core.Message;
 3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 4 import org.springframework.stereotype.Component;
 5 
 6 @Component
 7 public class TestMq {
 8 
 9     //监听email队列
10     @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
11     public void receive_email(String msg, Message message, Channel channel) {
12         System.out.println("email:" + msg);
13     }
14 
15     //监听sms队列
16     @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
17     public void receive_sms(String msg, Message message, Channel channel) {
18         System.out.println("sms:" + msg);
19     }
20 }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM