SpringCloud之RabbitMQ消息隊列原理及配置


  本篇章講解RabbitMQ的用途、原理以及配置,RabbitMQ的安裝請查看SpringCloud之RabbitMQ安裝

一、MQ用途

  1、同步變異步消息

  場景:用戶下單完成后,發送郵件和短信通知。

  運用消息隊列之后,用戶下單完之后,下單信息寫入數據庫,再寫入消息隊列,發送郵件和發送短信各自去消息隊列進行讀取,節省時間,提高效率。

      

  2、應用解耦

  場景:用戶下單后,訂單系統需要多渠道通知用戶。

  下單服務系統:用戶使用下單服務后,將下單信息寫入數據庫,下單成功。

  短信服務系統:用戶下單后,將短信信息寫入消息隊列,以發送短信信息通知用戶交易信息。

  郵件服務系統:用戶下單后,將郵件信息寫入消息隊列,以發送郵件信息通知用戶交易信息。

  這樣,如果微信通知不能正常使用,也不影響用戶下單,用戶下單后,只用把下單通知信息寫入消息隊列,不用關心后續操作,實現了訂單系統和通知系統的解耦。

            

  3、流量削峰

  一般在秒殺或者團購活動中使用。

  場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。針對這個問題,一般需要在應用前端加入消息隊列。

    a.可以控制活動的人數

    b.可以緩解短時間內高流量壓垮應用

  用戶的請求,服務器接收后,首先寫入消息隊列,如果消息隊列的數量大於最大的數量,則直接拋棄用戶請求或者跳轉錯誤頁面。

                

二、RabbitMQ原理介紹

  如圖所示:

    

  各組件意義如下:

  

三、RabbitMQ應用

  RabbitMQ包依賴(spring-boot-starter-amqp):

<!-- rabbitMQ的依賴。rabbitmq已經被spring-boot做了整合訪問實現。
    spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。
 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  1、Direct交換器

  是一種點對點,實現發布/訂閱標准的交換器。Producer發送消息到RabbitMQ中,MQ中的Direct交換器接受到消息后,會根據Routing Key來決定這個消息要發送到哪一個隊列中。Consumer則負責注冊一個隊列監聽器,來監聽隊列的狀態,當隊列狀態發生變化時,消費消息。注冊隊列監聽需要提供交換器信息,隊列信息和路由鍵信息。

  這種交換器通常用於點對點消息傳輸的業務模型中。如電子郵箱。

  如下圖所示日志處理MQ示例:

            

  Producer全局配置文件:

spring.application.name=direct-producer
server.port=8082

# 必要配置 # 配置rabbitmq鏈接相關信息。key都是固定的。是springboot要求的。 # rabbitmq安裝位置
spring.rabbitmq.host=localhost
# rabbitmq的端口
spring.rabbitmq.port=5672
# rabbitmq的用戶名
spring.rabbitmq.username=test
# rabbitmq的用戶密碼
spring.rabbitmq.password=123456

# 可選配置 # 配置producer中操作的Queue和Exchange相關信息的。key是自定義的。為了避免硬編碼(代碼中可以寫死)。 # exchange的命名。交換器名稱可以隨意定義。
mq.config.exchange=log.direct
# 路由鍵, 是定義某一個路由鍵。 info級別日志使用的queue的路由鍵。
mq.config.queue.info.routing.key=log.info.routing.key
# 路由鍵,error級別日志使用的queue的路由鍵。
mq.config.queue.error.routing.key=log.error.routing.key

  Producer消息發送類:

/**
 * 消息發送者 - Producer。
 * @Component Producer類型的對象,必須交由Spring容器管理。
 * 使用SpringBoot提供的AMQP啟動器,來訪問rabbitmq的時候,都是通過AmqpTemplate來實現的。
 * 如果全局配置文件中,配置了rabbitmq相關內容,且工程依賴了starter-amqp,則spring容器自動創建AmqpTemplate對象。 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;
    
    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;
    
    //routingkey 路由鍵
    @Value("${mq.config.queue.info.routing.key}")
    private String routingkey;
    /*
     * 發送消息的方法
     */
    public void send(LogMessage msg){
        /**
         * convertAndSend - 轉換並發送消息的template方法。
         * 是將傳入的普通java對象,轉換為rabbitmq中需要的message類型對象,並發送消息到rabbitmq中。
         * 參數一:交換器名稱。 類型是String
         * 參數二:路由鍵。 類型是String
         * 參數三:消息,是要發送的消息內容對象。類型是Object */
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
    }
}

  Producer實體類:

/**
 * 消息內容載體,在rabbitmq中,存儲的消息可以是任意的java類型的對象。
 * 強制要求,作為消息數據載體的類型,必須是Serializable的。
 * 如果消息數據載體類型未實現Serializable,在收發消息的時候,都會有異常發生。 */
public class LogMessage implements Serializable {

    private Long id;
    private String msg;
    private String logLevel;
    private String serviceType;
    private Date createTime;
    private Long userId;
    public LogMessage() {
        super();
    }
    public LogMessage(Long id, String msg, String logLevel, String serviceType, Date createTime, Long userId) {
        super();
        this.id = id;
        this.msg = msg;
        this.logLevel = logLevel;
        this.serviceType = serviceType;
        this.createTime = createTime;
        this.userId = userId;
    }
    @Override
    public String toString() {
        return "LogMessage [id=" + id + ", msg=" + msg + ", logLevel=" + logLevel + ", serviceType=" + serviceType
                + ", createTime=" + createTime + ", userId=" + userId + "]";
    }
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    public String getLogLevel() {
        return logLevel;
    }
    public void setLogLevel(String logLevel) {
        this.logLevel = logLevel;
    }
    public String getServiceType() {
        return serviceType;
    }
    public void setServiceType(String serviceType) {
        this.serviceType = serviceType;
    }
    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
    public Long getUserId() {
        return userId;
    }
    public void setUserId(Long userId) {
        this.userId = userId;
    }
    
}

  Producer消息產生測試類:

/**
 * Direct交換器
 * Producer測試。
 * 注意:
 * 在rabbitmq中,consumer都是listener監聽模式消費消息的。
 * 一般來說,在開發的時候,都是先啟動consumer,確定有什么exchange、queue、routing-key,然后再啟動producer。
 * 然后再啟動producer發送消息,。
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringbootServerApplication.class)
public class QueueTest {

    @Autowired
    private Sender sender;
    
    /*
     * 測試消息隊列
     */
    @Test
    public void testSend()throws Exception{
        Long id = 1L;
        while(true){
            Thread.sleep(1000);
            this.sender.send(new LogMessage(id,"test log", "info", "訂單服務", new Date(), id));
            id++;
        }
    }
}

  Consumer全局配置:

spring.application.name=direct-consumer
server.port=8083

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456

# 自定義配置。 配置交換器exchange、路由鍵routing-key、隊列名稱 queue name;在RabbitMQ中隊列的生成 # 交換器名稱
mq.config.exchange=log.direct
# info級別queue的名稱
mq.config.queue.info=log.info
# info級別的路由鍵
mq.config.queue.info.routing.key=log.info.routing.key
# error級別queue的名稱
mq.config.queue.error=log.error
# error級別的路由鍵
mq.config.queue.error.routing.key=log.error.routing.key

  Consumer消費者:

/**
 * 消息接收者 - consumer
 * 
 * @RabbitListener - 可以注解類和方法。
 *  注解類,當表當前類的對象是一個rabbit listener。
 *      監聽邏輯明確,可以由更好的方法定義規范。
 *      必須配合@RabbitHandler才能實現rabbit消息消費能力,一個類可以有多個方法,但是僅有一個方法注解@RabbitHandler。
 * 注解方法,代表當前方法是一個rabbit listener處理邏輯。 * 方便開發,一個類中可以定義若干個listener邏輯。
 *      方法定義規范可能不合理。如:一個方法的處理邏輯太多,造成方法的bad smell。
 * 
 * @RabbitListener -  代表當前類型是一個rabbitmq的監聽器。
 *      bindings:綁定隊列
 * @QueueBinding  - @RabbitListener.bindings屬性的類型。綁定一個隊列。
 * value:綁定隊列, Queue類型。 * exchange:配置交換器, Exchange類型。 * key:路由鍵,字符串類型。
 * 
 * @Queue - 隊列。
 * value:隊列名稱 * autoDelete:是否是一個臨時隊列。 * true :當所有的consumer關閉后,自動刪除queue。 * false:當任意一個consumer啟動並創建queue后,如果queue中有消息未消費,無論是否有consumer繼續執行,都保存queue。
 * 
 * @Exchange - 交換器
 * value:為交換器起個名稱 * type:指定具體的交換器類型 */
@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
                    key="${mq.config.queue.error.routing.key}"
            )
        )
public class ErrorReceiver {

    /**
     * 消費消息的方法。采用消息隊列監聽機制
     * @RabbitHandler - 代表當前方法是監聽隊列狀態的方法,就是隊列狀態發生變化后,執行的消費消息的方法。
     * 方法參數。就是處理的消息的數據載體類型。
     */
    @RabbitHandler
    public void process(LogMessage msg){
        System.out.println("Error..........receiver: "+msg);
    }
}

  2、Topic交換器

  主題交換器,也稱為規則匹配交換器。是通過自定義的模糊匹配規則來決定消息存儲在哪些隊列中。當Producer發送消息到RabbitMQ中時,MQ中的交換器會根據路由鍵來決定消息應該發送到哪些隊列中。Consumer同樣是注冊一個監聽器到隊列,監聽隊列狀態,當隊列狀態發生變化時,消費消息。注冊監聽器需要提供交換器信息,隊列信息和路由鍵信息。

  如下圖所示日志處理MQ示例:

          

  Producer公共配置文件:

spring.application.name=topic-producer

spring.rabbitmq.host=192.168.1.122
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456

mq.config.exchange=log.topic

  Producer的User實體日志發送類:

/**
 * 消息發送者
 */
@Component
public class UserSender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;
    
    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;
    
    /*
     * 發送消息的方法
     */
    public void send(String msg){
        //向消息隊列發送消息
        //參數一:交換器名稱。
        //參數二:路由鍵
        //參數三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"user.log.debug", "user.log.debug....."+msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"user.log.info", "user.log.info....."+msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"user.log.warn","user.log.warn....."+msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"user.log.error", "user.log.error....."+msg);
    }
}

  Producer的Order實體日志發送類:

/**
 * 消息發送者
 */
@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;
    
    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;
    
    /*
     * 發送消息的方法
     */
    public void send(String msg){
        //向消息隊列發送消息
        //參數一:交換器名稱。
        //參數二:路由鍵
        //參數三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"order.log.debug", "order.log.debug....."+msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"order.log.info", "order.log.info....."+msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"order.log.warn","order.log.warn....."+msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"order.log.error", "order.log.error....."+msg);
    }
}

  Producer測試類:

/**
 * 消息隊列測試類
 * @author Administrator
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringbootServerApplication.class)
public class QueueTest {

    @Autowired
    private UserSender usersender;
    
    @Autowired
    private ProductSender productsender;
    
    @Autowired
    private OrderSender ordersender;
    
    /*
     * 測試消息隊列
     */
    @Test
    public void test() throws InterruptedException{
        while(true){
            Thread.sleep(1000);
            this.usersender.send("UserSender.....");this.ordersender.send("OrderSender......");
        }
    }
}

  可以看出Producer的發送和Direct沒有區別,Consumer的全局配置文件:

spring.application.name=topic-consumer

spring.rabbitmq.host=192.168.1.122
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456

mq.config.exchange=log.topic
mq.config.queue.info=log.info
mq.config.queue.error=log.error
mq.config.queue.logs=log.all

  Consumer中的info日志消費者:

@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
                    key="*.log.info"
            )
        )
public class InfoReceiver {
    @RabbitHandler
    public void process(String msg){
        System.out.println("......Info........receiver: "+msg);
    }
}

  Consumer中的全體日志消費者:

/**
 * 和direct交換器的區別是: Exchange的類型為TOPIC。
 * 全日志處理。
 */
@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.logs}",autoDelete="true"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
                    key="*.log.*"
            )
        )
public class LogsReceiver {
    @RabbitHandler
    public void process(String msg){
        System.out.println("......All........receiver: "+msg);
    }
}

  3、Fanout交換器

  廣播交換器。這種交換器會將接收到的消息發送給綁定的所有隊列中。當Producer發送消息到RabbitMQ時,交換器會將消息發送到已綁定的所有隊列中,這個過程交換器不會嘗試匹配路由鍵,所以消息中不需要提供路由鍵信息。Consumer仍舊注冊監聽器到隊列,監聽隊列狀態,當隊列狀態發生變化,消費消息。注冊監聽器需要提供交換器信息和隊列信息。

  如下圖所示短信、APP推送的MQ示例:

        

  由於Producer的測試類和以上無差別,不再贅述,如下Producer的發送類:

/**
 * 消息發送者
 * fanout交換器 - 
 *  使用fanout交換器的時候,交換器是忽略routing-key的匹配。
 *  因為廣播不需要考慮路由鍵的匹配,只考慮在Exchange上綁定了多少個queue,這個由Consumer的配置決定。
 *  會將接受到的消息發送到所有的綁定的queue中,進行消息的緩存。 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;
    
    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;
    
    /*
     * 發送消息的方法
     */
    public void send(String msg){
        //向消息隊列發送消息
        //參數一:交換器名稱。
        //參數二:路由鍵  無需填寫,填寫了也無效 //參數三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange,"A", msg);
    }
}

  如下所示Consumer的SMS消費類:

/**
 * 使用fanout交換器的時候,可以在consumer中省略routing-key的配置。 * 因為fanout交換器忽略routing-key的匹配,即使配置當type=ExchangeTypes.FANOUT時也無效。 */
@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.sms}",autoDelete="true"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT)
            )
        )
public class SmsReceiver {
    @RabbitHandler
    public void process(String msg){
        System.out.println("Sms........receiver: "+msg);
    }
}

  如Consumer的Publish消費類:

@Component
@RabbitListener(
            bindings=@QueueBinding(
                    value=@Queue(value="${mq.config.queue.push}",autoDelete="true"),
                    exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT)
            )
        )
public class PushReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("Push..........receiver: "+msg);
    }
}

四、RabbitMQ消息可靠性處理

  前面內容,如果consumer未啟動,而producer發送了消息。則消息會丟失。如果consumer先啟動,創建queue后,producer發送消息可以正常消費。那么當所有的consumer宕機的時候,queue會auto-delete,消息仍舊會丟失。這種情況,消息不可靠。有丟失的可能。

  Rabbitmq的消息可靠性處理,分為兩部分。

  • 消息不丟失。當consumer全部宕機后,消息不能丟失。 ------持久化解決
  • 消息不會錯誤消費。當consumer獲取消息后,萬一consumer在消費消息的過程中發生了異常,如果rabbitmq一旦發送消息給consumer后立刻刪除消息,也會有消息丟失的可能。 -------確認機制解決

  1、消息持久化

  • @Queue注解中的屬性 - autoDelete:當所有消費客戶端連接斷開后,是否自動刪除隊列 。true:刪除   false:不刪除
  • @Exchange注解中的屬性 - autoDelete:當交換器所有的綁定隊列都不再使用時,是否自動刪除交換器(更粗粒度,不建議)。true:刪除   false:不刪除

  2、消息確認機制 ACK - acknowledge

  什么是消息確認機制?

  如果在消息處理過程中,消費者的服務器在處理消息時發生異常,那么這條正在處理的消息就很可能沒有完成消息的消費,如果RabbitMQ在Consumer消費消息后立刻刪除消息,則可能造成數據丟失。為了保證數據的可靠性,RabbitMQ引入了消息確認機制。

  • 消息確認機制是消費者Consumer從RabbitMQ中收到消息並處理完成后,反饋給RabbitMQ的,當RabbitMQ收到確認反饋后才會將此消息從隊列中刪除。
  • 如果某Consumer在處理消息時出現了網絡不穩定,服務器異常等現象時,那么就不會有消息確認反饋,RabbitMQ會認為這個消息沒有正常消費,會將消息重新放入隊列中。
  • 如果在Consumer集群環境下,RabbitMQ未接收到Consumer的確認消息時,會立即將這個消息推送給集群中的其他Consumer,保證不丟失消息。
  • 如果Consumer沒有確認反饋,RabbitMQ將永久保存消息。

  消息確認機制默認都是開啟狀態的,同時不推薦關閉消息確認機制。

  注意:如果Consumer沒有處理消息確認,將導致嚴重后果。如:所有的Consumer都沒有正常反饋確認信息,並退出監聽狀態,消息則會永久保存,並處於鎖定狀態,直到消息被正常消費為止。消息的發送者Producer如果持續發送消息到RabbitMQ,那么消息將會堆積,持續占用RabbitMQ所在服務器的內存,導致“內存泄漏”問題。

  消息確認機制處理方案:

  編碼異常處理(推薦)

  通過編碼處理異常的方式,保證消息確認機制正常執行。這種處理方案也可以有效避免消息的重復消費。

  異常處理,不是讓Consumer編碼catch異常后,直接丟棄消息,或反饋ACK確認消息。而是做異常處理的。該拋的異常,還得拋,保證ACK機制的正常執行。或者使用其他的手法,實現消息的再次處理。如:catch代碼塊中,將未處理成功的消息,重新發送給MQ。如:catch代碼中,本地邏輯的重試(使用定時線程池重復執行任務3次。)

  配置重試次數處理

  通常來說,消息重試3次以上未處理成功,就是Consumer開發出現了嚴重問題。需要修改Consumer代碼,提升版本/打補丁之類的處理方案。

  通過全局配置文件,開啟消息消費重試機制,配置重試次數。當RabbitMQ未收到Consumer的確認反饋時,會根據配置來決定重試推送消息的次數,當重試次數使用完畢,無論是否收到確認反饋,RabbitMQ都會刪除消息,避免內存泄漏的可能。具體配置如下:

#開啟重試
spring.rabbitmq.listener.retry.enabled=true #重試次數,默認為3次
spring.rabbitmq.listener.retry.max-attempts=5

五、常用MQ產品對比和選擇

  社區活躍度:RabbitMQ > ActiveMQ = RocketMQ > kafka

  消息持久化:RabbitMQ、ActiveMQ、RocketMQ、kafka都支持持久化。ZeroMQ不支持持久化。

  高並發: RabbitMQ = kafka > RocketMQ > ActiveMQ。RabbitMQ高並發是基於ErLang的。ErLang本身就是針對高並發提供的一種開發腳本語言。

  吞吐量:RabbitMQ = kafka > RocketMQ > ActiveMQ。小型項目(並發吞吐低於萬級別)使用ActiveMQ。中型項目(並發吞吐10萬~100萬級),可選RocketMQ、ActiveMQ。大型項目優先考慮RabbitMQ和Kafka。

  綜合技術:RabbitMQ和kafka最好。RocketMQ次之。ActiveMQ最弱。如:可靠性、路由、集群、事務、高可用隊列、消息可靠排序、持久化、可視化管理工具等。

  RabbitMQ和Kafka選擇:建議Kafka針對日志處理。其他使用RabbitMQ。商業項目中,如果現有的系統架構已經使用了某一個MQ產品,且沒有業務和性能上的問題,不推薦切換MQ產品。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM