目錄
RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld
RabbitMQ學習總結 第三篇:工作隊列Work Queue
RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe
RabbitMQ學習總結 第六篇:Topic類型的exchange
RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)
1、相關概念
RabbitMQ是一個消息代理,事實上,它接收生產者產生的消息,然后將消息傳遞給消費者。在這個過程中,它可以路由,可以緩沖,或者更具你設定的規則來將消息持久化。RabbitMQ和消息傳輸過程中一般會用一些術語:
- 生產者(Producing ):
意思無非是指發送消息的那一端,如果一個程序發送消息,那么它就將被稱為生產者,這里用大寫的P來表示。

- 隊列(queue ):
相當於郵箱的名字,它活動在RabbitMQ服務器里邊。雖然消息流會通過RabbitMQ和你的應用程序,但是只會被存儲在隊列中。隊列是不受任何限制的,它可以盡可能多的去存儲你需要存儲的消息(本質上來說它是個無限緩沖)。可以多個生產者向同一個消息隊列發送消息,也可以多個消費者同時從一個消息隊列中來接收消息。消息隊列可以如下圖模型。

- 消費者(Consuming ):
即接收端,消費者主要是等待接收消息的程序,用下圖表示:

注意:在大多數應用場景中,生產者、消費者以及RabbitMQ服務是不會同時運行在一台機器上的。
下邊將會實現兩個Java程序,一個只發送生產者一條消息的生產者,一個接收消息、並打印消息的消費者。
在下邊的對話中,”P”是我們的生產者,”C”是我們的消費者,中間的是矩形是隊列(BabbitMQ維護的消息緩沖)

2、Java客戶端的包
RabbitMQ遵守AMQP協議,AMQP協議是一個開放、通用的消息協議。關於AMQP協議的客戶端,有多種語言的實現版本。本文使用的是RabbitMQ提供的Java客戶端。
可在http://www.rabbitmq.com/java-client.html這里下載,然后將相對應的Jar包拷貝你的工作目錄下。RabbitMQ的Java客戶端在Maven庫中也有,groupId是com.rabbitmq,artifactId是amqp-client。
3、發送端

我們將消息發送器起名為Send,消息接收器起名為Recv。發送器將會連接RabbitMQ,發送一條消息,然后退出。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { /*連接可以抽象為socket連接,為我們維護協議版本信息和協議證書等。這里我們連接 上了本機的消息服務器實體(localhost)。如果我們想連接其它主機上的RabbitMQ服務,只需要修改一下主機名或是IP就可以了*/ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /*接下創建channel(信道),這是絕大多數API都能用到的。為了發送消息,你必須要聲明一個消息消息隊列,然后向該隊列里推送消息*/ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); /*聲明一個冪等的隊列(只有在該隊列不存在時,才會被創建)。消息的上下文是一個 字節數組,你可以指定它的編碼。*/ channel.close(); connection.close(); } }
注:若RabbitMQ服務磁盤空間不足的話,運行會出錯,參數設置為:disk_free_limit,更多的參數配置可在這里找到http://www.rabbitmq.com/configure.html#config-items
4、接收端
RabbitMQ會往接收器上推消息,與只發送一條消息的發送端不同,這里我們將監聽消息並將消息打印出來。

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { /*這里怎么打開連接和信道,以及聲明用於接收消息的隊列,這些步驟與發送端基本上是一樣的*/ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /*確保這里的隊列是存在的*/ channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /*這里用到了額外的類QueueingConsumer來緩存服務器將要推過來的消息。我們通知服務器向接收端推送消息,
然后服務器將會向客戶端異步推送消息,這里提供了一個可以回調的對象來緩存消息,直到我們做好准備來使用
它,這個類就是QueueingConsumer*/ QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
消息接收完后QueueingConsumer.nextDelivery()將會發生阻塞,暫停運行,直到有其他的消息推過來。
如果你需要檢查和驗證隊列的話,需要使用rabbitmqctl list_queues。
參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-one-java.html
