高性能RabbitMQ


1,什么是RabbitMq

RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。

百度百科 ,RabbitMQ 官網  AMQP 協議

2,幾種MQ對比

RabbitMQ 是用Erlang 語言進行開發的,一款設計之初就是抗高並發的語言

3,RabbitMQ 安裝

1.下載並安裝erlang,下載地址:http://www.erlang.org/download
2.配置erlang環境變量信息
  新增環境變量ERLANG_HOME=erlang的安裝地址
  將%ERLANG_HOME%\bin加入到path中
3.下載並安裝RabbitMQ,下載地址:http://www.rabbitmq.com/download.html


注意: RabbitMQ 它依賴於Erlang,需要先安裝Erlang。

  RabbitMQ 管理平台地址 http://127.0.0.1:15672

  默認賬號:guest/guest 用戶可以自己創建新的賬號

 

 https://blog.csdn.net/qq_35098526/article/details/80009424 安裝之后啟動不了,可以在sbin 里面:

 輸入:rabbitmq-plugins enable rabbitmq_management  (先定位到rabbitmq安裝目錄)命令,出現plugins安裝成功的提示。

 

過程:

Microsoft Windows [Version 10.0.17134.950]
C:\Program Files\RabbitMQ Server>
C:\Program Files\RabbitMQ Server>cd rabbitmq_server-3.7.8
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.8>cd  sbin
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.8\sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-2MDM24J:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-2MDM24J...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.

 

 

 

4,RabbitMQ 五種隊列形式

    1.點對點隊列,也可以叫做簡單隊列

      生產者投遞的消息,每次只准一個消費者來消費,如果消費者集群的話,消息會被均攤。

      例如:50 個消息,2個消費者,消費者1會消費奇數,消費者2會消費偶數,兩個消費者不受影響,各自消費各自的消息

      producer:

public class Producer {

    private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = MQConnectionUtils.getConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 3.創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            String msg = "Hello, World :" + i;
            System.out.println(msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        channel.close();
        connection.close();
    }

}

consumer1:

public class Consumer {

    private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("consumer1");
        Connection connection = MQConnectionUtils.getConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 3.創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消費者獲取消息:" + msgString);
            }
        };
        // 3.監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自動簽收的應答模式
    }

}

consumer2:

public class Consumer2 {

    private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("consumer2");
        Connection connection = MQConnectionUtils.getConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 3.創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消費者獲取消息:" + msgString);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // 3.監聽隊列
       channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自動簽收的應答模式
    }

}

  2,工作隊列模式,也可以叫做公平隊列模式

       點對點簡單隊列弊端:消費者集群的話,消息會被均攤處理,但是不同的消費者處理消息的能力是不同的,consumer1 每秒處理1個消息,consumer2 美妙處理3個消息,如果消息均攤,consumer1的效率則被浪費。

       公平消費模式:誰處理的快,並且采用手動簽收,告知RabbitMQ之后,RabbitMQ 再給分發消息。這樣,誰處理的快,誰就會處理的多。

       

producer:

public class Producer {
    // 公平隊列名稱
    private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = MQConnectionUtils.getConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 保證消費者只能取一個/每次
        channel.basicQos(1); //每次只給消費者1條消息,等消費完成,手動ack 應答之后,再給下一條 for (int i = 0; i < 50; i++) {
            String msg = "Hello, World: " + i;
            System.out.println(msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        channel.close();
        connection.close();

    }

}

consumer1:

public class Consumer {

    private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        
        System.out.println("consumer01");
        Connection connection = MQConnectionUtils.getConnection();
        // 創建通道
        final Channel channel = connection.createChannel();
        // 3.創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消費者獲取消息:" + msgString);
                try {
  Thread.sleep(200);                 } catch (Exception e) {
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 3.監聽隊列
        channel.basicConsume(QUEUE_NAME, false, consumer); //false 代表使用手動消息應答,需要使用channel.basicAck(envelope.getDeliveryTag(),false) 告知消息中間件
    }

}

consumer2:

public class Consumer2 {
    
    private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("consumer02");
        Connection connection = MQConnectionUtils.getConnection();
        // 創建通道
        final Channel channel = connection.createChannel();
        // 3.創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消費者獲取消息:" + msgString);
                try {
                     Thread.sleep(1000); //讓這個消費者處理消息的能力更差一點
                } catch (Exception e) {
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 3.監聽隊列
    channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}

   3,發布訂閱模式,采用fanout 扇形交換機,

        高級隊列模式中,有交換機,生產者將消息發給交換機,在根據交換機的類型,發給定的的隊列,然后發給指定的消費者消費

  producer:

public class Producer {

    // 定義交換機名稱
    private static final  String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
    // 定義交換機類型
   private static final String EXCHANGE_TYPE = "fanout"; public static void main(String[] args) throws IOException, TimeoutException {
        // 和rabbitmq 建立連接
        Connection connection = MQConnectionUtils.getConnection();
        // 創建channel
        Channel channel = connection.createChannel();
        // 創建交換機
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        
        String message = "pub/sub";
        
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        
        channel.close();
        
        connection.close();

    }

}

郵件消費者:

ublic class EmailConsumer {

    private static final String QUEUE_NAME = "rabbitmq_pubsub_email_queue_one";
    private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        
        System.out.println("郵件消費者。。。");

        Connection connection = MQConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 定義隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 將隊列和交換機進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息 :" + msg);
            }
        };
        // 消費者監聽隊列消息  true 代表自動簽收
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

短信消費者:

// 信息消費者
public class TextConsumer {
    private static final String QUEUE_NAME = "rabbitmq_pubsub_text_queue_one";
    private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        
        System.out.println("短信消費者。。。");
        Connection connection = MQConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 定義隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 將隊列和交換機進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息 :" + msg);
            }
        };
        // 消費者監聽隊列消息  true 代表自動簽收
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

4,路由模式:采用direct 交換機

producer:

public class Producer {
    // 定義交換機名稱
    private static final  String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
    // 定義交換機類型
  private static final String EXCHANGE_TYPE = "direct"; // 定義路由 private static final String ROUTINGKEY = "info"; public static void main(String[] args) throws IOException, TimeoutException {
        // 和rabbitmq 建立連接
        Connection connection = MQConnectionUtils.getConnection();
        // 創建channel
        Channel channel = connection.createChannel();
        // 創建交換機
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        
        String message = "pub/sub";
        
        channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
        
        channel.close();
        
        connection.close();

    }

}

郵件消費者:

public class EmailConsumer {

    private static final String QUEUE_NAME = "rabbitmq_direct_email_queue_one";
    private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
    private static final String ROUTINGKEY_INFO = "info"; public static void main(String[] args) throws IOException, TimeoutException {
        
        System.out.println("郵件消費者。。。");

        Connection connection = MQConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 定義隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 將隊列和交換機進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息 :" + msg);
            }
        };
        // 消費者監聽隊列消息  true 代表自動簽收
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

短信消費者:

public class TextConsumer {
    private static final String QUEUE_NAME = "rabbitmq_direct_text_queue_one";
    private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
    // 設定路由
 private static final String ROUTINGKEY_INFO = "info"; private static final String ROUTINGKEY_WARN = "warn"; public static void main(String[] args) throws IOException, TimeoutException {

        System.out.println("短信消費者。。。");
        Connection connection = MQConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 定義隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 將隊列和交換機進行綁定 綁定路由
        //info 和  warn 路由的都能接收到
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_WARN);
        
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息 :" + msg);
            }
        };
        // 消費者監聽隊列消息 true 代表自動簽收
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

5,通配符模式,采用topic 交換機  # 代表任意 * 代表一個

producer:

public class Producer {

    // 定義交換機名稱
    private static final  String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
    // 定義交換機類型
    private static final String EXCHANGE_TYPE = "topic"; // 定義路由 private static final String ROUTINGKEY = "routingkey.info.error.warn"; public static void main(String[] args) throws IOException, TimeoutException {
        // 和rabbitmq 建立連接
        Connection connection = MQConnectionUtils.getConnection();
        // 創建channel
        Channel channel = connection.createChannel();
        // 創建交換機
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        
        String message = "pub/sub";
        
        channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
        
        channel.close();
        
        connection.close();

    }

}

郵件消費者:

public class EmailConsumer {

    private static final String QUEUE_NAME = "rabbitmq_topic_email_queue_one";
    private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
    private static final String ROUTINGKEY = "routingkey.#"; public static void main(String[] args) throws IOException, TimeoutException {
        
        System.out.println("郵件消費者。。。");

        Connection connection = MQConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 定義隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 將隊列和交換機進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息 :" + msg);
            }
        };
        // 消費者監聽隊列消息  true 代表自動簽收
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

短信消費者:

public class TextConsumer {
    private static final String QUEUE_NAME = "rabbitmq_topic_text_queue_one";
    private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
    private static final String ROUTINGKEY = "routingkey.info.*"; // 設定路由

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消費者。。。");
        Connection connection = MQConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 定義隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 將隊列和交換機進行綁定 綁定路由
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息 :" + msg);
            }
        };
        // 消費者監聽隊列消息 true 代表自動簽收
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

 

     

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM