小提示:
RabbitMQ由Erlang语言开发,所以安装RabbitMQ之前要先安装Erlang运行环境。
服务都安装好后就可以进行如下的使用了
如果安装了web管理插件的话可输入:http://localhost:15672/#/users
用户名和密码都是:guest
java中使用示例
pom.xml
1 <!-- RabbitMQ --> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-amqp</artifactId> 5 </dependency>
application.yml
1 spring: 2 application: 3 name: test‐rabbitmq‐producer 4 rabbitmq: 5 host: 127.0.0.1 6 port: 5672 7 username: guest 8 password: guest 9 virtualHost: /
配置类(RabbitmqConfig)
1 import org.springframework.amqp.core.Binding; 2 import org.springframework.amqp.core.BindingBuilder; 3 import org.springframework.amqp.core.Exchange; 4 import org.springframework.amqp.core.ExchangeBuilder; 5 import org.springframework.amqp.core.Queue; 6 import org.springframework.beans.factory.annotation.Qualifier; 7 import org.springframework.context.annotation.Bean; 8 import org.springframework.context.annotation.Configuration; 9 10 /** 11 * Rabbitmq配置类 12 */ 13 @Configuration 14 public class RabbitmqConfig { 15 public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; 16 public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; 17 public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; 18 public static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; 19 public static final String ROUTINGKEY_SMS = "inform.#.sms.#"; 20 21 /** 22 * 交换机配置 23 * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 24 * @return the exchange 25 */ 26 @Bean(EXCHANGE_TOPICS_INFORM) 27 public Exchange EXCHANGE_TOPICS_INFORM() { 28 //durable(true)持久化,消息队列重启后交换机仍然存在 29 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 30 } 31 32 /** 33 * 声明队列:SMS 34 * @return 35 */ 36 @Bean(QUEUE_INFORM_SMS) 37 public Queue QUEUE_INFORM_SMS() { 38 Queue queue = new Queue(QUEUE_INFORM_SMS); 39 return queue; 40 } 41 42 /** 43 * 声明队列:EMAIL 44 * @return 45 */ 46 @Bean(QUEUE_INFORM_EMAIL) 47 public Queue QUEUE_INFORM_EMAIL() { 48 Queue queue = new Queue(QUEUE_INFORM_EMAIL); 49 return queue; 50 } 51 52 /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); 53 * 绑定队列到交换机 54 * @param queue the queue 55 * @param exchange the exchange 56 * @return the binding 57 */ 58 @Bean 59 public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, 60 @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 61 return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); 62 } 63 64 @Bean 65 public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, 66 @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 67 return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); 68 } 69 }
发送消息
1 @Autowired 2 RabbitTemplate rabbitTemplate;//先在要发消息的类中注入rabbitTemplate 3 4 //在要发消息的地方直接写上以下代码 5 //要发送实体类的话可先转为json 6 String message = "sms and email inform"; 7 //routingKey为inform.sms.email可同时发到sms和email中,inform.sms就只发到sms中 8 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message); 9 System.out.println("Send Message is:" + message);
监听消息
可以像下面这样建一个类让spring管理,或者不建类,但写这个注解的类要被spring管理到
1 import com.rabbitmq.client.Channel; 2 import org.springframework.amqp.core.Message; 3 import org.springframework.amqp.rabbit.annotation.RabbitListener; 4 import org.springframework.stereotype.Component; 5 6 @Component 7 public class TestMq { 8 9 //监听email队列 10 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) 11 public void receive_email(String msg, Message message, Channel channel) { 12 System.out.println("email:" + msg); 13 } 14 15 //监听sms队列 16 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS}) 17 public void receive_sms(String msg, Message message, Channel channel) { 18 System.out.println("sms:" + msg); 19 } 20 }