RabbitMQ使用詳解


RabbitMQ是基於AMQP的一款消息管理系統。AMQP(Advanced Message Queuing Protocol),是一個提供消息服務的應用層標准高級消息隊列協議,其中RabbitMQ就是基於這種協議的一種實現。

常見mq:

  • ActiveMQ:基於JMS
  • RabbitMQ:基於AMQP協議,erlang語言開發,穩定性好
  • RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會
  • Kafka:分布式消息系統,高吞吐量

1 消息模型

RabbitMq有5種常用的消息模型

1.1 基本消息模型

這是最簡單的消息模型,如下圖:

生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。

再演示代碼之前,我們先創建一個工程rabbitmq-demo,並編寫一個工具類,用於提供與mq服務創建連接

public class ConnectionUtil {
    /**
     * 建立與RabbitMQ的連接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("192.168.18.130");
        //端口
        factory.setPort(5672);
        //設置賬號信息,用戶名、密碼、vhost
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 通過工程獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生產者發送消息

接下來是生產者發送消息,其過程包括:1.與mq服務建立連接,2.建立通道,3.聲明隊列(有相同隊列則不創建,沒有則創建),4.發送消息,代碼如下:

public class Send {
    private static final String QUEUE_NAME = "basic_queue";
    public static void main(String[] args) throws Exception {
        //消息發送端與mq服務創建連接
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("生產者已發送:" + message);
        channel.close();
        connection.close();
    }
}

消費者接受消息

消費者在接收消息的過程需要經歷如下幾個步驟: 1.與mqfuwu建立連接,2.建立通道,3.聲明隊列,4,接收消息,代碼如下:

public class Consumer1 {
    private static final String QUEUE_NAME = "basic_queue";
    public static void main(String[] args) throws Exception {
        //消息消費者與mq服務建立連接
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println("消費者1接收到消息:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消息的接收與消費使用都需要在一個匿名內部類DefaultConsumer中完成

注意:隊列需要提前聲明,如果未聲明就使用隊列,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明隊列,隊列的創建會保證冪等性,也就是說生產者和消費者都聲明同一個隊列,則只會創建一個隊列

1.2 Work Queues工作隊列模型

在基本消息模型中,一個生產者對應一個消費者,而實際生產過程中,往往消息生產會發送很多條消息,如果消費者只有一個的話效率就會很低,因此rabbitmq有另外一種消息模型,這種模型下,一個生產發送消息到隊列,允許有多個消費者接收消息,但是一條消息只會被一個消費者獲取。

生產者發送消息

與基本消息模型基本一致,這里測試循環發布20條消息:

public class Send {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 循環發布任務
        for (int i = 1; i <= 20; i++) {
            // 消息內容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("生產者發送消息:" + message);
            Thread.sleep(500);
        }
        channel.close();
        connection.close();
    }
}

消費者1

public class Consumer1 {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("消費者1接收到消息:" + msg);
                try {
                    Thread.sleep(50);//模擬消費耗時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費者2

public class Consumer2 {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("消費者2接收到消息:" + msg);
                try {
                    Thread.sleep(50);//模擬消費耗時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

此時有兩個消費者監聽同一個隊列,當兩個消費者都工作時,生成者發送消息,就會按照負載均衡算法分配給不同消費者,如下圖:

1.3 訂閱模型

在之前的模型中,一條消息只能被一個消費者獲取,而在訂閱模式中,可以實現一條消息被多個消費者獲取。在這種模型下,消息傳遞過程中比之前多了一個exchange交換機,生產者不是直接發送消息到隊列,而是先發送給交換機,經由交換機分配到不同的隊列,而每個消費者都有自己的隊列:

解讀:

1、1個生產者,多個消費者

2、每一個消費者都有自己的一個隊列

3、生產者沒有將消息直接發送到隊列,而是發送到了交換機

4、每個隊列都要綁定到交換機

5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的

X(exchange)交換機的類型有以下幾種:

Fanout:廣播,交換機將消息發送到所有與之綁定的隊列中去

Direct:定向,交換機按照指定的Routing Key發送到匹配的隊列中去

Topics:通配符,與Direct大致相同,不同在於Routing Key可以根據通配符進行匹配

注意:在發布訂閱模型中,生產者只負責發消息到交換機,至於消息該怎么發,以及發送到哪個隊列,生產者都不負責。一般由消費者創建隊列,並且綁定到交換機

訂閱模型之Fanout

在廣播模式下,消息發送的流程如下:

  1. 可以有多個消費者,每個消費者都有自己的隊列
  2. 每個隊列都要與exchange綁定
  3. 生產者發送消息到exchange
  4. exchange將消息把消息發送到所有綁定的隊列中去
  5. 消費者從各自的隊列中獲取消息
生產者發送消息
public class Send {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型為fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "hello world";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("生產者發送消息:" + message);
        channel.close();
        connection.close();
    }
}
消費者
public class Consumer1 {
    private static final String QUEUE_NAME = "fanout_queue_1";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //消費者聲明自己的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 聲明exchange,指定類型為direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //消費者將隊列與交換機進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String msg = new String(body);
                System.out.println("消費者1獲取到消息:" + msg);
            }
        });
    }
}

其他消費者只需修改QUEUE_NAME即可

注意:exchange與隊列一樣都需要提前聲明,如果未聲明就使用交換機,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明交換機,同樣的,交換機的創建也會保證冪等性。

訂閱模型之Direct

在fanout模型中,生產者發布消息,所有消費者都可以獲取所有消息。在路由模式(Direct)中,可以實現不同的消息被不同的隊列消費,在Direct模式下,交換機不再將消息發送給所有綁定的隊列,而是根據Routing Key將消息發送到指定的隊列,隊列在與交換機綁定時會設定一個Routing Key,而生產者發送的消息時也需要攜帶一個Routing Key。

如圖所示,消費者C1的隊列與交換機綁定時設置的Routing Key是“error”, 而C2的隊列與交換機綁定時設置的Routing Key包括三個:“info”,“error”,“warning”,假如生產者發送一條消息到交換機,並設置消息的Routing Key為“info”,那么交換機只會將消息發送給C2的隊列。

生產者發送消息
public class Send {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型為direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "新增一個訂單";
        //生產者發送消息時,設置消息的Routing Key:"insert"
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println("生產者發送消息:" + message);
        channel.close();
        connection.close();
    }
}
消費者1
public class Consumer1 {
    private static final String QUEUE_NAME = "direct_queue_1";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //消費者聲明自己的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //消費者將隊列與交換機進行綁定,並且設置Routing Key:"insert"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String msg = new String(body);
                System.out.println("消費者1獲取到消息:" + msg);
            }
        });
    }
}

其他消費者需要修改隊列名QUEUE_NAME和Routing Key,上述生成者發送的消息,消費者1是可以獲取到的

發布訂閱之Topics

Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規則:

     #:匹配一個或多個詞

     *:匹配不多不少恰好1個詞

舉例:

     audit.#:能夠匹配audit.irs.corporate 或者 audit.irs

     audit.*:只能匹配audit.irs

Topics生產者代碼與Direct大致相同,只不過子聲明交換機時,將類型設為BuiltinExchangeType.TOPIC(topic),

消費者代碼也與Direct大致相同,也是在聲明交換機時設置類型為topic,代碼不再演示

Spring AMQP

Spring AMQP是對AMQP的一種封裝,目的是能夠讓我們更簡便的使用消息隊列,下面介紹一下Spring AMQP在Spring boot中的使用方法

依賴和配置

添加AMQP的啟動器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml中添加RabbitMQ的地址:

spring:
  rabbitmq:
    host: 192.168.18.130
    username: admin
    password: admin

消費者

消費者需要定義一個類,類中定義監聽隊列的方法

@Component
public class Listener {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "spring.test.queue", durable = "false"),
                    exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
                    key = "insert"
            )
    )
    public void listen(String msg){
        System.out.println("消費者接受到消息:" + msg);
    }
}

注解:

@Component:保證監聽類被spring掃描到

@RabbitListener:

@RabbitListener包含很多內容,在發布訂閱模式中,我們可以使用其中的“QueueBinding[] bindings”,其中QueueBinding底層如下:

其中Queue表示隊列,Exchange表示交換機,key表示Routing Key

@RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "spring.test.queue", durable = "false"),
                    exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
                    key = "insert"
            )
    )

@Queue會創建隊列

@Exchange會創建交換機

@QueueBinding會綁定隊列和交換機

生產者發送消息

可以通過注解引入AmqpTemplate:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Resource
    private AmqpTemplate template;

    @Test
    public void testSendMsg() throws InterruptedException {
        String message = "hello spring";
        template.convertAndSend("spring.test.exchange", "insert", message);
        System.out.println("生產者發送消息:" + message);
        Thread.sleep(10000);//等待10s,讓測試方法延遲結束,防止消費者未來得及獲取消息
    }
}

RabbitMQ如何防止消息丟失

1. 消息確認機制(ACK)

RabbitMQ有一個ACK機制,消費者在接收到消息后會向mq服務發送回執ACK,告知消息已被接收。這種ACK分為兩種情況:

  • 自動ACK:消息一旦被接收,消費者會自動發送ACK
  • 手動ACK:消息接收后,不會自動發送ACK,而是需要手動發送ACK

如果消費者沒有發送ACK,則消息會一直保留在隊列中,等待下次接收。但這里存在一個問題,就是一旦消費者發送了ACK,如果消費者后面宕機,則消息會丟失。因此自動ACK不能保證消費者在接收到消息之后能夠正常完成業務功能,因此需要在消息被充分利用之后,手動ACK確認

自動ACK,basicConsume方法中將autoAck參數設為true即可:

手動ack,在匿名內部類中,手動發送ACK:

當然,如果設置了手動ack,但又不手動發送ACK確認,消息會一直停留在隊列中,可能造成消息的重復獲取

2. 持久化

消息確認機制(ACK)能夠保證消費者不丟失消息,但假如消費者在獲取消息之前mq服務宕機,則消息也會丟失,因此要保證消息在服務端不丟失,則需要將消息進行持久化。隊列、交換機、消息都要持久化。

隊列持久化

exchange持久化

消息持久化

3. 生產者確認

生成者在發送消息過程中也可能出現錯誤或者網絡延遲燈故障,導致消息未成功發送到交換機或者隊列,或重復發送消息,為了解決這個問題,rabbitmq中有多個解決辦法:

事務:

用事務將消息發送代碼包圍起來:

Confirm模式:

如下所示,在發送代碼前執行channel.confirmSelect(),如果消息未正常發送,就會進入if代碼塊,可以進行重發也可以對失敗消息進行記錄

異步confirm方法:

顧名思義,就是生產者發送消息后不用等待服務端回饋發送狀態,可以繼續執行后面的代碼,對於失敗消息重發進行異步處理:

Spring AMQP中添加配置:

生產者確認機制,確保消息正確發送,如果發送失敗會有錯誤回執,從而觸發重試

spring:
  rabbitmq:
    publisher-confirms: true


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM