rabbitmq消息隊列,官網有六種,實戰常用的也就如下五種。
下面開始demo講解
大致三步:1.配置消息隊列,2.生產者提供消息給隊列,3.消費者監聽消費隊列消息
源碼下載:https://pan.baidu.com/s/119Hf0YFrWiQK9m4hwVrKPQ
1.配置消息隊列
package com.qy.mq.provider; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 七脈 * 描述:這里主要講解五種隊列消息 * 1.一對一普通隊列(hello world) * 2.一對多工作隊列 * 3.fanout廣播隊列(發布訂閱) * 4.direct定向隊列(routing-key) * 5.topic通配符隊列(*、#) */ @Configuration public class RabbitmqConfig { /**hello world普通隊列**/ public static final String HELLO_WORLD_QUEUE = "hello_world_queue"; /**work工作隊列**/ public static final String WORK_QUEUE = "work_queue"; /**fanout使用隊列 one**/ public static final String FANOUT_QUEUE_ONE = "fanout_queue_one"; /**fanout使用隊列 two**/ public static final String FANOUT_QUEUE_TWO = "fanout_queue_two"; /**direct routing使用隊列ONE**/ public static final String DIRECT_QUEUE_ONE = "direct_queue_one"; /**direct routing使用隊列TWO**/ public static final String DIRECT_QUEUE_TWO = "direct_queue_two"; /**topic使用隊列**/ public static final String TOPIC_QUEUE_ONE = "topic_queue_one"; /**topic使用隊列**/ public static final String TOPIC_QUEUE_TWO = "topic_queue_TWO"; /**fanout交換機**/ public static final String FANOUT_EXCHANGE = "fanout_exchange"; /**direct routing交換機**/ public static final String DIRECT_EXCHANGE = "direct_exchange"; /**topic交換機**/ public static final String TOPIC_EXCHANGE = "topic_exchange"; /**定義routing-key提供給direct交換機使用**/ public static final String ROUTING_KEY = "my_routing_key"; /**定義topic通配符提供給topic交換機使用**/ public static final String TOPICS_ONE = "my_topic.*";//*表示匹配任何一個單詞 /**定義topic通配符提供給topic交換機使用**/ public static final String TOPICS_MORE = "my_topic.#";//#表示匹配任何多個單詞 /** * @author 七脈 * 描述:hello world普通隊列,不需要綁定交換機 * 官方文檔里,點對點, 一個生產者、一個隊列、一個消費者。 * @return */ @Bean public Queue helloWorldQueue(){ return new Queue(HELLO_WORLD_QUEUE, true, false, false); //return new Queue(HELLO_WORLD_QUEUE, true); } /** * @author 七脈 * 描述:work工作隊列,不需要綁定交換機 * 官方文檔里, 一個生產者、一個隊列、多個消費者。 * 多個消費者時,會均分接收消息。 * @return */ @Bean public Queue workQueue(){ return new Queue(WORK_QUEUE, true); } /** * @author 七脈 * 描述:第一個fanout廣播隊列,需要綁定Fanout交換機 * fanout交換機會把消息發送到每一個綁定的隊列 * 官方:發布訂閱 * @return */ @Bean public Queue fanoutQueueOne(){ return new Queue(FANOUT_QUEUE_ONE, true); } /** * @author 七脈 * 描述:第二個fanout廣播隊列,需要綁定Fanout交換機 * fanout交換機會把消息發送到每一個綁定的隊列 * 官方:發布訂閱 * @return */ @Bean public Queue fanoutQueueTwo(){ return new Queue(FANOUT_QUEUE_TWO, true); } /** * @author 七脈 * 描述:第一個direct定向隊列,需要綁定Direct交換機,並指定routing-key * direct交換機會把消息發送到每一個綁定且指定相同routing-key的隊列, * 官方:Routing * @return */ @Bean public Queue directQueueOne(){ return new Queue(DIRECT_QUEUE_ONE, true); } /** * @author 七脈 * 描述:第二個direct定向隊列,需要綁定Direct交換機,並指定routing-key * direct交換機會把消息發送到每一個綁定且指定相同routing-key的隊列, * 官方:Routing * @return */ @Bean public Queue directQueueTwo(){ return new Queue(DIRECT_QUEUE_TWO, true); } /** * @author 七脈 * 描述:第一個topic通配符匹配隊列,需要綁定Topic交換機 * topic隊列使用*、#的通配符進行匹配topic交換機的消息 * 官方:Topics * @return */ @Bean public Queue topicQueueOne(){ return new Queue(TOPIC_QUEUE_ONE, true); } /** * @author 七脈 * 描述:第二個topic通配符匹配隊列,需要綁定Topic交換機 * topic隊列使用*、#的通配符進行匹配topic交換機的消息 * 官方:Topics * @return */ @Bean public Queue topicQueueTwo(){ return new Queue(TOPIC_QUEUE_TWO, true); } /** * @author 七脈 * 描述:定義 FanoutExchange 交換機 * FanoutExchange交換機,將消息發送到每一個綁定的消息隊列中 * @return */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUT_EXCHANGE, true, true); } /** * @author 七脈 * 描述:定義 DirectExchange 交換機 * DirectExchange交換機,將消息發送到每一個綁定且對應routing-key的隊列中 * @return */ @Bean public DirectExchange directExchange(){ return new DirectExchange(DIRECT_EXCHANGE, true, true); } /** * @author 七脈 * 描述:定義 TopicExchange 交換機 * TopicExchange交換機,將消息發送到每一個綁定且匹配topic通配符的隊列中 * @return */ @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPIC_EXCHANGE, true, true); } /** * @author 七脈 * 描述:將第一個FanoutQueueOne隊列綁定到FanoutExchange交換機 * @param fanoutQueueOne * @param fanoutExchange * @return */ @Bean public Binding bindingFanoutQueueOne(Queue fanoutQueueOne, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueueOne).to(fanoutExchange); } /** * @author 七脈 * 描述:將第二個FanoutQueueTwo隊列綁定到FanoutExchange交換機 * @param fanoutQueueOne * @param fanoutExchange * @return */ @Bean public Binding bindingFanoutQueueTwo(Queue fanoutQueueTwo, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueueTwo).to(fanoutExchange); } /** * @author 七脈 * 描述:將第一個DirectQueueOne隊列綁定到DirectExchange交換機 * @param directQueueOne * @param directExchange * @return */ @Bean public Binding bindingDirectQueueOne(Queue directQueueOne, DirectExchange directExchange){ return BindingBuilder.bind(directQueueOne).to(directExchange).with(ROUTING_KEY); } /** * @author 七脈 * 描述:將第二個DirectQueueTwo隊列綁定到DirectExchange交換機 * @param directQueueOne * @param directExchange * @return */ @Bean public Binding bindingDirectQueueTwo(Queue directQueueTwo, DirectExchange directExchange){ return BindingBuilder.bind(directQueueTwo).to(directExchange).with(ROUTING_KEY); } /** * @author 七脈 * 描述:將第一個TopicQueueOne隊列綁定到TopicExchange交換機 * @param topicQueueOne * @param topicExchange * @return */ @Bean public Binding bindingTopicQueueOne(Queue topicQueueOne, TopicExchange topicExchange){ return BindingBuilder.bind(topicQueueOne).to(topicExchange).with(TOPICS_ONE); } /** * @author 七脈 * 描述:將第二個TopicQueueOne隊列綁定到TopicExchange交換機 * @param topicQueueOne * @param topicExchange * @return */ @Bean public Binding bindingTopicQueueTwo(Queue topicQueueTwo, TopicExchange topicExchange){ return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with(TOPICS_MORE); } }
2.生產者提供消息給隊列
package com.qy.mq.provider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author 七脈 * 描述:MQ消息發送類 */ @Service public class ProviderService { @Autowired private AmqpTemplate amqpTemplate; /** * @author 七脈 * 描述:發送普通隊列消息 HelloWorld * @param msg */ public void sendMsgForHelloWorldQueue(String msg){ System.out.println("普通隊列HelloWorld-生產者發送:"+msg); amqpTemplate.convertAndSend(RabbitmqConfig.HELLO_WORLD_QUEUE, msg); } /** * @author 七脈 * 描述:發送工作隊列消息 * @param msg */ public void sendMsgForWorkQueue(String msg){ System.out.println("工作隊列-生產者發送:"+msg); amqpTemplate.convertAndSend(RabbitmqConfig.WORK_QUEUE, msg); } /** * @author 七脈 * 描述:發送到FanoutExchange交換機,交換機會發送到綁定的隊列 * @param msg */ public void sendMsgForFanoutExchange(String msg){ System.out.println("FanoutExchange交換機-生產者發送:"+msg); amqpTemplate.convertAndSend(RabbitmqConfig.FANOUT_EXCHANGE, null, msg); } /** * @author 七脈 * 描述:發送消息到DirectExchange交換機,交換機會發送到綁定且指定routing-key的隊列 * @param msg */ public void sendMsgForDirectExchange(String msg){ System.out.println("DirectExchange交換機-生產者發送:"+msg); amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY, msg); } /** * @author 七脈 * 描述:發送消息到TopicExchange交換機,交換機會發送到綁定且匹配通配符的隊列 * @param msg */ public void sendMsgForTopicExchange(String msg, String wildcard){ System.out.println("TopicExchange交換機-生產者發送:"+msg); amqpTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE, wildcard, msg); } }
3.消費者監聽消費隊列消息
package com.qy.mq.consumer; import java.io.IOException; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; /** * @author 七脈 * 描述:普通隊列helloworld,一個生產者、一個隊列、一個消費者 */ @Service public class ConsumerHelloWorld { @Value("${server.port}") private String port; @RabbitListener(queues=RabbitmqConfig.HELLO_WORLD_QUEUE) public void recive(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { System.out.println(msg); channel.basicAck(deliveryTag, false);//應答 //channel.basicNack(deliveryTag, false, true);//不應答 //channel.basicReject(deliveryTag, true);//拒絕應答 } }