springboot + rabbitmq 整合示例


幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

交換機路由的幾種類型:
Direct Exchange:直接匹配,通過Exchange名稱+RountingKey來發送與接收消息.
Fanout Exchange:廣播訂閱,向所有的消費者發布消息,但是只有消費者將隊列綁定到該路由器才能收到消息,忽略Routing Key.
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,只有消息這將隊列綁定到該路由器且指定RoutingKey符合匹配規則時才能收到消息;
Headers Exchange:消息頭訂閱,消息發布前,為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息同時需要定義類似的鍵值對請求頭:(如:x-mactch=all或者x_match=any),只有請求頭與消息頭匹配,才能接收消息,忽略RoutingKey.
默認的exchange:如果用空字符串去聲明一個exchange,那么系統就會使用”amq.direct”這個exchange,我們創建一個queue時,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去

安裝Erland
http://www.erlang.org/downloads

安裝RabbitMQ
https://www.rabbitmq.com/download.html

開啟RabbitMQ服務
執行rabbitmq-plugins enable rabbitmq_management命令,開啟Web管理插件
重啟RabbitMQ服務

Web地址
http://localhost:15672/
默認用戶名和密碼:guest

 

一、引入springboot和rabbitmq的依賴

<!-- 添加springboot對amqp的支持 -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.0</version>
</dependency>
View Code

二、新增application.properties對rabbimq的配置信息

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=116.255.193.36
spring.rabbitmq.port=5672
spring.rabbitmq.username=scrm
spring.rabbitmq.password=scrm
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=scrm
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#最小消息監聽線程數
spring.rabbitmq.listener.concurrency=2  
#最大消息監聽線程數
spring.rabbitmq.listener.max-concurrency=2 
View Code

三、公共設置類

1、隊列、消息交換機,路由關鍵字公共枚舉類

package cloud.app.prod.home.rabbitmq;

/**
 * Author : YongBo Xie </br>
 * File Name: RabbitMqEnum.java </br>
 * Created Date: 2018年3月28日 上午10:32:02 </br>
 * Modified Date: 2018年3月28日 上午10:32:02 </br>
 * Version: 1.0 </br>
*/

public class RabbitMqEnum {
    
    /**
     * describe: 定義隊列名稱
     **/
    public enum QueueName {
        MARKETING_ACTIVITIE_QUEUE("marketingActivitieQueue", "營銷活動隊列");

        private String code;
        private String name;

        QueueName(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }

    }
    
    /**
     * describe: 定義交換機
     **/
    public enum Exchange {
        DIRECT_EXCHANGE("directExchange", "直連交換機"),
        FANOUT_EXCHANGE("fanoutExchange", "扇形交換機"),
        TOPIC_EXCHANGE("topicExchange", "主題交換機"),
        HEADERS_EXCHANGE("headersExchange", "首部交換機");

        private String code;
        private String name;

        Exchange(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }

    }

    /**
     * describe: 定義routing_key
     **/
    public enum QueueKey {
        MARKETING_ACTIVITIE_DIRECT("marketingActivitie", "營銷活動key"),
        MARKETING_ACTIVITIE_TOPIC_01("*.marketingActivitie.*", "營銷活動key"),
        MARKETING_ACTIVITIE_TOPIC_02("marketingActivitie.#", "營銷活動key");

        private String code;
        private String name;

        QueueKey(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }
    }

}
View Code

2、數據連接配置類

package cloud.app.prod.home.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Author : YongBo Xie </br>
 * File Name: RabbitConfig.java </br>
 * Created Date: 2018年3月28日 下午6:41:17 </br>
 * Modified Date: 2018年3月28日 下午6:41:17 </br>
 * Version: 1.0 </br>
 */
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String addresses;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;
    @Value("${spring.rabbitmq.publisher-returns}")
    private Boolean publisherReturns;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    // 構建mq實例工廠
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

}
View Code

3、生產者類

package cloud.app.prod.home.rabbitmq;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Author : YongBo Xie </br>
 * File Name: RabbitMqSender.java </br>
 * Created Date: 2018年3月30日 上午10:48:36 </br>
 * Modified Date: 2018年3月30日 上午10:48:36 </br>
 * Version: 1.0 </br>
*/
@Component
public class RabbitMqSender {
    
    private static Logger logger = Logger.getLogger(RabbitMqSender.class);
    
    @Bean
    public RabbitTemplate messageRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            
            /**
             * 回調
             * @param correlationData 消息唯一標識
             * @param ack 確認結果
             * @param cause 失敗原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("消息唯一標識:"+correlationData);
                logger.info("確認結果:"+ack);
                logger.info("失敗原因:"+cause);
            }
        });
        
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            
            /**
             * 用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                logger.info(message.getMessageProperties().getCorrelationIdString() + " 發送失敗");
            }
        });
        
        return rabbitTemplate;
    }

}
View Code

四、個例

1、初始化隊列、消息交換機,並把隊列綁定到消息交換機

package cloud.app.prod.home.rabbitmq.mem;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cloud.app.prod.home.rabbitmq.RabbitMqEnum;

/**
 * Author : YongBo Xie </br>
 * File Name: RabbitConfig.java </br>
 * Created Date: 2018年3月27日 下午3:13:57 </br>
 * Modified Date: 2018年3月27日 下午3:13:57 </br>
 * Version: 1.0 </br>
*/
@Configuration
public class MarketingActivitieRabbitConfig {
    
//    private static Logger logger = Logger.getLogger(MarketingActivitieRabbitConfig.class);
    
    /**
     * 構建隊列,名稱,是否持久化之類
     * @return
     */
    @Bean
    public Queue marketingActivitieQueue() {
        return new Queue(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode(), true);
    }

    /**
     * 直連交換機(模式)
     * 用於實例間的任務分發
     * 是一種帶路由功能的交換機,一個隊列會和一個交換機綁定,除此之外再綁定一個routing_key
     */
    @Bean
    public DirectExchange createDirectExchange() {
        return new DirectExchange(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode());
    }
    
    /**
     * 扇形交換機(模式)
     * 分發給所有綁定到該exchange上的隊列,忽略routing key
     * 速度是所有的交換機類型里面最快的
     */
    @Bean
    public FanoutExchange createFanoutExchange() {
        return new FanoutExchange(RabbitMqEnum.Exchange.FANOUT_EXCHANGE.getCode());
    }
    
    /**
     * 主題交換機(模式)
     * 通過可配置的規則分發給綁定在該exchange上的隊列
     * 發送到主題交換機上的消息需要攜帶指定規則的routing_key
     * 交換機和隊列的binding_key需要采用*.#.*.....的格式,每個部分用.分開
     * *表示一個單詞
     * #表示任意數量(零個或多個)單詞
     */
    @Bean
    public TopicExchange createTopicExchange() {
        return new TopicExchange(RabbitMqEnum.Exchange.TOPIC_EXCHANGE.getCode());
    }
    
    /**
     * 首部交換機(模式)
     * 適用規則復雜的分發,用headers里的參數表達規則,有點像HTTP的Headers
     * 綁定交換機和隊列的時候,Hash結構中要求攜帶一個鍵“x-match”,這個鍵的Value可以是any或者all,
     * 這代表消息攜帶的Hash是需要全部匹配(all),還是僅匹配一個鍵(any)就可以了
     */
    @Bean
    public HeadersExchange createHeadersExchange() {
        return new HeadersExchange(RabbitMqEnum.Exchange.HEADERS_EXCHANGE.getCode());
    }
    
    /**
     * 隊列和直連交換機綁定
     * @param queue
     * @param routingKey
     * @return
     */
    @Bean
    public Binding bindingQueueWithDirectExchange() {
        return BindingBuilder.bind(marketingActivitieQueue()).to(createDirectExchange())
                .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode());
    }
    
    /**
     * 隊列和扇形交換機綁定
     * @param queue
     * @return
     */
    @Bean
    public Binding bindingQueueWithFanoutExchange() {
        return BindingBuilder.bind(marketingActivitieQueue()).to(createFanoutExchange());
    }
    
    /**
     * 隊列和主題交換機綁定
     * @param queue
     * @param routingKey
     * @return
     */
    @Bean
    public Binding bindingQueueWithTopicExchange() {
        return BindingBuilder.bind(marketingActivitieQueue()).to(createTopicExchange())
                .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_TOPIC_01.getCode());
    }
    
    /**
     * 隊列和首部交換機綁定
     * key和value匹配
     * @param queue
     * @param key
     * @param value
     * @return
     */
//    @Bean
//    public Binding bindingQueueWithHeadersExchange() {
//        return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange())
//                .where(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getCode())
//                .matches(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getName());
//    }
    
    /**
     * 隊列和首部交換機綁定(x-match : all)
     * 完全匹配
     * @param queue
     * @param headerValues
     * @return
     */
//    @Bean
//    public Binding bindingQueueWithHeadersExchangeAll(Map<String, Object> headerValues) {
//        return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAll(headerValues).match();
//    }
    
    /**
     * 隊列和首部交換機綁定(x-match : all)
     * 任一匹配
     * @param queue
     * @param headerValues
     * @return
     */
//    @Bean
//    public Binding bindingQueueWithHeadersExchangeAny(Map<String, Object> headerValues) {
//        return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAny(headerValues).match();
//    }
}
View Code

2、生產者

package cloud.app.prod.home.rabbitmq.mem;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import cloud.app.prod.home.common.FailException;
import cloud.app.prod.home.mem.vo.MarketingActivitiesVO;
import cloud.app.prod.home.rabbitmq.RabbitMqEnum;
import cloud.app.prod.home.utils.DSHUtils;

/**
 * Author : YongBo Xie </br>
 * File Name: MarketingActivitieRabbitMqSender.java </br>
 * Created Date: 2018年3月28日 下午2:16:32 </br>
 * Modified Date: 2018年3月28日 下午2:16:32 </br>
 * Version: 1.0 </br>
*/
@Component
public class MarketingActivitieRabbitMqSender {
    
    private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqSender.class);
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 發送消息
     * rabbitTemplate.send(message); //發消息,參數類型為org.springframework.amqp.core.Message 
     * rabbitTemplate.convertAndSend(object); //轉換並發送消息。 將參數對象轉換為org.springframework.amqp.core.Message后發送 
     * rabbitTemplate.convertSendAndReceive(message) //轉換並發送消息,且等待消息者返回響應消息
     * 針對業務場景選擇合適的消息發送方式即可
     * @param obj
     * @throws FailException
     */
    public void sendRabbitmqDirect(MarketingActivitiesVO marketingActivitiesVO) throws FailException {
        CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID());
        logger.info("send: " + correlationData.getId());
        rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode(), RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode() , marketingActivitiesVO, correlationData);
    }

    public void sendRabbitmqDirect(String exchange, String routingKey, Object obj) throws FailException {
        CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID());
        logger.info("send: " + correlationData.getId());
        rabbitTemplate.convertAndSend(exchange, routingKey, obj);
    }

}
View Code

3、消費者

package cloud.app.prod.home.rabbitmq.mem;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import cloud.app.prod.home.rabbitmq.RabbitMqEnum;

/**
 * Author : YongBo Xie </br>
 * File Name: MarketingActivitieRabbitMqReceiver.java </br>
 * Created Date: 2018年3月28日 下午3:14:58 </br>
 * Modified Date: 2018年3月28日 下午3:14:58 </br>
 * Version: 1.0 </br>
*/
@Component
public class MarketingActivitieRabbitMqReceiver {
    
    private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqReceiver.class);
    
    @Bean
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode());
        container.setMessageListener(messageListener());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置為手動
        return container;
    }
    
//    @RabbitListener(queues = "marketingActivitieQueue") 
//    @RabbitHandler
//    public void process(String msg) { 
//        logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息:" + msg); 
//    }


    @Bean
    public ChannelAwareMessageListener messageListener() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                channel.confirmSelect();//在設置消息被消費的回調前需顯示調用,否則回調函數無法調用
                if (message.toString().indexOf("1") > 0){
                    logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息1:" + message.toString()); 
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                }

                if (message.toString().indexOf("2") > 0){
                    logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息2:" + message.toString());
                    
                    //被拒絕的是否重新入隊列
                    //channel.basicNack 與 channel.basicReject 的區別在於basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息
//                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                }
                logger.info(Thread.currentThread().getName() + " 接收到來自marketingActivitieQueue隊列的消息3:" + message.toString());
            }
        };
    }

}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM