Java 整合 RabbitMQ


一、Java項目創建並整合RabbitMQ

1、創建Maven項目

2、添加依賴

官方地址: https://www.rabbitmq.com/java-client.html

依賴地址: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.10.0

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.10.0</version>
    </dependency>
</dependencies>

二、RabbitMQ簡單隊列實戰

⽂檔:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

1、消息生產者

package com.xdclass.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        try (
                //JDK7語法 或⾃動關閉 connnection 和channel
                //創建連接
                Connection connection = factory.newConnection();
                //創建信道
                Channel channel = connection.createChannel()) {
            /**
             * 隊列名稱
             * 持久化配置: mq重啟后還在
             * 是否獨占:只能有⼀個消費者監聽隊列;當connection關閉是否刪除隊列,⼀般是false,發布訂閱是獨占
             * ⾃動刪除: 當沒有消費者的時候,⾃動刪除掉,⼀般是false
             * 其他參數
             *
             * 隊列不存在則會⾃動創建,如果存在則不會覆蓋,所以此時的時候需要注意屬性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!222222";
            /**
             * 參數說明:
             * 交換機名稱:不寫則是默認的交換機,那路由鍵需要和隊列名稱⼀樣才可以被路由,
             * 路由鍵名稱
             * 配置信息
             * 發送的消息數據:字節數組
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

2、消息消費者

會⼀直監聽隊列

package com.xdclass.simple;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //消費者⼀般不增加⾃動關閉
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.To exit press CTRL + C");
        //回調方法,下⾯兩種都⾏
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // consumerTag 是固定的 可以做此會話的名字,deliveryTag 每次接收消息 +1
                System.out.println("consumerTag 消息標識 = " + consumerTag);
                //可以獲取交換機,路由鍵等System.out.println("envelope元數據 = "+envelope);
                System.out.println("properties配置信息 = " + properties);
                System.out.println("body=" + new String(body, "utf-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);


        //也可以用下面這種
        /*
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //⾃動確認消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        */
    }
}

三、RabbitMQ工作隊列—輪詢策略

1、工作隊列

  • 消息生產能力大於消費能力,增加多幾個消費節點
  • 和簡單隊列類似,增加多個幾個消費節點,處於競爭關
  • 默認策略: round robin 輪詢

2、生產者代碼

package com.xdclass.work.rr;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {
    private final static String QUEUE_NAME = "work_mq_rr";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        try (
                //JDK7語法 或⾃動關閉 connnection 和channel
                //創建連接
                Connection connection = factory.newConnection();
                //創建信道
                Channel channel = connection.createChannel()) {

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < 10; i++) {
                String message = "Hello World! i==" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

3、消費者代碼,延遲1s消費

public class Recv1 {
    private final static String QUEUE_NAME = "work_mq_rr";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*]Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //模擬消費緩慢
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received '" + message + "'");
            //⼿工確認消息消費,不是多條確認
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //關閉⾃動確認消息
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

4、消費者代碼,延遲3s消費

public class Recv2 {
    private final static String QUEUE_NAME = "work_mq_rr";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*]Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //模擬消費緩慢
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received '" + message + "'");
            //⼿工確認消息消費,不是多條確認
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //關閉⾃動確認消息
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

5、輪詢策略驗證

  • 先啟動兩個消費者,再啟動生產者,消息會平分到兩個消費端
  • 缺點:存在部分節點消費過快,部分節點消費慢,導致不能合理處理消息

四、RabbitMQ工作隊列—公平策略

1、公平策略驗證

  • 修改消費者策略
  • 解決消費者能力消費不足的問題,降低消費時間問題

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM