RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe


目錄

RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld

RabbitMQ學習總結 第三篇:工作隊列Work Queue

RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe

RabbitMQ學習總結 第五篇:路由Routing

RabbitMQ學習總結 第六篇:Topic類型的exchange

RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)

 

上篇中我們實現了Work Queue的創建,在Work Queue背后,其實是rabbitMQ把每條任務消息只發給一個消費者。本篇中我們將要研究如何把一條消息推送給多個消費者,這種模式被稱為publish/subscribe(發布/訂閱)。

為了說明這個模式,我們將會構建一個簡單的日志系統。這將會包含兩部分程序,第一個是發送日志信息,第二個將會接收並打印它們。

在我們的日志系統里,每個運行的消費者程序都能接收到消息。這樣我就運行一個receiver並把日志寫到磁盤上,同時我們再運行另外一個消費者來把日志消息打印到屏幕上。

從本質上來說,是把日志消息推送到了所有的消費者端。

1、消息交換機

上篇中我們往Queue里發送消息,並從Queue里取出消息。現在我們來介紹RabbitMQ的完全消息模型。

我們來快速回顧一下之前博文中的內容:

  • 一個生產者者應用程序發送消息;
  • 一個消息隊列用來存儲和緩存消息;
  • 一個消費者程序接收消息

RabbitMQ的消息發送模型核心思想是生產者不直接把消息發送到消息隊列中。事實上,生產者不知道自己的消息將會被緩存到哪個隊列中。

其實生產者者可以把消息發送到exchange(消息交換機)上。exchange是一個很簡單的事情,它一邊接收生產者的消息,另一邊再把消息推送到消息隊列中。Exchange必須知道在它接收到一條消息時應該怎么去處理。應該把這條消息推送到指定的消息隊列中?還是把消息推送到所有的隊列中?或是把消息丟掉?這些規則都可以用exchange類型來定義。

有一些可用的exchange類型:direct, topic, headers和fanout。這里我們主要看最后一個:fanout,這里我們創建一個名字為logs、類型為fanout的exchange:

channel.exchangeDeclare("logs", "fanout");

fanout類型的exchange是很簡單的。就是它把它能接收到的所有消息廣播到它知道的所有隊列中。這正是我們的日志系統所需要的。

列出exchange

可以在服務器上使用rabbitmqctl命令來列出RabbitMQ服務器上的所有消息exchange:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在這個列表中有一些形如amp.*的exchange,還有默認(未命名)的交換機。這些都是被默認創建的,但這些已經被默認創建的都不是你現在需要用到的。

沒有名字的exchange

在之前的博文里沒有使用都exchange的相關知識,但是任然能夠發送消息。之所以能發送成功是因為我們使用一個默認exchange,我們使用(””)來標識的。

channel.basicPublish("", "hello", null, message.getBytes());

第一個參數就是exchange的名字。空字符串的符號指的是默認的或沒有命名的exchange:消息會根據routingKey被路由到指定的消息隊列中。

現在我們來吧消息推送到已命名的exchange上:

channel.basicPublish( "logs", "", null, message.getBytes());

2、臨時隊列

如果你看過之前幾篇的博文,應該會發現我們都是使用了一個指定名字的消息隊列(hello和task_queue)。對應的生產者和消費者之間都要使用相同的消息隊列名稱,這在很重要的。

但是在我們的log系統中卻不是這樣,我們希望能夠接收到所有的log消息,不只是其中的一部分。我們只要處理當前的log消息,不用管過去的歷史log。為了實現,我們需要做以下兩步:

  1. 無論什么時候我們和RabbitMQ建立連接時,我們都要刷新、清空Queue。為了達到這一的目的,我們可以用一個隨機的名字(隨機性可由自己來定義)來創建Queue,也可以讓服務器來自動建立一個隨見的Queue。
  2. 當消費者斷開連接時,Queue能自動被刪除。

使用Java客戶端時,我們使用無參數的queueDeclare方法,就可以創建一個已經生成名字的、排他性的且會自動刪除的Queue:

String queueName = channel.queueDeclare().getQueue();

這是就拿到了一個隨機名字的queue,形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

3、綁定(bindings)

我們已經創建了一個fanout類型的exchange和一個隊列。現在我們需要讓exchange向我們的queue里發送消息。Exchange和queue之間關系被稱為binding(綁定)。

channel.queueBind(queueName, "logs", "");

現在開始,名字為logs的exchange就會忘我們的queue里退消息了。

查看binding列表:

使用rabbitmqctl list_bindings命令來看已經存在的所有的binding。

4、最終實現

發送日志消息的生產者程序和之前的程序沒有太多的差別。最大的區別就是我們把消息推送到一個命名的exchange上,而不是之前未命名的默認exchange。在我們發送消息時需要提供一個routingKey,但對於fanout類型的exchange可以忽略。下邊是生產者的代碼EmitLog.java:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //聲明exchange名字以及類型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        
        // getMessage的實現請見上篇博文
        String message = getMessage(argv);

        //指定exchange的名字
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

正如你所見,在建立連接后我們聲明了exchange。這一步是必須的,因為禁止向一個不存在的exchange推送消息。

如果沒有向exchange負責的queue,那么消息將會被丟失,這是沒有問題的;如果沒有消費者監聽的話,我們會安全的丟掉這些消息。

ReceiveLogs.java的代碼如下:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //聲明消息路由的名稱和類型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //聲明一個隨機消息隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //綁定消息隊列和消息路由
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        
        //啟動一個消費者
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
  • 編譯文件:

javac -cp .:amqp-client-3.3.5.jar ReceiveLogs.java EmitLog.java

  • 把日志存到文件里:

java -cp .:amqp-client-3.3.5.jar ReceiveLogs > logs_from_rabbit.log

然后監聽該日志文件:

tail -10f logs_from_rabbit.log

  • 往屏幕上打印日志消息:

java -cp .:amqp-client-3.3.5.jar ReceiveLogs

  • 啟動生產者:

java -cp .:amqp-client-3.3.5.jar EmitLog

日志輸出到文件中:

日志消息打印到了屏幕上:

在運行ReceiveLogs的時候,使用rabbitmqctl list_bindings命令來查看RabbitMQ中的exchange

leo@leocook:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
        exchange        amq.gen-1Zuyn_44c8IWsdJWrI42Og  queue   amq.gen-1Zuyn_44c8IWsdJWrI42Og  []
        exchange        amq.gen-rSrGSPWLNTuq1dfXipPfAA  queue   amq.gen-rSrGSPWLNTuq1dfXipPfAA  []
        exchange        task_queue      queue   task_queue      []
logs    exchange        amq.gen-1Zuyn_44c8IWsdJWrI42Og  queue           []
logs    exchange        amq.gen-rSrGSPWLNTuq1dfXipPfAA  queue           []
...done.

總結:

1、在生產者和消費者的信道中聲明exchange名字以及類型

2、在生產者的信道中指定發送目標的exchange

3、在消費者端的信道中聲明一個隨機的消息隊列,並拿到這個隊列名稱;然后在信道上綁定該消息隊列和消息路由

 

下篇咱們來討論,消費者端怎么才能拿到生產者發送消息中的部分消息。

 

參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM