RabbitMQ/JAVA (發布/訂閱模式)


發布/訂閱模式即生產者將消息發送給多個消費者。

下面介紹幾個在發布/訂閱模式中的關鍵概念--

1. Exchanges (轉發器)

可能原來我們都是基於一個隊列發送和接收消息。現在介紹一下完整的消息傳遞模式。

Rabbitmq消息模式的核心理念是:生產者沒有直接發送任何消息到隊列實際上,生產者都不知道這個消息是發送給哪個隊列的。相反,生產者只能發送消息給轉發器

轉發器一方面接收生產者的消息,另一方面向隊列推送消息。

轉發器必須清楚的指導如何處理接收到的消息,需要附加隊列嗎?附加幾個?或者是否丟棄?這些規則通過轉發器的類型進行定義。類型有:Direct、Topic、Headers、Fanout。

這里我們關注最后一個。現在讓我們創建一個Fanout類型的轉發器,定義如下:

channel.exchangeDeclare("logs", "fanout"); 

2. Nameless exchange(匿名轉發)

之前我們對轉發器可能一無所知,但還是可以將消息發送到隊列,那是因為我們用了默認的轉發器,轉發器名為空字符串" "。之前我們發布消息的代碼是:

channel.basicPublish("", "hello", null, message.getBytes());

第一個參數就是轉發器的名字。空字符串表示匿名的轉發器。消息通過隊列的routingKey路由到指定的隊列中去。

現在我們就可以指定轉發器的名字了;

channel.basicPublish( "logs", "", null, message.getBytes());

3.Temporary queues(臨時隊列)

當我們需要為消費者指定同一個隊列的時候,隊列有名字對我們來說是非常重要的。

但有時我們並不關心這個問題,我們只對當前流動的消息感興趣。這個時候我們采取以下兩個步驟解決:

1)當我們連接到RabbitMQ時,需要一個新的空隊列,為此我們需要創建一個隨機名字的空隊列,或者更好的。讓服務器選好一個隨機名字的空隊列直接給我們。

2)一旦消費者斷開連接,隊列將自動刪除。

這里我們提供一個無參的queueDeclare()方法,創建一個非持久化、獨立的、自動刪除的隊列,且名字是隨機生成的。

String queueName = channel.queueDeclare().getQueue();

queueName是一個隨機隊列名。

4.Bindings(綁定)

我們已經創建了一個廣播的轉發器和一個隨機隊列。現在需要告訴轉發器轉發消息到隊列。這個關聯轉發器和隊列我們叫他Binding。

channel.queueBind(queueName, "logs", "");

這樣,轉發器附加到日志隊列上去。

下面是一個關於日志系統的完整例子:

發送端代碼(生產者)EmitLog.java

package sublog;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException {
        /**
         * 創建連接連接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        // 設置MabbitMQ所在主機ip或者主機名
        factory.setHost("115.159.181.204");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 創建一個連接
        Connection connection = factory.newConnection();  
        // 創建一個頻道  
        Channel channel = connection.createChannel();  

        // 指定轉發——廣播
        ((com.rabbitmq.client.Channel) channel).exchangeDeclare(EXCHANGE_NAME, "fanout");

        for(int i=0;i<3;i++){
            // 發送的消息
            String message = "Hello World!";
            ((com.rabbitmq.client.Channel) channel).basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

        // 關閉頻道和連接
        channel.close();
        connection.close();
    }
}

消費者1 ReceiveLogs2Console.java

package sublog;

import java.io.IOException;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class ReceiveLogs2Console {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("115.159.181.204");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 打開連接和創建頻道,與發送端一樣
        com.rabbitmq.client.Connection connection =factory.newConnection();
        final Channel channel =  connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 聲明一個隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        
        // 創建隊列消費者
        final Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
              }
            };
            channel.basicConsume(queueName, true, consumer);
    }
}

消費者2 ReceiveLogs2File.java

package sublog;


import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogs2File {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("115.159.181.204");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 打開連接和創建頻道,與發送端一樣
        com.rabbitmq.client.Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 聲明一個隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        
        // 創建隊列消費者
        final Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                print2File(message);
//                System.out.println(" [x] Received '" + message + "'");
              }
            };
            channel.basicConsume(queueName, true, consumer);
    }
    
    private static void print2File(String msg) {
        try {
            String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
            String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
            File file = new File(dir, logFileName + ".log");
            FileOutputStream fos = new FileOutputStream(file, true);
            fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
            fos.flush();
            fos.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }  
}

我們用1個生產者用於發送log消息,2個消費者,一個用於打印接收到的消息,另一個除了打印接收到的消息還寫有日志信息的文件。

生產者聲明了一個廣播模式的轉換器,訂閱這個轉換器的消費者都可以收到每一條消息。可以看到在生產者中,沒有聲明隊列。這也驗證了之前說的。生產者其實只關心exchange,至於exchange會把消息轉發給哪些隊列,並不是生產者關心的。

2個消費者,一個打印日志,一個寫入文件,除了這2個地方不一樣,其他地方一模一樣。也是聲明一下廣播模式的轉換器,而隊列則是隨機生成的,消費者實例啟動后,會創建一個隨機實例,這個在管理頁面可以看到(如圖)。而實例關閉后,隨機隊列也會自動刪除。最后將隊列與轉發器綁定。

注:運行的時候要先運行2個消費者實例,然后在運行生產者實例。否則獲取不到實例。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM