幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
交換機路由的幾種類型:
Direct Exchange:直接匹配,通過Exchange名稱+RountingKey來發送與接收消息.
Fanout Exchange:廣播訂閱,向所有的消費者發布消息,但是只有消費者將隊列綁定到該路由器才能收到消息,忽略Routing Key.
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,只有消息這將隊列綁定到該路由器且指定RoutingKey符合匹配規則時才能收到消息;
Headers Exchange:消息頭訂閱,消息發布前,為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息同時需要定義類似的鍵值對請求頭:(如:x-mactch=all或者x_match=any),只有請求頭與消息頭匹配,才能接收消息,忽略RoutingKey.
默認的exchange:如果用空字符串去聲明一個exchange,那么系統就會使用”amq.direct”這個exchange,我們創建一個queue時,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去
安裝Erland
http://www.erlang.org/downloads
安裝RabbitMQ
https://www.rabbitmq.com/download.html
開啟RabbitMQ服務
執行rabbitmq-plugins enable rabbitmq_management命令,開啟Web管理插件
重啟RabbitMQ服務
Web地址
http://localhost:15672/
默認用戶名和密碼:guest
一、引入springboot和rabbitmq的依賴

<!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.0</version> </dependency>
二、新增application.properties對rabbimq的配置信息

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=116.255.193.36
spring.rabbitmq.port=5672
spring.rabbitmq.username=scrm
spring.rabbitmq.password=scrm
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=scrm
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#最小消息監聽線程數
spring.rabbitmq.listener.concurrency=2
#最大消息監聽線程數
spring.rabbitmq.listener.max-concurrency=2
三、公共設置類
1、隊列、消息交換機,路由關鍵字公共枚舉類

package cloud.app.prod.home.rabbitmq; /** * Author : YongBo Xie </br> * File Name: RabbitMqEnum.java </br> * Created Date: 2018年3月28日 上午10:32:02 </br> * Modified Date: 2018年3月28日 上午10:32:02 </br> * Version: 1.0 </br> */ public class RabbitMqEnum { /** * describe: 定義隊列名稱 **/ public enum QueueName { MARKETING_ACTIVITIE_QUEUE("marketingActivitieQueue", "營銷活動隊列"); private String code; private String name; QueueName(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定義交換機 **/ public enum Exchange { DIRECT_EXCHANGE("directExchange", "直連交換機"), FANOUT_EXCHANGE("fanoutExchange", "扇形交換機"), TOPIC_EXCHANGE("topicExchange", "主題交換機"), HEADERS_EXCHANGE("headersExchange", "首部交換機"); private String code; private String name; Exchange(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定義routing_key **/ public enum QueueKey { MARKETING_ACTIVITIE_DIRECT("marketingActivitie", "營銷活動key"), MARKETING_ACTIVITIE_TOPIC_01("*.marketingActivitie.*", "營銷活動key"), MARKETING_ACTIVITIE_TOPIC_02("marketingActivitie.#", "營銷活動key"); private String code; private String name; QueueKey(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } }
2、數據連接配置類

package cloud.app.prod.home.rabbitmq; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Author : YongBo Xie </br> * File Name: RabbitConfig.java </br> * Created Date: 2018年3月28日 下午6:41:17 </br> * Modified Date: 2018年3月28日 下午6:41:17 </br> * Version: 1.0 </br> */ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.publisher-returns}") private Boolean publisherReturns; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; // 構建mq實例工廠 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherReturns(publisherReturns); return connectionFactory; } }
3、生產者類

package cloud.app.prod.home.rabbitmq; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** * Author : YongBo Xie </br> * File Name: RabbitMqSender.java </br> * Created Date: 2018年3月30日 上午10:48:36 </br> * Modified Date: 2018年3月30日 上午10:48:36 </br> * Version: 1.0 </br> */ @Component public class RabbitMqSender { private static Logger logger = Logger.getLogger(RabbitMqSender.class); @Bean public RabbitTemplate messageRabbitTemplate(ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new ConfirmCallback() { /** * 回調 * @param correlationData 消息唯一標識 * @param ack 確認結果 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息唯一標識:"+correlationData); logger.info("確認結果:"+ack); logger.info("失敗原因:"+cause); } }); rabbitTemplate.setReturnCallback(new ReturnCallback() { /** * 用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info(message.getMessageProperties().getCorrelationIdString() + " 發送失敗"); } }); return rabbitTemplate; } }
四、個例
1、初始化隊列、消息交換機,並把隊列綁定到消息交換機

package cloud.app.prod.home.rabbitmq.mem; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import cloud.app.prod.home.rabbitmq.RabbitMqEnum; /** * Author : YongBo Xie </br> * File Name: RabbitConfig.java </br> * Created Date: 2018年3月27日 下午3:13:57 </br> * Modified Date: 2018年3月27日 下午3:13:57 </br> * Version: 1.0 </br> */ @Configuration public class MarketingActivitieRabbitConfig { // private static Logger logger = Logger.getLogger(MarketingActivitieRabbitConfig.class); /** * 構建隊列,名稱,是否持久化之類 * @return */ @Bean public Queue marketingActivitieQueue() { return new Queue(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode(), true); } /** * 直連交換機(模式) * 用於實例間的任務分發 * 是一種帶路由功能的交換機,一個隊列會和一個交換機綁定,除此之外再綁定一個routing_key */ @Bean public DirectExchange createDirectExchange() { return new DirectExchange(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode()); } /** * 扇形交換機(模式) * 分發給所有綁定到該exchange上的隊列,忽略routing key * 速度是所有的交換機類型里面最快的 */ @Bean public FanoutExchange createFanoutExchange() { return new FanoutExchange(RabbitMqEnum.Exchange.FANOUT_EXCHANGE.getCode()); } /** * 主題交換機(模式) * 通過可配置的規則分發給綁定在該exchange上的隊列 * 發送到主題交換機上的消息需要攜帶指定規則的routing_key * 交換機和隊列的binding_key需要采用*.#.*.....的格式,每個部分用.分開 * *表示一個單詞 * #表示任意數量(零個或多個)單詞 */ @Bean public TopicExchange createTopicExchange() { return new TopicExchange(RabbitMqEnum.Exchange.TOPIC_EXCHANGE.getCode()); } /** * 首部交換機(模式) * 適用規則復雜的分發,用headers里的參數表達規則,有點像HTTP的Headers * 綁定交換機和隊列的時候,Hash結構中要求攜帶一個鍵“x-match”,這個鍵的Value可以是any或者all, * 這代表消息攜帶的Hash是需要全部匹配(all),還是僅匹配一個鍵(any)就可以了 */ @Bean public HeadersExchange createHeadersExchange() { return new HeadersExchange(RabbitMqEnum.Exchange.HEADERS_EXCHANGE.getCode()); } /** * 隊列和直連交換機綁定 * @param queue * @param routingKey * @return */ @Bean public Binding bindingQueueWithDirectExchange() { return BindingBuilder.bind(marketingActivitieQueue()).to(createDirectExchange()) .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode()); } /** * 隊列和扇形交換機綁定 * @param queue * @return */ @Bean public Binding bindingQueueWithFanoutExchange() { return BindingBuilder.bind(marketingActivitieQueue()).to(createFanoutExchange()); } /** * 隊列和主題交換機綁定 * @param queue * @param routingKey * @return */ @Bean public Binding bindingQueueWithTopicExchange() { return BindingBuilder.bind(marketingActivitieQueue()).to(createTopicExchange()) .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_TOPIC_01.getCode()); } /** * 隊列和首部交換機綁定 * key和value匹配 * @param queue * @param key * @param value * @return */ // @Bean // public Binding bindingQueueWithHeadersExchange() { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()) // .where(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getCode()) // .matches(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getName()); // } /** * 隊列和首部交換機綁定(x-match : all) * 完全匹配 * @param queue * @param headerValues * @return */ // @Bean // public Binding bindingQueueWithHeadersExchangeAll(Map<String, Object> headerValues) { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAll(headerValues).match(); // } /** * 隊列和首部交換機綁定(x-match : all) * 任一匹配 * @param queue * @param headerValues * @return */ // @Bean // public Binding bindingQueueWithHeadersExchangeAny(Map<String, Object> headerValues) { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAny(headerValues).match(); // } }
2、生產者

package cloud.app.prod.home.rabbitmq.mem; import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import cloud.app.prod.home.common.FailException; import cloud.app.prod.home.mem.vo.MarketingActivitiesVO; import cloud.app.prod.home.rabbitmq.RabbitMqEnum; import cloud.app.prod.home.utils.DSHUtils; /** * Author : YongBo Xie </br> * File Name: MarketingActivitieRabbitMqSender.java </br> * Created Date: 2018年3月28日 下午2:16:32 </br> * Modified Date: 2018年3月28日 下午2:16:32 </br> * Version: 1.0 </br> */ @Component public class MarketingActivitieRabbitMqSender { private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqSender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 發送消息 * rabbitTemplate.send(message); //發消息,參數類型為org.springframework.amqp.core.Message * rabbitTemplate.convertAndSend(object); //轉換並發送消息。 將參數對象轉換為org.springframework.amqp.core.Message后發送 * rabbitTemplate.convertSendAndReceive(message) //轉換並發送消息,且等待消息者返回響應消息 * 針對業務場景選擇合適的消息發送方式即可 * @param obj * @throws FailException */ public void sendRabbitmqDirect(MarketingActivitiesVO marketingActivitiesVO) throws FailException { CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID()); logger.info("send: " + correlationData.getId()); rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode(), RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode() , marketingActivitiesVO, correlationData); } public void sendRabbitmqDirect(String exchange, String routingKey, Object obj) throws FailException { CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID()); logger.info("send: " + correlationData.getId()); rabbitTemplate.convertAndSend(exchange, routingKey, obj); } }
3、消費者

package cloud.app.prod.home.rabbitmq.mem; import org.apache.log4j.Logger; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import cloud.app.prod.home.rabbitmq.RabbitMqEnum; /** * Author : YongBo Xie </br> * File Name: MarketingActivitieRabbitMqReceiver.java </br> * Created Date: 2018年3月28日 下午3:14:58 </br> * Modified Date: 2018年3月28日 下午3:14:58 </br> * Version: 1.0 </br> */ @Component public class MarketingActivitieRabbitMqReceiver { private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqReceiver.class); @Bean public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode()); container.setMessageListener(messageListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置為手動 return container; } // @RabbitListener(queues = "marketingActivitieQueue") // @RabbitHandler // public void process(String msg) { // logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息:" + msg); // } @Bean public ChannelAwareMessageListener messageListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { channel.confirmSelect();//在設置消息被消費的回調前需顯示調用,否則回調函數無法調用 if (message.toString().indexOf("1") > 0){ logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息1:" + message.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } if (message.toString().indexOf("2") > 0){ logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息2:" + message.toString()); //被拒絕的是否重新入隊列 //channel.basicNack 與 channel.basicReject 的區別在於basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息3:" + message.toString()); } }; } }