RabbitMQ JAVA客戶端調用


1.安裝erlang

  下載地址:http://www.erlang.org/downloads

  設置ERLANG環境變量

2.安裝RabbitMQ

  下載地址: http://www.rabbitmq.com/download.html

 

輸入命令安裝各種管理插件:

D:\RabbitMQServer\rabbitmq_server-3.7.10\sbin>rabbitmq-plugins enable rabbitmq_management

重啟服務

net stop rabbitmq && net start rabbitmq

登錄

http://127.0.0.1:15672 默認用戶名密碼 guest  guest

常用命令(RabbitMQ命令在sbin目錄下D:\RabbitMQServer\rabbitmq_server-3.7.10\sbin,記得設置環境變量)

rabbitmqctl delete_vhost test_vhosts 刪除虛擬機test_vhosts 

3. RabbitMQ知識整理

來自(https://blog.csdn.net/dreamchasering/article/details/77653512)

什么是MQ?

      MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。

      RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。

      1、隊列、生產者、消費者

      隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)可以從隊列中獲取消息並消費。

      

      多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理。

      

2、Exchange、Binding

      剛才我們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關聯起來。

      

3、Exchange Type、Bingding key、routing key

      在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。

      生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。

      RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

      fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。

      direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。

      topic:將消息路由到binding key與routing key模式匹配的隊列中。

      附上一張RabbitMQ的結構圖:

      

    

最后來具體解析一下幾個問題:

1、可以自動創建隊列,也可以手動創建隊列,如果自動創建隊列,那么是誰負責創建隊列呢?是生產者?還是消費者? 

      如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那么生產者發送的消息就會丟失。所以,為了數據不丟失,消費者和生產者都可以創建隊列。那么如果創建一個已經存在的隊列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是隊列屬性並不會改變。

      隊列對於負載均衡的處理是完美的。對於多個消費者來說,RabbitMQ使用輪詢的方式均衡的發送給不同的消費者。

2、RabbitMQ的消息確認機制

      默認情況下,如果消息已經被某個消費者正確的接收到了,那么該消息就會被從隊列中移除。當然也可以讓同一個消息發送到很多的消費者。

      如果一個隊列沒有消費者,那么,如果這個隊列有數據到達,那么這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被立即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。

     那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:

     RabbitMQ Server會把這個信息發送到下一個消費者。

     如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發送數據給它,因為Server認為這個消費者處理能力有限。

    而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的均衡消費者的負載。

4.JAVA demo

引入RabbitMQ客戶端

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

3.1 使用默認配置直接發送消息到隊列

生產者

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 
 * 默認發送,直接將消息發送到某個隊列,默認交換機type為direct
 * 
 * @author
 * @date 2019/01/10 11:17:10
 */
public class ProducterDirectDemo {
    public static void main(String[] args) throws IOException, TimeoutException {

        String queneName = "testQuene";
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");
            // 創建與RabbitMQ服務器的TCP連接
            connection = factory.newConnection();
            // 創建一個頻道
            channel = connection.createChannel();
            // 聲明默認的隊列
            channel.queueDeclare(queneName, true, false, true, null);
            while (true) {
                channel.basicPublish("", queneName, null, UUID.randomUUID().toString().getBytes());
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

消費者

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 接收默認消息
 * 
 * @author
 * @date 2019/01/10 11:14:32
 */
public class ConsumerDirectDemo {
    public static void main(String[] args) {
        String queneName = "testQuene";
        Connection connection = null;
        Channel channel = null;
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message);
                }
            };
            // channel綁定隊列,autoAck為true表示一旦收到消息則自動回復確認消息
            channel.basicConsume(queneName, true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

3.2 設置交換器,隊列,路由發送消息

生產者

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 指定交換機,隊列,路由key方式
 * 
 * @author
 * @date 2019/01/10 11:19:38
 */
public class ProducterAllDemo {
    public static void main(String[] args) throws IOException, TimeoutException {

        String queneName = "firstQueue";
        String exchangeName = "amq.fanout";
        String routingKey = "test1";
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");

            // 創建與RabbitMQ服務器的TCP連接
            connection = factory.newConnection();
            // 創建一個頻道
            channel = connection.createChannel();
            // 聲明交換機類型
            channel.exchangeDeclare("amq.fanout", "fanout", true);
            // 聲明默認的隊列 (也可不申明隊列,使用默認隊列)
            channel.queueDeclare(queneName, true, false, true, null);
            // String queue = channel.queueDeclare().getQueue();
            // 將隊列與交換機綁定
            channel.queueBind(queneName, exchangeName, routingKey);
            // 指定一個隊列
            // channel.queueDeclare(queneName, false, false, false, null);
            while (true) {
                channel.basicPublish(exchangeName, routingKey, null, UUID.randomUUID().toString().getBytes());
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }

    }
}

消費者

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 
 * @author
 * @date 2019/01/10 11:19:42
 */
public class ConsumerAllDemo {
    public static void main(String[] args) {
        String queneName = "firstQueue";
        String exchangeName = "amq.fanout";
        String routingKey = "test1";
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 聲明交換機類型
            channel.exchangeDeclare(exchangeName, "fanout", true);
            // 聲明默認的隊列(也可不申明隊列,使用默認隊列)
            channel.queueDeclare(queneName, true, false, true, null);
            // String queue = channel.queueDeclare().getQueue();
            // 將隊列與交換機綁定
            channel.queueBind(queneName, exchangeName, routingKey);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message);
                }
            };
            // channel綁定隊列、消費者,autoAck為true表示一旦收到消息則自動回復確認消息
            channel.basicConsume(queneName, true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM