rabbitmq消息隊列,官網有六種,實戰常用的也就如下五種。
下面開始demo講解
大致三步:1.配置消息隊列,2.生產者提供消息給隊列,3.消費者監聽消費隊列消息
源碼下載:https://pan.baidu.com/s/119Hf0YFrWiQK9m4hwVrKPQ
1.配置消息隊列
package com.qy.mq.provider;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 七脈
* 描述:這里主要講解五種隊列消息
* 1.一對一普通隊列(hello world)
* 2.一對多工作隊列
* 3.fanout廣播隊列(發布訂閱)
* 4.direct定向隊列(routing-key)
* 5.topic通配符隊列(*、#)
*/
@Configuration
public class RabbitmqConfig {
/**hello world普通隊列**/
public static final String HELLO_WORLD_QUEUE = "hello_world_queue";
/**work工作隊列**/
public static final String WORK_QUEUE = "work_queue";
/**fanout使用隊列 one**/
public static final String FANOUT_QUEUE_ONE = "fanout_queue_one";
/**fanout使用隊列 two**/
public static final String FANOUT_QUEUE_TWO = "fanout_queue_two";
/**direct routing使用隊列ONE**/
public static final String DIRECT_QUEUE_ONE = "direct_queue_one";
/**direct routing使用隊列TWO**/
public static final String DIRECT_QUEUE_TWO = "direct_queue_two";
/**topic使用隊列**/
public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
/**topic使用隊列**/
public static final String TOPIC_QUEUE_TWO = "topic_queue_TWO";
/**fanout交換機**/
public static final String FANOUT_EXCHANGE = "fanout_exchange";
/**direct routing交換機**/
public static final String DIRECT_EXCHANGE = "direct_exchange";
/**topic交換機**/
public static final String TOPIC_EXCHANGE = "topic_exchange";
/**定義routing-key提供給direct交換機使用**/
public static final String ROUTING_KEY = "my_routing_key";
/**定義topic通配符提供給topic交換機使用**/
public static final String TOPICS_ONE = "my_topic.*";//*表示匹配任何一個單詞
/**定義topic通配符提供給topic交換機使用**/
public static final String TOPICS_MORE = "my_topic.#";//#表示匹配任何多個單詞
/**
* @author 七脈
* 描述:hello world普通隊列,不需要綁定交換機
* 官方文檔里,點對點, 一個生產者、一個隊列、一個消費者。
* @return
*/
@Bean
public Queue helloWorldQueue(){
return new Queue(HELLO_WORLD_QUEUE, true, false, false);
//return new Queue(HELLO_WORLD_QUEUE, true);
}
/**
* @author 七脈
* 描述:work工作隊列,不需要綁定交換機
* 官方文檔里, 一個生產者、一個隊列、多個消費者。
* 多個消費者時,會均分接收消息。
* @return
*/
@Bean
public Queue workQueue(){
return new Queue(WORK_QUEUE, true);
}
/**
* @author 七脈
* 描述:第一個fanout廣播隊列,需要綁定Fanout交換機
* fanout交換機會把消息發送到每一個綁定的隊列
* 官方:發布訂閱
* @return
*/
@Bean
public Queue fanoutQueueOne(){
return new Queue(FANOUT_QUEUE_ONE, true);
}
/**
* @author 七脈
* 描述:第二個fanout廣播隊列,需要綁定Fanout交換機
* fanout交換機會把消息發送到每一個綁定的隊列
* 官方:發布訂閱
* @return
*/
@Bean
public Queue fanoutQueueTwo(){
return new Queue(FANOUT_QUEUE_TWO, true);
}
/**
* @author 七脈
* 描述:第一個direct定向隊列,需要綁定Direct交換機,並指定routing-key
* direct交換機會把消息發送到每一個綁定且指定相同routing-key的隊列,
* 官方:Routing
* @return
*/
@Bean
public Queue directQueueOne(){
return new Queue(DIRECT_QUEUE_ONE, true);
}
/**
* @author 七脈
* 描述:第二個direct定向隊列,需要綁定Direct交換機,並指定routing-key
* direct交換機會把消息發送到每一個綁定且指定相同routing-key的隊列,
* 官方:Routing
* @return
*/
@Bean
public Queue directQueueTwo(){
return new Queue(DIRECT_QUEUE_TWO, true);
}
/**
* @author 七脈
* 描述:第一個topic通配符匹配隊列,需要綁定Topic交換機
* topic隊列使用*、#的通配符進行匹配topic交換機的消息
* 官方:Topics
* @return
*/
@Bean
public Queue topicQueueOne(){
return new Queue(TOPIC_QUEUE_ONE, true);
}
/**
* @author 七脈
* 描述:第二個topic通配符匹配隊列,需要綁定Topic交換機
* topic隊列使用*、#的通配符進行匹配topic交換機的消息
* 官方:Topics
* @return
*/
@Bean
public Queue topicQueueTwo(){
return new Queue(TOPIC_QUEUE_TWO, true);
}
/**
* @author 七脈
* 描述:定義 FanoutExchange 交換機
* FanoutExchange交換機,將消息發送到每一個綁定的消息隊列中
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE, true, true);
}
/**
* @author 七脈
* 描述:定義 DirectExchange 交換機
* DirectExchange交換機,將消息發送到每一個綁定且對應routing-key的隊列中
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE, true, true);
}
/**
* @author 七脈
* 描述:定義 TopicExchange 交換機
* TopicExchange交換機,將消息發送到每一個綁定且匹配topic通配符的隊列中
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE, true, true);
}
/**
* @author 七脈
* 描述:將第一個FanoutQueueOne隊列綁定到FanoutExchange交換機
* @param fanoutQueueOne
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutQueueOne(Queue fanoutQueueOne, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueOne).to(fanoutExchange);
}
/**
* @author 七脈
* 描述:將第二個FanoutQueueTwo隊列綁定到FanoutExchange交換機
* @param fanoutQueueOne
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutQueueTwo(Queue fanoutQueueTwo, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueTwo).to(fanoutExchange);
}
/**
* @author 七脈
* 描述:將第一個DirectQueueOne隊列綁定到DirectExchange交換機
* @param directQueueOne
* @param directExchange
* @return
*/
@Bean
public Binding bindingDirectQueueOne(Queue directQueueOne, DirectExchange directExchange){
return BindingBuilder.bind(directQueueOne).to(directExchange).with(ROUTING_KEY);
}
/**
* @author 七脈
* 描述:將第二個DirectQueueTwo隊列綁定到DirectExchange交換機
* @param directQueueOne
* @param directExchange
* @return
*/
@Bean
public Binding bindingDirectQueueTwo(Queue directQueueTwo, DirectExchange directExchange){
return BindingBuilder.bind(directQueueTwo).to(directExchange).with(ROUTING_KEY);
}
/**
* @author 七脈
* 描述:將第一個TopicQueueOne隊列綁定到TopicExchange交換機
* @param topicQueueOne
* @param topicExchange
* @return
*/
@Bean
public Binding bindingTopicQueueOne(Queue topicQueueOne, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueueOne).to(topicExchange).with(TOPICS_ONE);
}
/**
* @author 七脈
* 描述:將第二個TopicQueueOne隊列綁定到TopicExchange交換機
* @param topicQueueOne
* @param topicExchange
* @return
*/
@Bean
public Binding bindingTopicQueueTwo(Queue topicQueueTwo, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with(TOPICS_MORE);
}
}
2.生產者提供消息給隊列
package com.qy.mq.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author 七脈
* 描述:MQ消息發送類
*/
@Service
public class ProviderService {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* @author 七脈
* 描述:發送普通隊列消息 HelloWorld
* @param msg
*/
public void sendMsgForHelloWorldQueue(String msg){
System.out.println("普通隊列HelloWorld-生產者發送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.HELLO_WORLD_QUEUE, msg);
}
/**
* @author 七脈
* 描述:發送工作隊列消息
* @param msg
*/
public void sendMsgForWorkQueue(String msg){
System.out.println("工作隊列-生產者發送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.WORK_QUEUE, msg);
}
/**
* @author 七脈
* 描述:發送到FanoutExchange交換機,交換機會發送到綁定的隊列
* @param msg
*/
public void sendMsgForFanoutExchange(String msg){
System.out.println("FanoutExchange交換機-生產者發送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.FANOUT_EXCHANGE, null, msg);
}
/**
* @author 七脈
* 描述:發送消息到DirectExchange交換機,交換機會發送到綁定且指定routing-key的隊列
* @param msg
*/
public void sendMsgForDirectExchange(String msg){
System.out.println("DirectExchange交換機-生產者發送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY, msg);
}
/**
* @author 七脈
* 描述:發送消息到TopicExchange交換機,交換機會發送到綁定且匹配通配符的隊列
* @param msg
*/
public void sendMsgForTopicExchange(String msg, String wildcard){
System.out.println("TopicExchange交換機-生產者發送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE, wildcard, msg);
}
}
3.消費者監聽消費隊列消息
package com.qy.mq.consumer;
import java.io.IOException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
/**
* @author 七脈
* 描述:普通隊列helloworld,一個生產者、一個隊列、一個消費者
*/
@Service
public class ConsumerHelloWorld {
@Value("${server.port}")
private String port;
@RabbitListener(queues=RabbitmqConfig.HELLO_WORLD_QUEUE)
public void recive(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println(msg);
channel.basicAck(deliveryTag, false);//應答
//channel.basicNack(deliveryTag, false, true);//不應答
//channel.basicReject(deliveryTag, true);//拒絕應答
}
}
