小提示:
RabbitMQ由Erlang語言開發,所以安裝RabbitMQ之前要先安裝Erlang運行環境。
服務都安裝好后就可以進行如下的使用了
如果安裝了web管理插件的話可輸入:http://localhost:15672/#/users
用戶名和密碼都是:guest
java中使用示例
pom.xml
1 <!-- RabbitMQ --> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-amqp</artifactId> 5 </dependency>
application.yml
1 spring: 2 application: 3 name: test‐rabbitmq‐producer 4 rabbitmq: 5 host: 127.0.0.1 6 port: 5672 7 username: guest 8 password: guest 9 virtualHost: /
配置類(RabbitmqConfig)
1 import org.springframework.amqp.core.Binding; 2 import org.springframework.amqp.core.BindingBuilder; 3 import org.springframework.amqp.core.Exchange; 4 import org.springframework.amqp.core.ExchangeBuilder; 5 import org.springframework.amqp.core.Queue; 6 import org.springframework.beans.factory.annotation.Qualifier; 7 import org.springframework.context.annotation.Bean; 8 import org.springframework.context.annotation.Configuration; 9 10 /** 11 * Rabbitmq配置類 12 */ 13 @Configuration 14 public class RabbitmqConfig { 15 public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; 16 public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; 17 public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; 18 public static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; 19 public static final String ROUTINGKEY_SMS = "inform.#.sms.#"; 20 21 /** 22 * 交換機配置 23 * ExchangeBuilder提供了fanout、direct、topic、header交換機類型的配置 24 * @return the exchange 25 */ 26 @Bean(EXCHANGE_TOPICS_INFORM) 27 public Exchange EXCHANGE_TOPICS_INFORM() { 28 //durable(true)持久化,消息隊列重啟后交換機仍然存在 29 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 30 } 31 32 /** 33 * 聲明隊列:SMS 34 * @return 35 */ 36 @Bean(QUEUE_INFORM_SMS) 37 public Queue QUEUE_INFORM_SMS() { 38 Queue queue = new Queue(QUEUE_INFORM_SMS); 39 return queue; 40 } 41 42 /** 43 * 聲明隊列:EMAIL 44 * @return 45 */ 46 @Bean(QUEUE_INFORM_EMAIL) 47 public Queue QUEUE_INFORM_EMAIL() { 48 Queue queue = new Queue(QUEUE_INFORM_EMAIL); 49 return queue; 50 } 51 52 /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); 53 * 綁定隊列到交換機 54 * @param queue the queue 55 * @param exchange the exchange 56 * @return the binding 57 */ 58 @Bean 59 public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, 60 @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 61 return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); 62 } 63 64 @Bean 65 public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, 66 @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 67 return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); 68 } 69 }
發送消息
1 @Autowired 2 RabbitTemplate rabbitTemplate;//先在要發消息的類中注入rabbitTemplate 3 4 //在要發消息的地方直接寫上以下代碼 5 //要發送實體類的話可先轉為json 6 String message = "sms and email inform"; 7 //routingKey為inform.sms.email可同時發到sms和email中,inform.sms就只發到sms中 8 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message); 9 System.out.println("Send Message is:" + message);
監聽消息
可以像下面這樣建一個類讓spring管理,或者不建類,但寫這個注解的類要被spring管理到
1 import com.rabbitmq.client.Channel; 2 import org.springframework.amqp.core.Message; 3 import org.springframework.amqp.rabbit.annotation.RabbitListener; 4 import org.springframework.stereotype.Component; 5 6 @Component 7 public class TestMq { 8 9 //監聽email隊列 10 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) 11 public void receive_email(String msg, Message message, Channel channel) { 12 System.out.println("email:" + msg); 13 } 14 15 //監聽sms隊列 16 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS}) 17 public void receive_sms(String msg, Message message, Channel channel) { 18 System.out.println("sms:" + msg); 19 } 20 }