RabbitMQ的使用Demo


rabbitmq消息隊列,官網有六種,實戰常用的也就如下五種。

下面開始demo講解

大致三步:1.配置消息隊列,2.生產者提供消息給隊列,3.消費者監聽消費隊列消息

源碼下載:https://pan.baidu.com/s/119Hf0YFrWiQK9m4hwVrKPQ

1.配置消息隊列

package com.qy.mq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 七脈
 * 描述:這里主要講解五種隊列消息
 * 	1.一對一普通隊列(hello world) 
 * 	2.一對多工作隊列
 * 	3.fanout廣播隊列(發布訂閱)
 *  4.direct定向隊列(routing-key)
 *  5.topic通配符隊列(*、#)
 */
@Configuration
public class RabbitmqConfig {
	
	/**hello world普通隊列**/
	public static final String HELLO_WORLD_QUEUE = "hello_world_queue";
	
	/**work工作隊列**/
	public static final String WORK_QUEUE = "work_queue";
	
	/**fanout使用隊列 one**/
	public static final String FANOUT_QUEUE_ONE = "fanout_queue_one";
	
	/**fanout使用隊列 two**/
	public static final String FANOUT_QUEUE_TWO = "fanout_queue_two";
	
	/**direct routing使用隊列ONE**/
	public static final String DIRECT_QUEUE_ONE = "direct_queue_one";
	
	/**direct routing使用隊列TWO**/
	public static final String DIRECT_QUEUE_TWO = "direct_queue_two";

	/**topic使用隊列**/
	public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
	
	/**topic使用隊列**/
	public static final String TOPIC_QUEUE_TWO = "topic_queue_TWO";
	
	/**fanout交換機**/
	public static final String FANOUT_EXCHANGE = "fanout_exchange";
	
	/**direct routing交換機**/
	public static final String DIRECT_EXCHANGE = "direct_exchange";
	
	/**topic交換機**/
	public static final String TOPIC_EXCHANGE = "topic_exchange";
	
	
	/**定義routing-key提供給direct交換機使用**/
	public static final String ROUTING_KEY = "my_routing_key";
	
	/**定義topic通配符提供給topic交換機使用**/
	public static final String TOPICS_ONE = "my_topic.*";//*表示匹配任何一個單詞
	/**定義topic通配符提供給topic交換機使用**/
	public static final String TOPICS_MORE = "my_topic.#";//#表示匹配任何多個單詞
	
	/**
	 * @author 七脈
	 * 描述:hello world普通隊列,不需要綁定交換機
	 *     官方文檔里,點對點, 一個生產者、一個隊列、一個消費者。
	 * @return
	 */
	@Bean
	public Queue helloWorldQueue(){
		return new Queue(HELLO_WORLD_QUEUE, true, false, false);
		//return new Queue(HELLO_WORLD_QUEUE, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:work工作隊列,不需要綁定交換機
	 *     官方文檔里, 一個生產者、一個隊列、多個消費者。
	 *     多個消費者時,會均分接收消息。
	 * @return
	 */
	@Bean
	public Queue workQueue(){
		return new Queue(WORK_QUEUE, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:第一個fanout廣播隊列,需要綁定Fanout交換機
	 * 	   fanout交換機會把消息發送到每一個綁定的隊列
	 * 	         官方:發布訂閱
	 * @return
	 */
	@Bean
	public Queue fanoutQueueOne(){
		return new Queue(FANOUT_QUEUE_ONE, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:第二個fanout廣播隊列,需要綁定Fanout交換機
	 * 	   fanout交換機會把消息發送到每一個綁定的隊列
	 * 	         官方:發布訂閱
	 * @return
	 */
	@Bean
	public Queue fanoutQueueTwo(){
		return new Queue(FANOUT_QUEUE_TWO, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:第一個direct定向隊列,需要綁定Direct交換機,並指定routing-key
	 * 	   direct交換機會把消息發送到每一個綁定且指定相同routing-key的隊列,
	 * 	         官方:Routing
	 * @return
	 */
	@Bean
	public Queue directQueueOne(){
		return new Queue(DIRECT_QUEUE_ONE, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:第二個direct定向隊列,需要綁定Direct交換機,並指定routing-key
	 * 	   direct交換機會把消息發送到每一個綁定且指定相同routing-key的隊列,
	 * 	         官方:Routing
	 * @return
	 */
	@Bean
	public Queue directQueueTwo(){
		return new Queue(DIRECT_QUEUE_TWO, true);
	}
	
	
	/**
	 * @author 七脈
	 * 描述:第一個topic通配符匹配隊列,需要綁定Topic交換機
	 * 	   topic隊列使用*、#的通配符進行匹配topic交換機的消息
	 * 	         官方:Topics
	 * @return
	 */
	@Bean
	public Queue topicQueueOne(){
		return new Queue(TOPIC_QUEUE_ONE, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:第二個topic通配符匹配隊列,需要綁定Topic交換機
	 * 	   topic隊列使用*、#的通配符進行匹配topic交換機的消息
	 * 	         官方:Topics
	 * @return
	 */
	@Bean
	public Queue topicQueueTwo(){
		return new Queue(TOPIC_QUEUE_TWO, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:定義 FanoutExchange 交換機
	 * 	   FanoutExchange交換機,將消息發送到每一個綁定的消息隊列中
	 * @return
	 */
	@Bean
	public FanoutExchange fanoutExchange(){
		return new FanoutExchange(FANOUT_EXCHANGE, true, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:定義 DirectExchange 交換機
	 *     DirectExchange交換機,將消息發送到每一個綁定且對應routing-key的隊列中
	 * @return
	 */
	@Bean
	public DirectExchange directExchange(){
		return new DirectExchange(DIRECT_EXCHANGE, true, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:定義 TopicExchange 交換機
	 * 	   TopicExchange交換機,將消息發送到每一個綁定且匹配topic通配符的隊列中
	 * @return
	 */
	@Bean
	public TopicExchange topicExchange(){
		return new TopicExchange(TOPIC_EXCHANGE, true, true);
	}
	
	/**
	 * @author 七脈
	 * 描述:將第一個FanoutQueueOne隊列綁定到FanoutExchange交換機
	 * @param fanoutQueueOne
	 * @param fanoutExchange
	 * @return
	 */
	@Bean
	public Binding bindingFanoutQueueOne(Queue fanoutQueueOne, FanoutExchange fanoutExchange){
		return BindingBuilder.bind(fanoutQueueOne).to(fanoutExchange);
	}
	
	/**
	 * @author 七脈
	 * 描述:將第二個FanoutQueueTwo隊列綁定到FanoutExchange交換機
	 * @param fanoutQueueOne
	 * @param fanoutExchange
	 * @return
	 */
	@Bean
	public Binding bindingFanoutQueueTwo(Queue fanoutQueueTwo, FanoutExchange fanoutExchange){
		return BindingBuilder.bind(fanoutQueueTwo).to(fanoutExchange);
	}
	
	/**
	 * @author 七脈
	 * 描述:將第一個DirectQueueOne隊列綁定到DirectExchange交換機
	 * @param directQueueOne
	 * @param directExchange
	 * @return
	 */
	@Bean
	public Binding bindingDirectQueueOne(Queue directQueueOne, DirectExchange directExchange){
		return BindingBuilder.bind(directQueueOne).to(directExchange).with(ROUTING_KEY);
	}
	
	/**
	 * @author 七脈
	 * 描述:將第二個DirectQueueTwo隊列綁定到DirectExchange交換機
	 * @param directQueueOne
	 * @param directExchange
	 * @return
	 */
	@Bean
	public Binding bindingDirectQueueTwo(Queue directQueueTwo, DirectExchange directExchange){
		return BindingBuilder.bind(directQueueTwo).to(directExchange).with(ROUTING_KEY);
	}
	
	/**
	 * @author 七脈
	 * 描述:將第一個TopicQueueOne隊列綁定到TopicExchange交換機
	 * @param topicQueueOne
	 * @param topicExchange
	 * @return
	 */
	@Bean
	public Binding bindingTopicQueueOne(Queue topicQueueOne, TopicExchange topicExchange){
		return BindingBuilder.bind(topicQueueOne).to(topicExchange).with(TOPICS_ONE);
	}
	
	/**
	 * @author 七脈
	 * 描述:將第二個TopicQueueOne隊列綁定到TopicExchange交換機
	 * @param topicQueueOne
	 * @param topicExchange
	 * @return
	 */
	@Bean
	public Binding bindingTopicQueueTwo(Queue topicQueueTwo, TopicExchange topicExchange){
		return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with(TOPICS_MORE);
	}
	
}

2.生產者提供消息給隊列

package com.qy.mq.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * @author 七脈
 * 描述:MQ消息發送類
 */
@Service
public class ProviderService {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	/**
	 * @author 七脈
	 * 描述:發送普通隊列消息 HelloWorld
	 * @param msg
	 */
	public void sendMsgForHelloWorldQueue(String msg){
		System.out.println("普通隊列HelloWorld-生產者發送:"+msg);
		amqpTemplate.convertAndSend(RabbitmqConfig.HELLO_WORLD_QUEUE, msg);
	}
	
	/**
	 * @author 七脈
	 * 描述:發送工作隊列消息
	 * @param msg
	 */
	public void sendMsgForWorkQueue(String msg){
		System.out.println("工作隊列-生產者發送:"+msg);
		amqpTemplate.convertAndSend(RabbitmqConfig.WORK_QUEUE, msg);
	}
	
	/**
	 * @author 七脈
	 * 描述:發送到FanoutExchange交換機,交換機會發送到綁定的隊列
	 * @param msg
	 */
	public void sendMsgForFanoutExchange(String msg){
		System.out.println("FanoutExchange交換機-生產者發送:"+msg);
		amqpTemplate.convertAndSend(RabbitmqConfig.FANOUT_EXCHANGE, null, msg);
	}
	
	/**
	 * @author 七脈
	 * 描述:發送消息到DirectExchange交換機,交換機會發送到綁定且指定routing-key的隊列
	 * @param msg
	 */
	public void sendMsgForDirectExchange(String msg){
		System.out.println("DirectExchange交換機-生產者發送:"+msg);
		amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY, msg);
	}
	
	/**
	 * @author 七脈
	 * 描述:發送消息到TopicExchange交換機,交換機會發送到綁定且匹配通配符的隊列
	 * @param msg
	 */
	public void sendMsgForTopicExchange(String msg, String wildcard){
		System.out.println("TopicExchange交換機-生產者發送:"+msg);
		amqpTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE, wildcard, msg);
	}
}

3.消費者監聽消費隊列消息

package com.qy.mq.consumer;

import java.io.IOException;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

/**
 * @author 七脈
 * 描述:普通隊列helloworld,一個生產者、一個隊列、一個消費者
 */
@Service
public class ConsumerHelloWorld {
	
	@Value("${server.port}")
	private String port;
	
	
	@RabbitListener(queues=RabbitmqConfig.HELLO_WORLD_QUEUE)
	public void recive(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
		System.out.println(msg);
		channel.basicAck(deliveryTag, false);//應答
		//channel.basicNack(deliveryTag, false, true);//不應答
		//channel.basicReject(deliveryTag, true);//拒絕應答
	}
	
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM