RabbitMQ傳輸原理、五種模式


本文代碼基於SpringBoot,文末有代碼連接 。首先是一些在Spring Boot的一些配置和概念,然后跟隨代碼看下五種模式 

MQ兩種消息傳輸方式,點對點(代碼中的簡單傳遞模式),發布/訂閱(代碼中路由模式)。要是你熟悉RabbitMQ SpringBoot配置的話,就是simple和direct。

MQ安裝指南:https://blog.csdn.net/qq_19006223/article/details/89421050

 

0.消息隊列運轉過程

生產者生產過程:
(1)生產者連接到 RabbitMQ Broker 建立一個連接( Connection) ,開啟 個信道 (Channel)
(2) 生產者聲明一個交換器 ,並設置相關屬性,比如交換機類型、是否持久化等
(3)生產者聲明 個隊列井設置相關屬性,比如是否排他、是否持久化、是否自動刪除等
(4)生產者通過路由鍵將交換器和隊列綁定起來。
(5)生產者發送消息至 RabbitMQ Broker ,其中包含路由鍵、交換器等信息。
(6) 相應的交換器根據接收到的路由鍵查找相匹配的隊列 如果找到 ,則將從生產者發送過來的消息存入相應的隊列中。
(7) 如果沒有找到 ,則根據生產者配置的屬性選擇丟棄還是回退給生產者
(8) 關閉信道。
(9) 關閉連接。

消費者接收消息的過程:
(1)消費者連接到 RabbitMQ Broker ,建立一個連接(Connection ,開啟 個信道(Channel)
(2) 消費者向 RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數, 以及做 些准備工作。
(3)等待 RabbitMQ Broker 回應並投遞相應隊列中的消息, 消費者接收消息。
(4) 消費者確認 ack) 接收到的消息
(5) RabbitMQ 從隊列中刪除相應己經被確認的消息
(6) 關閉信道。
(7)關閉連接。

1.項目結構

 common是工具,receiver是消費者,sender是生產者

具體各自的pom.xml文件請看項目,都有注釋。

2.sender(生產者的配置)

 

   

#確認機制
publisher-confirms: true 消息有沒有到達MQ(會返回一個ack確認碼)
publisher-returns: true 消息有沒有找到合適的隊列
主要是為了生產者和mq之間的一個確認機制,當消息到沒到mq,會提供相應的回調,在項目中 RabbitSender 這個類中進行了相應的配置
 1     private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, s) -> {
 2         if (ack) {
 3             System.out.println(correlationData.getId());
 4         } else {
 5             log.error("ConfirmCallback消息發送失敗: {}", s);
 6         }
 7     };
 8 
 9     private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey)
10             -> log.error("ReturnCallback消息發送失敗: {}", new String(message.getBody(), StandardCharsets.UTF_8));
11 
12 
13     public <T> void sendMsg(String exchangeName, String routingKeyName, T content) {
14         // 設置每個消息都返回一個確認消息
15         this.rabbitTemplate.setMandatory(true);
16         // 消息確認機制
17         this.rabbitTemplate.setConfirmCallback(confirmCallback);
18         // 消息發送失敗機制
19         this.rabbitTemplate.setReturnCallback(returnCallback);
20         // 異步發送消息
21         CorrelationData correlationData = new CorrelationData();
22         correlationData.setId("123");
23         this.rabbitTemplate.convertAndSend(exchangeName, routingKeyName, content, correlationData);
24     }
View Code

 還可以根據需求設置發送時CorrelationData 的值

    #mandatory

參數設為 true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件 的隊列,那么 RabbitM 會調用 Basic.Return 命令將消息返回給生產者。
默認為false,直接丟棄

3.receiver(消費者配置)

 

這里主要說一下 listerner 的相關配置

一共有兩種模式:simple和direct模式

simple主要包括兩種工作模式,direct主要包括四種,待會代碼會詳解。

先說主要配置(以direct為例)

#acknowledge-mode: manual 

手動確認模式,推薦使用這種。就是說當消息被消費者消費時,需要手動返回信息告訴mq。如果是自動的話,mq會自動確認,不管你消費者是否完成消費(比如說拋出異常)

#prefetch: 1

一個消費者一次拉取幾條消息,本demo一條一條來。

#consumers-per-queue: 2

一個隊列可以被多少消費者消費(這個配置,我測試的時候沒測試出來,如果有朋友了解的話,可以評論下。)

還有其他配置,看下源碼,兩種模式共有的

 simple特有的

direct特有的

4.各種模式詳解

---------simple方式下的兩種

打開上面的listener配置

 

4.1 simple

一個生產者,一個消費者

生產者發送消息都在SenderTest里面

 

生產者:

    /**簡單模式*/
    @Test
    public void senderSimple() throws Exception {
        String context = "simple---> " + new Date();
        this.rabbitTemplate.convertAndSend("simple", context);
    }

消費者

    @RabbitListener(queues = "simple")
    public void simple(Message message, Channel channel){
        String messageRec = new String(message.getBody());
        System.out.println("simple模式接收到了消息:"+messageRec);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("報錯了------------------"+e.getMessage());
        }
    }

輸出

  

simple模式接收到了消息:simple---> Sat Apr 20 20:40:16 CST 2019

4.2 work 模式

一個生產者,多個消費者

生產者

    private static final List<Integer> ints = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    /**work模式*/
    @Test
    public void senderWork() throws Exception {
        ints.forEach((i)->{
            String context = "work---> " + i;
            this.rabbitTemplate.convertAndSend("work", context);
        });
    }

消費者

 

    @RabbitListener(queues = "work")
    public void work1(Message message, Channel channel){
        try{
            Thread.sleep(500);
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        String messageRec = new String(message.getBody());
        System.out.println("work1接收到了消息:"+messageRec);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("work1報錯了------------------"+e.getMessage());
        }
    }


    @RabbitListener(queues = "work")
    public void work2(Message message, Channel channel){
        try{
            Thread.sleep(1000);
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        String messageRec = new String(message.getBody());
        System.out.println("work2接收到了消息:"+messageRec);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("work2報錯了------------------"+e.getMessage());
        }
    }

輸出

 

work1接收到了消息:work---> 2
work2接收到了消息:work---> 0
work1接收到了消息:work---> 1
work1接收到了消息:work---> 4
work2接收到了消息:work---> 3
work1接收到了消息:work---> 6
work1接收到了消息:work---> 7
work2接收到了消息:work---> 5
work1接收到了消息:work---> 8
work1接收到了消息:work---> 10
work2接收到了消息:work---> 9

-----direct方式下的

切換listener配置

4.3direct交換機

生產者發送消息給指定交換機,綁定的某個隊列。

消費者通過監聽某交換機綁定的某個隊列接受消息。

生產者

    /**direct交換機*/
    @Test
    public void senderDirect() throws Exception {
        rabbitSender.sendMsg("direct","directKey1","directContent1");
        rabbitSender.sendMsg("direct","directKey2","directContent2");
    }

消費者

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange("direct"), key = "directKey1"
            , value = @Queue(value = "directQueue1", durable = "true", exclusive = "false", autoDelete = "false")))
    public void direct1(String str, Channel channel, Message message) throws IOException {
        try {
            System.out.println("directQueue1接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange("direct"), key = "directKey2"
            , value = @Queue(value = "directQueue2", durable = "true", exclusive = "false", autoDelete = "false")))
    public void direct2(String str, Channel channel, Message message) throws IOException {
        try {
            System.out.println("directQueue2接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

輸出

directQueue1接收到了:directContent1
directQueue2接收到了:directContent2

4.4 topic交換機

指定主題

# :匹配一個或者多級路徑

*: 匹配一級路徑

生產者

    @Test
    public void senderTopic() throws Exception {
        String contexta = "topic.a";
        rabbitSender.sendMsg("topic","topicKey.a",contexta);
        String contextb = "topic.b";
        rabbitSender.sendMsg("topic","topicKey.b",contextb);
        String contextc = "topic.c";
        rabbitSender.sendMsg("topic","topicKey.c",contextc);
        String contextz = "topic.z";
        rabbitSender.sendMsg("topic","topicKey.c.z",contextz);
    }

消費者

    /**
     * topic交換機
     * */
    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "topic",type = "topic"), key = "topicKey.#"
            , value = @Queue(value = "topicQueue", durable = "true", exclusive = "false", autoDelete = "false")))
    public void topicQueue(String str, Channel channel, Message message) throws Exception {
        try {
            System.out.println("topicQueue接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

輸出

topicQueue接收到了:topic.a

4.5 Fanout 交換機

廣播模式,一個消息可以給多個消費者消費

生產者

    /**Fanout 交換機*/
    @Test
    public void senderFanout() throws Exception {
        String contexta = "Fanout";
        rabbitSender.sendMsg("fanout","fanoutKey1",contexta);
        //寫不寫KEY都無所謂
    }

消費者

    /**
     * Fanout 交換機
     * */
    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "fanout",type = "fanout"), key = "fanoutKey1"
            , value = @Queue(value = "fanoutQueue1", durable = "true", exclusive = "false", autoDelete = "false")))
    public void fanoutQueue1(String str, Channel channel, Message message) throws Exception {
        try {
            System.out.println("fanoutQueue1接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "fanout",type = "fanout"), key = "fanoutKey2"
            , value = @Queue(value = "fanoutQueue2", durable = "true", exclusive = "false", autoDelete = "false")))
    public void fanoutQueue2(String str, Channel channel, Message message) throws Exception {
        try {
            System.out.println("fanoutQueue2接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

輸出

fanoutQueue2接收到了:Fanout
fanoutQueue1接收到了:Fanout

4.6 Headers 交換機

 

代碼:https://github.com/majian1994/rabbitMQ_Study


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM