目錄
RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld
RabbitMQ學習總結 第三篇:工作隊列Work Queue
RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe
RabbitMQ學習總結 第六篇:Topic類型的exchange
RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)
上篇中我們實現了Work Queue的創建,在Work Queue背后,其實是rabbitMQ把每條任務消息只發給一個消費者。本篇中我們將要研究如何把一條消息推送給多個消費者,這種模式被稱為publish/subscribe(發布/訂閱)。
為了說明這個模式,我們將會構建一個簡單的日志系統。這將會包含兩部分程序,第一個是發送日志信息,第二個將會接收並打印它們。
在我們的日志系統里,每個運行的消費者程序都能接收到消息。這樣我就運行一個receiver並把日志寫到磁盤上,同時我們再運行另外一個消費者來把日志消息打印到屏幕上。
從本質上來說,是把日志消息推送到了所有的消費者端。
1、消息交換機
上篇中我們往Queue里發送消息,並從Queue里取出消息。現在我們來介紹RabbitMQ的完全消息模型。
我們來快速回顧一下之前博文中的內容:
- 一個生產者者應用程序發送消息;
- 一個消息隊列用來存儲和緩存消息;
- 一個消費者程序接收消息
RabbitMQ的消息發送模型核心思想是生產者不直接把消息發送到消息隊列中。事實上,生產者不知道自己的消息將會被緩存到哪個隊列中。
其實生產者者可以把消息發送到exchange(消息交換機)上。exchange是一個很簡單的事情,它一邊接收生產者的消息,另一邊再把消息推送到消息隊列中。Exchange必須知道在它接收到一條消息時應該怎么去處理。應該把這條消息推送到指定的消息隊列中?還是把消息推送到所有的隊列中?或是把消息丟掉?這些規則都可以用exchange類型來定義。
有一些可用的exchange類型:direct, topic, headers和fanout。這里我們主要看最后一個:fanout,這里我們創建一個名字為logs、類型為fanout的exchange:
channel.exchangeDeclare("logs", "fanout");
fanout類型的exchange是很簡單的。就是它把它能接收到的所有消息廣播到它知道的所有隊列中。這正是我們的日志系統所需要的。
列出exchange:
可以在服務器上使用rabbitmqctl命令來列出RabbitMQ服務器上的所有消息exchange:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
在這個列表中有一些形如amp.*的exchange,還有默認(未命名)的交換機。這些都是被默認創建的,但這些已經被默認創建的都不是你現在需要用到的。
沒有名字的exchange:
在之前的博文里沒有使用都exchange的相關知識,但是任然能夠發送消息。之所以能發送成功是因為我們使用一個默認exchange,我們使用(””)來標識的。
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數就是exchange的名字。空字符串的符號指的是默認的或沒有命名的exchange:消息會根據routingKey被路由到指定的消息隊列中。
現在我們來吧消息推送到已命名的exchange上:
channel.basicPublish( "logs", "", null, message.getBytes());
2、臨時隊列
如果你看過之前幾篇的博文,應該會發現我們都是使用了一個指定名字的消息隊列(hello和task_queue)。對應的生產者和消費者之間都要使用相同的消息隊列名稱,這在很重要的。
但是在我們的log系統中卻不是這樣,我們希望能夠接收到所有的log消息,不只是其中的一部分。我們只要處理當前的log消息,不用管過去的歷史log。為了實現,我們需要做以下兩步:
- 無論什么時候我們和RabbitMQ建立連接時,我們都要刷新、清空Queue。為了達到這一的目的,我們可以用一個隨機的名字(隨機性可由自己來定義)來創建Queue,也可以讓服務器來自動建立一個隨見的Queue。
- 當消費者斷開連接時,Queue能自動被刪除。
使用Java客戶端時,我們使用無參數的queueDeclare方法,就可以創建一個已經生成名字的、排他性的且會自動刪除的Queue:
String queueName = channel.queueDeclare().getQueue();
這是就拿到了一個隨機名字的queue,形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg
3、綁定(bindings)
我們已經創建了一個fanout類型的exchange和一個隊列。現在我們需要讓exchange向我們的queue里發送消息。Exchange和queue之間關系被稱為binding(綁定)。
channel.queueBind(queueName, "logs", "");
現在開始,名字為logs的exchange就會忘我們的queue里退消息了。
查看binding列表:
使用rabbitmqctl list_bindings命令來看已經存在的所有的binding。
4、最終實現
發送日志消息的生產者程序和之前的程序沒有太多的差別。最大的區別就是我們把消息推送到一個命名的exchange上,而不是之前未命名的默認exchange。在我們發送消息時需要提供一個routingKey,但對於fanout類型的exchange可以忽略。下邊是生產者的代碼EmitLog.java:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明exchange名字以及類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // getMessage的實現請見上篇博文 String message = getMessage(argv); //指定exchange的名字 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
正如你所見,在建立連接后我們聲明了exchange。這一步是必須的,因為禁止向一個不存在的exchange推送消息。
如果沒有向exchange負責的queue,那么消息將會被丟失,這是沒有問題的;如果沒有消費者監聽的話,我們會安全的丟掉這些消息。
ReceiveLogs.java的代碼如下:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明消息路由的名稱和類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //聲明一個隨機消息隊列 String queueName = channel.queueDeclare().getQueue(); //綁定消息隊列和消息路由 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); //啟動一個消費者 channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
- 編譯文件:
javac -cp .:amqp-client-3.3.5.jar ReceiveLogs.java EmitLog.java
- 把日志存到文件里:
java -cp .:amqp-client-3.3.5.jar ReceiveLogs > logs_from_rabbit.log
然后監聽該日志文件:
tail -10f logs_from_rabbit.log
- 往屏幕上打印日志消息:
java -cp .:amqp-client-3.3.5.jar ReceiveLogs
- 啟動生產者:
java -cp .:amqp-client-3.3.5.jar EmitLog
日志輸出到文件中:
日志消息打印到了屏幕上:
在運行ReceiveLogs的時候,使用rabbitmqctl list_bindings命令來查看RabbitMQ中的exchange:
leo@leocook:~$ sudo rabbitmqctl list_bindings Listing bindings ... exchange amq.gen-1Zuyn_44c8IWsdJWrI42Og queue amq.gen-1Zuyn_44c8IWsdJWrI42Og [] exchange amq.gen-rSrGSPWLNTuq1dfXipPfAA queue amq.gen-rSrGSPWLNTuq1dfXipPfAA [] exchange task_queue queue task_queue [] logs exchange amq.gen-1Zuyn_44c8IWsdJWrI42Og queue [] logs exchange amq.gen-rSrGSPWLNTuq1dfXipPfAA queue [] ...done.
總結:
1、在生產者和消費者的信道中聲明exchange名字以及類型
2、在生產者的信道中指定發送目標的exchange
3、在消費者端的信道中聲明一個隨機的消息隊列,並拿到這個隊列名稱;然后在信道上綁定該消息隊列和消息路由
下篇咱們來討論,消費者端怎么才能拿到生產者發送消息中的部分消息。
參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-three-java.html