RabbitMQ的基本概念和七種隊列模式


I. RabbitMQ的基本概念

1. 生產者/消費者

  • 生產者(Producer)
    消息的創建者。
    負責創建和推送數據到消息服務器。

  • 消費者(Consumer)
    消息的接收方。
    負責接收消息和處理數據。

 

2. 消息隊列(Queue)

消息隊列是RabbitMQ的內部對象,用於存儲生產者的消息直到發送給消費者,它是消費者接收消息的地方。

消息隊列的重要屬性:

  • 持久性
    broker重啟前都有效。
  • 自動刪除
    在所有消費者停止使用之后自動刪除。
  • 惰性
    沒有主動聲明隊列,調用會導致異常。
  • 排他性
    -一旦啟用,聲明它的消費者才能使用。

 

3. 交換機(Exchange)

交換機用於接收,分配消息。

1. 生產者要先指定一個routing key,然后將消息發送到交換機。
2. routing key需要與exchange type和binding key聯合使用才能最終生效。
3. 交換機將消息路由到一個或多個隊列中,或丟棄。

交換機包含4中類型: direct, topic, fanout, headers。

  • direct(直連交換機)
具有路由功能的交換機,綁定到此交換機的時候需要指定一個routing_key,交換機發送消息的時候需要routing_key,會將消息發送道對應的隊列。

先匹配,再投送

Direct Exchange是RabbitMQ的默認交換機模式。
這是最簡單的模式。
它根據routing key全文匹配去尋找隊列。

在綁定隊列時會設定一個routing key(通常是隊列的名字)。
只有在消息的routing key與隊列匹配時,消息才會被交換機投送到綁定的隊列中。

  • topic(主題交換機)

    在直連交換機基礎上增加模式匹配,也就是對routing_key進行模式匹配,*代表一個單詞,#代表多個單詞。

    按規則轉發消息

主題交換機(Topic Exchange)主要根據通配符轉發消息。
這種方式最靈活。
交換機和隊列的綁定會定義一種路由模式。
路由鍵(routing key)和路由模式匹配后,交換機才能轉發消息。

在這種交換機模式下,路由鍵(routing key)必須是一串字符,用"."隔開。
路由模式必須包含一個星號"*", 主要用於匹配路由鍵指定位置的一個單詞。
* 匹配一個單詞。
# 匹配0個或多個單詞。
eg:
binding key:                 *.com.#
匹配的routing key:     cn.com,  us.com.aa
不匹配:                         com.bb

  • headers(首部交換機)
    忽略routing_key,使用Headers信息(一個Hash的數據結構)進行匹配,優勢在於可以有更多更靈活的匹配規則。
    根據應用程序消息的特定屬性進行匹配

  • fanout(扇形交換機)
    廣播消息到所有隊列,沒有任何處理,速度最快。
    消息廣播的模式

這種方式將消息廣播到所有綁定到它的隊列中。
不考慮routing key的值,即使配置了路由鍵,依然會被忽略。


 

4. 消息確認

消息確認是指當一個消息從隊列中投遞給消費者(consumer)后,消費者會通知一下消息代理(broker)

消息確認可以自動,也可以由處理消息的開發者手動執行。
當啟用消息確認后,消息代理需要收到來自消費者的確認回執后,才完全將消息從隊列中刪除。

 
 

II. 七種隊列模式

1. 簡單模式(Hello World)

做最簡單的事情,一個生產者對應一個消費者,RabbitMQ相當於一個消息代理,負責將A的消息轉發給B。

單生產者,單消費者,單隊列。


應用場景:

將發送的電子郵件放到消息隊列,然后郵件服務在隊列中獲取郵件並發送給收件人。

 

2. 工作隊列模式(Work queues)

在多個消費者之間分配任務(競爭的消費者模式),一個生產者對應多個消費者。

適用於資源密集型任務, 單個消費者處理不過來,需要多個消費者進行處理的場景。

單生產者,多消費者,單隊列。


應用場景:

一個訂單的處理需要10s,有多個訂單可以同時放到消息隊列,

然后讓多個消費者同時並行處理,而不是單個消費者的串行消費。

 

3. 發布訂閱模式(Publish/Subscribe)

一次向許多消費者發送消息,將消息將廣播到所有的消費者。

單生產者,多消費者,多隊列。

應用場景:

更新商品庫存后需要通知多個緩存和多個數據庫。

結構如下:

  • 一個fanout類型交換機扇出兩個消息隊列,分別為緩存消息隊列、數據庫消息隊列
  • 一個緩存消息隊列對應着多個緩存消費者
  • 一個數據庫消息隊列對應着多個數據庫消費者

 

4. 路由模式(Routing)

路由模式(Routing)

根據Routing Key有選擇地接收消息。

多消費者,選擇性多隊列,每個隊列通過routing key全文匹配。

發送消息到交換機並且要指定路由鍵(Routing key) 。
消費者將隊列綁定到交換機時需要指定路由key,僅消費指定路由key的消息。

應用場景:

在商品庫存中增加了1台iphone12,iphone12促銷活動消費者指定routing key為iphone12 promote,
只有此促銷活動會接收到消息,其它促銷活動不關心也不會消費此routing key的消息。

 

5. 主題模式(Topics)

主題模式(Topics)

主題交換機方式接收消息,將routing key和模式進行匹配。

多消費者,選擇性多隊列,每個隊列通過模式匹配。

隊列需要綁定在一個模式上。
#匹配一個詞或多個詞,*只匹配一個詞。

應用場景:

iphone促銷活動可以接收主題為多種iPhone的消息,如iphone12、iphone13等。

 

6. 遠程過程調用(RPC)

遠程過程調用(RPC)

在遠程計算機上運行功能並等待結果。

應用場景:

需要等待接口返回數據,如訂單支付。

 

7. 發布者確認(Publisher Confirms)

與發布者進行可靠的發布確認,發布者確認是RabbitMQ擴展,可以實現可靠的發布。

在通道上啟用發布者確認后,RabbitMQ將異步確認發送者發布的消息,這意味着它們已在服務器端處理。

應用場景:

對於消息可靠性要求較高,比如錢包扣款。

 
 

III. 實戰代碼

1. 准備工作

首先,我們需要加入rabbitmq的amqp client依賴

<!-- amqp client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

其次,我們需要編寫一個連接mq和通道的工具類ConnectionUtils,如下:

package com.mcp.lab.mq.rabbit.common.util;

import com.mcp.lab.mq.rabbit.common.domain.ConnInfo;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtils {
    /**
     * 連接器(默認)
     *
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnInfo connInfo = new ConnInfo.Builder()
                .setHost("Your RabbitMQ Broker Host")
                .setPort(5672)
                .setVirtualHost("Your Virtual Host(自定義)")
                .setUsername("your rabbit admin user")
                .setPassword("your rabbit admin password")
                .build();

        return getConnection(connInfo);
    }
}

 

2. 簡單模式實例

  • 生產者(Producer)代碼
    ConsoleSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class ConsoleSender {
    private static final String QUIT = "Q";
    public static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        // 創建隊列聲明
        // queue:隊列名
        // durable:是否持久化
        // exclusive:是否排外  即只允許該channel訪問該隊列   一般等於true的話用於一個隊列只能有一個消費者來消費的場景
        // autoDelete:是否自動刪除  消費完刪除
        // arguments:其他屬性
        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 輸入發送的消息
        Scanner input = new Scanner(System.in);
        String msg = "";
        while (true) {
            System.out.print("請輸入發送的消息: ");
            msg = input.nextLine();

            if (QUIT.equals(msg.toUpperCase())) {
                break;
            }

            // exchange,隊列,參數,消息字節體
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("Producer發送的消息: " + msg);
        }

        // 清理工作
        channel.close();
        connection.close();
    }
}

  • 消費者(Consumer)代碼
    SimpleReceiver
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class SimpleReceiver {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Consumer] Received from queue - '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

運行結果:

1. 首先運行結果生產者(ConsoleSender)
我們在控制台的輸入如下:
請輸入發送的消息: 789
Producer發送的消息: 789
請輸入發送的消息: 111
Producer發送的消息: 111
請輸入發送的消息: q

2. 其次運行消費者(SimpleReceiver)
顯示如下:
[Consumer] Received from queue - 'simple_queue':'789'
[Consumer] Received from queue - 'simple_queue':'111'

 

3. 工作隊列模式實例

  • 生產者(Producer)代碼
    WorkQueueSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueSender {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            String message = "work mode message" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[Producer] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

  • 消費者代碼(模擬2個消費者)
    WorkQueueReceiver1
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WorkQueueReceiver1 {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Work Consumer 1] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });

    }
}

WorkQueueReceiver2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WorkQueueReceiver2 {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Work Consumer 2] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });

    }
}

運行結果:

1. Producer運行:
[Producer] Sent 'work mode message0'
...
[Producer] Sent 'work mode message99'
---------------------------------------------------

2. Consumer1運行:
 [Work Consumer 1] Received 'work_queue':'work mode message0'
 [Work Consumer 1] Received 'work_queue':'work mode message2'
 ...
 [Work Consumer 1] Received 'work_queue':'work mode message98'
---------------------------------------------------

3. Consumer2運行:
 [Work Consumer 2] Received 'work_queue':'work mode message1'
 [Work Consumer 2] Received 'work_queue':'work mode message3'
 ...
 [Work Consumer 2] Received 'work_queue':'work mode message99'

注: 從上面結果可以看出,2個消費者以搶占的方式消費消息且不重復。

 

4. 發布訂閱模式實例

  • 生產者(Producer)代碼
    ConsolePublishSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class ConsolePublishSender {
    private static final String QUIT = "Q";
    private static final String EXCHANGE_NAME = "publish_logs";

    public static void main(String[] argv) throws Exception {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 輸入發送的消息
        Scanner input = new Scanner(System.in);
        String msg = "";
        while (true) {
            System.out.print("請輸入發送的消息: ");
            msg = input.nextLine();

            if (QUIT.equals(msg.toUpperCase())) {
                break;
            }

            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [Publisher] Sent '" + msg + "'");
        }

        channel.close();
        connection.close();
    }
}

  • 消費者代碼(模擬2個消費者)
    SubscribeReceive1
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class SubscribeReceive1 {
    private static final String EXCHANGE_NAME = "publish_logs";

    public static void main(String[] argv) throws Exception {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 訂閱消息的回調函數
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Subscriber 1] Received '" + message + "'");
        };

        // 消費者,有消息時觸發訂閱回調函數
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

SubscribeReceive2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class SubscribeReceive2 {
    private static final String EXCHANGE_NAME = "publish_logs";

    public static void main(String[] argv) throws Exception {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 訂閱消息的回調函數
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Subscriber 2] Received '" + message + "'");
        };

        // 消費者,有消息時觸發訂閱回調函數
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

運行結果:

1. Producer運行:
請輸入發送的消息: topic 1
 [Publisher] Sent 'topic 1'
請輸入發送的消息: topic 2
 [Publisher] Sent 'topic 2'
請輸入發送的消息: haha
 [Publisher] Sent 'haha'
請輸入發送的消息: q
---------------------------------------------------
2. Consumer1運行:
[*] Waiting for messages. To exit press CTRL+C
 [Subscriber 1] Received '發布的主題信息'
 [Subscriber 1] Received 'topic 1'
 [Subscriber 1] Received 'topic 2'
 [Subscriber 1] Received 'haha'
---------------------------------------------------
3. Consumer2運行:
[*] Waiting for messages. To exit press CTRL+C
 [Subscriber 2] Received '發布的主題信息'
 [Subscriber 2] Received 'topic 1'
 [Subscriber 2] Received 'topic 2'
 [Subscriber 2] Received 'haha'

注: 多個接收者接收到一模一樣的消息。該模式用於多個消費者訂閱同一個主題。

 

5. 路由模式實例

  • 生產者(Producer)代碼

ConsoleRouteSender

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class ConsoleRouteSender {
    private static final String QUIT = "Q";
    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        // 交換機聲明
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        // 輸入發送的消息
        Scanner input = new Scanner(System.in);
        String msg = "";
        while (true) {
            System.out.print("請輸入發送的消息: ");
            msg = input.nextLine();

            if (QUIT.equals(msg.toUpperCase())) {
                break;
            }

            // 只有routingKey相同的才會消費
            channel.basicPublish(EXCHANGE_NAME, "key2", null, msg.getBytes());
            //channel.basicPublish(EXCHANGE_NAME, "key", null, msg.getBytes());
            System.out.println("[Route Producer] Sent '" + msg + "'");
        }


        channel.close();
        connection.close();
    }
}

  • 消費者代碼(模擬2個消費者)

RouteReceiver1

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RouteReceiver1 {
    private final static String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 指定路由的key,接收key和key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Route Consumer 1] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

RouteReceiver2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RouteReceiver2 {
    private final static String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 僅接收key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Route Consumer 2] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

運行結果:

1. Producer運行:
請輸入發送的消息: abc
[Route Producer] Sent 'abc'
請輸入發送的消息: test
[Route Producer] Sent 'test'
請輸入發送的消息: q
---------------------------------------------------

2. Consumer1運行:
 [Route Consumer 1] Received 'key2':'abc'
[Route Consumer 1] Received 'key2':'test'
---------------------------------------------------

3. Consumer2運行:
[Route Consumer 2] Received 'key2':'abc'
[Route Consumer 2] Received 'key2':'test'

如果把sender中的key2改成key,運行結果如下:

請輸入發送的消息: 123
[Route Producer] Sent '123'
請輸入發送的消息: 456
[Route Producer] Sent '456'
請輸入發送的消息: 789
[Route Producer] Sent '789'
請輸入發送的消息: q

[Route Consumer 1] Received 'key':'123'
[Route Consumer 1] Received 'key':'456'
[Route Consumer 1] Received 'key':'789'

consumer2沒有數據,因為route key沒有匹配。

 

6. 主題模式實例

  • 生產者(Producer)代碼

SimpleTopicSender

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class SimpleTopicSender {
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        String message = "topics model message with key.1";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println("[Producer] Sent '" + message + "'");

        String message2 = "topics model message with key.1.2";
        channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());
        System.out.println("[Producer] Sent '" + message2 + "'");

        channel.close();
        connection.close();
    }
}

  • 消費者代碼(模擬2個消費者)

TopicReceiver1

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicReceiver1 {
    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 可以接收key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Consumer 1] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

TopicReceiver2

import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicReceiver2 {
    private final static String QUEUE_NAME = "queue_topic2";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取一個連接
        Connection connection = ConnectionUtils.getConnection();

        // 從連接獲取一個通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // *號代表單個單詞,可以接收key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
        // #號代表多個單詞,可以接收key.1.2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [Consumer 2] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

運行結果:

1. Producer運行:
[Producer] Sent 'topics model message with key.1'
[Producer] Sent 'topics model message with key.1.2'
---------------------------------------------------

2. Consumer1運行:
 [Consumer 1] Received 'key.1':'topics model message with key.1'
---------------------------------------------------

3. Consumer2運行:
[Consumer 2] Received 'key.1':'topics model message with key.1'
 [Consumer 2] Received 'key.1.2':'topics model message with key.1.2'


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM