SpringBoot中使用消息隊列RabbitMQ


小提示:

RabbitMQ由Erlang語言開發,所以安裝RabbitMQ之前要先安裝Erlang運行環境


 

服務都安裝好後,就可以按照下面的步驟使用了

如果安裝了Web管理插件,可在瀏覽器中打開管理頁面:http://localhost:15672/#/users

用戶名和密碼都是:guest

 

java中使用示例

pom.xml

1 <!-- RabbitMQ -->
2 <dependency>
3     <groupId>org.springframework.boot</groupId>
4     <artifactId>spring-boot-starter-amqp</artifactId>
5 </dependency>

application.yml

1 spring:
2   application:
3     name: test-rabbitmq-producer
4   rabbitmq:
5     host: 127.0.0.1
6     port: 5672
7     username: guest
8     password: guest
9     virtualHost: /

 

配置類(RabbitmqConfig)

 1 import org.springframework.amqp.core.Binding;
 2 import org.springframework.amqp.core.BindingBuilder;
 3 import org.springframework.amqp.core.Exchange;
 4 import org.springframework.amqp.core.ExchangeBuilder;
 5 import org.springframework.amqp.core.Queue;
 6 import org.springframework.beans.factory.annotation.Qualifier;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 
10 /**
11  * Rabbitmq配置類
12  */
13 @Configuration
14 public class RabbitmqConfig {
15     public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
16     public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
17     public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
18     public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
19     public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
20 
21     /**
22      *  交換機配置
23      *  ExchangeBuilder提供了fanout、direct、topic、header交換機類型的配置
24      *  @return the exchange
25      */
26     @Bean(EXCHANGE_TOPICS_INFORM)
27     public Exchange EXCHANGE_TOPICS_INFORM() {
28         //durable(true)持久化,消息隊列重啟后交換機仍然存在
29         return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
30     }
31 
32     /**
33      * 聲明隊列:SMS
34      * @return
35      */
36     @Bean(QUEUE_INFORM_SMS)
37     public Queue QUEUE_INFORM_SMS() {
38         Queue queue = new Queue(QUEUE_INFORM_SMS);
39         return queue;
40     }
41 
42     /**
43      * 聲明隊列:EMAIL
44      * @return
45      */
46     @Bean(QUEUE_INFORM_EMAIL)
47     public Queue QUEUE_INFORM_EMAIL() {
48         Queue queue = new Queue(QUEUE_INFORM_EMAIL);
49         return queue;
50     }
51 
52     /**  channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
53      *  綁定隊列到交換機
54      *  @param queue the queue
55      *  @param exchange the exchange
56      *  @return the binding
57      */
58     @Bean
59     public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
60                                               @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
61         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
62     }
63 
64     @Bean
65     public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
66                                             @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
67         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
68     }
69 }

 

發送消息

1 @Autowired
2 RabbitTemplate rabbitTemplate;//先在要發消息的類中注入rabbitTemplate
3 
4 //在要發消息的地方直接寫上以下代碼
5 //要發送實體類的話可先轉為json
6 String message = "sms and email inform";
7 //routingKey為inform.sms.email可同時發到sms和email中,inform.sms就只發到sms中
8 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message);
9 System.out.println("Send Message is:" + message);

 

監聽消息

可以像下面這樣新建一個類並交給Spring管理;也可以不新建類,直接在已有的類中使用@RabbitListener注解,但該類必須是被Spring管理的Bean

 1 import com.rabbitmq.client.Channel;
 2 import org.springframework.amqp.core.Message;
 3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 4 import org.springframework.stereotype.Component;
 5 
 6 @Component
 7 public class TestMq {
 8 
 9     //監聽email隊列
10     @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
11     public void receive_email(String msg, Message message, Channel channel) {
12         System.out.println("email:" + msg);
13     }
14 
15     //監聽sms隊列
16     @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
17     public void receive_sms(String msg, Message message, Channel channel) {
18         System.out.println("sms:" + msg);
19     }
20 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM