1. 寫在前面
昨天簡單學習了一個消息隊列項目——RabbitMQ,今天趁熱打鐵,將學到的東西記錄下來。
學習的資料主要是官網給出的6個基本的消息發送/接收模型,或者稱為6種不同的使用場景,本文便是對這6種模型加以敘述。
2. Tutorials
在學習6種模型之前,我們首先需要安裝RabbitMQ。RabbitMQ支持多種系統平台,各平台的安裝方法可以點此查看。安裝好之后,我們使用如下命令啟用Web端的管理插件:rabbitmq-plugins enable rabbitmq_management
,然后啟動RabbitMQ。接着用瀏覽器訪問http://localhost:15672/
,若能看到RabbitMQ相關Web頁面,說明啟動成功。
2.1 Hello World
正所謂萬事開頭難,我們先從最簡單的Hello World開始。首先當然是新建一個項目,導入RabiitMQ相關jar。我采用Maven來構建項目,因此只需要在pom文件中添加如下依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
接下來學習最簡單的消息隊列模型,如下圖:
在圖中,P
代表producer
,它是消息的生產者;C
代表consumer
,它是消息的消費者;而紅色的矩形正是我們所謂的消息隊列,它位於RabbitMQ
中(RabbitMQ
中可以有很多這樣的隊列,並且每個隊列都有一個唯一的名字)。生產者(們)可以將消息發送到消息隊列中,消費者(們)可以從消息隊列中取出消息。
這種模型是不是很簡單呢?下面我們使用Java,借助於RabbitMQ來實現這種模型的消息通信。
首先我們介紹如何send
消息到消息隊列。send
之前,當然是和RabbitMQ服務器建立連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
接下來我們創建一個channel
,大多數API都是通過這個對象來調用的:
Channel channel = connection.createChannel();
之后,我們便可以調用channel
的如下方法去聲明一個隊列:
channel.queueDeclare("hello", false, false, false, null);
該方法的第一個參數是隊列的名稱,其余的參數先不管,之后會介紹。我們可以嘗試着去執行以上的5行代碼,然后打開Web端,可以看到新建了一個叫作hello
的隊列:
有了隊列,我們便可以向其中發送消息了,同樣還是調用channel
對象的API:
channel.basicPublish("", "hello", null, "Hello World".getBytes());
以上代碼所做的事情就是發送了一條字符串消息“Hello World”(第4個參數)到消息隊列。你可能注意到我們調用了String對象的getBytes
方法,沒錯,我們發送的實際上二進制數據。因此,理論上你能夠發送任何數據到消息隊列中,而不僅僅是文本信息。
第2個參數叫做路由鍵(routingKey),在該模型下必須與隊列名相同,至於為什么,和其他參數一樣,之后會了解到。
我們可以修改發送的文本,再次執行上述代碼,然后打開Web端查看,便可以查看到我們發送的消息:
點擊上圖的name字段下的hello,可以查看hello
隊列中的具體信息:
接下來,我們去嘗試着去獲取生產者發送的消息,和send
方法一樣,我們同樣需要連接服務器,創建channel
,聲明隊列:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
之后我們可以調用channel
的相關方法去監聽隊列,接收消息:
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
以上basicConsume
方法中,第一個參數是隊列的名字;第二個參數表示是否自動確認消息的接收情況,我們使用true,自動確認;第三個參數需要傳入一個實現了Consumer
接口的對象,我們簡單的new
一個默認的Consumer
的實現類DefaultConsumer
,然后在handleDelivery
方法中去處理接收到的消息(handleDelivery
方法會在接收到消息時被回調)。
運行以上代碼,我們可以打印出之前向隊列中send
的數據:
Hello World
Hello World2
下面是Hello World的完整代碼:
public class App {
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicPublish("", "hello", null, "Hello World2".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this){
// 因為以上接收消息的方法是異步的(非阻塞),當采用單元測試方式執行該方法時,程序會在打印消息前結束,因此使用wait來防止程序提前終止。若使用main方法執行,則不需要擔心該問題。
wait();
}
}
}
2.2 Work queues
接下來我們學習第二種模型——Work Queues。顧名思義,這種模型描述的是一個生產者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務),如下圖所示:
下面我們用代碼去實現。先是生產者send
消息到隊列,這次我們多發送些數據:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
for (int i = 0; i < 9; i++) {
channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();
}
然后是消費者接收數據:
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
try {
// Thread.sleep(1000);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
synchronized (this) {
wait();
}
}
代碼基本上和Hello World
的代碼一樣,只是加上句sleep
來模擬消費者(worker)處理消息所花的時間。
我們可以先執行三次receive
方法(修改sleep
的時間,其中消費者1 sleep 10s,消費者2,3 sleep 1s),讓三個消費者(worker)一起等待消息的到來,然后執行send
方法發送9條消息,觀察三個消費者收到的消息情況。
若不出意外,你會看到如下的打印結果:
// --------消費者1--------
0
// 10s 后
3
// 10s 后
6
// --------消費者2--------
1
// 1s 后
4
// 1s 后
7
// --------消費者3--------
2
// 1s 后
5
// 1s 后
8
通過打印結果,我們可以總結出Work queues的幾個特點:
- 一條消息只會被一個消費者接收;
- 消息是平均分配給消費者的;
- 消費者只有在處理完某條消息后,才會收到下一條消息。
事實上,RabbitMQ會循環地(一個接一個地)發送消息給消費者,這種分配消息的方式被稱為round-robin(輪詢)。
2.2.1 消息確認
看到這里,不知你是否會擔心:由於worker(消費者)執行任務需要一定的時間(以上用sleep模擬),要是某個worker在運行過程中掛掉,那分配給它的任務豈不是丟失了(永遠不可能被執行了)。為解決這個問題,RabbitMQ提供了消息確認機制,即worker需要主動的去確認消息已經接收了,RabbitMQ才認為消息被“真正地接收了”,實現代碼如下:
// send的代碼不用變,只需改變basicConsume的第二個參數為false,表示不要自動確認
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
try {
// 這里把時間加長了一點便於測試
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 這里手動地確定
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
下面做測試。首先執行send
方法,發送9條消息到隊列,查看web端情況如下:
此時隊列中有9條未被分發的消息。接着運行改變后的receive
方法,然后快速地去Web端查看隊列中的消息情況(記得刷新):
發現隊列中沒有待分發(Ready字段)的消息了,而有9條未被確認(Unacked字段)的消息。但控制台打印出數字6
時,關閉程序,再次去web端查看:
此時隊列中又有3條待分發的消息了。原因正是由於我們提前終止了receive
方法的執行,導致最后3條消息沒有被確認而被重新歸還到Ready中。
2.2.2 消息持久化
如果你不是一次性跟着本文運行代碼到這里,而是第二天接着昨天的工作繼續進行,你可能會發現昨天你創建的隊列和添加到隊列里的消息沒有了。很可能的原因就是消息沒有持久化,即按照以上代碼運行生成的隊列和添加到隊列中的消息都是儲存在內存中的,RabbitMQ一旦關閉它們就沒有了。如果你想將下次啟動時還能看到關閉前的消息,你應該將其持久化:
// 將第二個參數設為true,表示聲明一個需要持久化的隊列。
// 需要注意的是,若你已經定義了一個非持久的,同名字的隊列,要么將其先刪除(不然會報錯),要么換一個名字。
channel.queueDeclare("hello", true, false, false, null);
// 修改了第三個參數,這是表明消息需要持久化
channel.basicPublish("", "hello",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
總的來說,Work queues(Task Queuess)的概念在一些Web場景的應用中是很有用的,比如我們能夠用它來構建一個master-slave結構的分布式爬蟲系統:系統中有一個master節點和多個slave節點,master節點負責向各個slave節點分配爬取任務。
2.3 Publish/Subscribe
但有些時候,我們可能希望一條消息能夠被多個消費者接受到,比如一些公告信息等,這時候用Work Queue模型顯然不合適,而Publish/Subscribe模型正是對應這種使用場景的。
在介紹Publish/Subscribe之前,我們快速回顧之前的兩個模型,它們好像都是生產者將消息直接發送到消息隊列,但其實不是這樣的,甚至有可能生產者根本就不知道消息發送到了哪一個消息隊列。
先別着急,下面我們完整地介紹RabbitMQ消息發送/接受的方式。
事實上,生產者是把消息發送到了交換機(exchange)中,然后交換機負責(決定)將消息發送到(哪一個)消息隊列中。其模型如下圖:
這時候你可能會疑惑:既然消息是被發送到了交換機中,那我們之前發送的消息是被發送到了哪一個交換機中了?它有沒有機制能夠讓特定的消息發送到指定的隊列?
先回答第一個問題。還記得我們在Hello World中寫的發送消息的代碼嗎?
channel.basicPublish("", "hello", null, message.getBytes());
事實上第一個參數便是指定交換機的名字,即指定消息被發送到哪一個交換機。空字符串表示默認交換機(Default Exchange),即我們之前發送的消息都是先發送到默認交換機,然后它再路由到相應的隊列中。其實我們可以通過Web頁面去查看所有存在的交換機:
接着回答第二個問題。路由的依據便是通過第二個參數——路由鍵(routing key)指定的,之前已經提到過。在之前代碼中,我們指定第二個參數為"hello",便是指定消息應該被交換機路由到路由鍵為hello的隊列中。而默認交換機(Default Exchange)有一個非常有用的性質:
每一個被創建的隊列都會被自動的綁定到默認交換機上,並且路由鍵就是隊列的名字。
交換機還有4種不同的類型,分別是direct
,fanout
,topic
,headers
,每種類型路由的策略不同。
direct
類型的交換機要求和它綁定的隊列帶有一個路由鍵K,若有一個帶有路由鍵R的消息到達了交換機,交換機會將此消息路由到路由鍵K = R的隊列。默認交換機便是該類型。因此,在下圖中,消息會沿着綠色箭頭路由:
fanout
類型的交換機會路由每一條消息到所有和它綁定的隊列,忽略路由鍵。
剩下的兩種類型之后再做介紹。
在以上概念基礎上,我們來看第3種消息模型:Publish/Subscribe。如下圖:
該模型是要讓所有的消費者都能夠接收到每一條消息。顯然,fanout
類型的交換機更符合我們當前的需求。為此,先創建一個fanout
類型的交換機。
channel.exchangeDeclare("notice", "fanout");
其中,第一個參數是交換機的名稱;第二個參數是交換機的類型。
然后我們可以send
消息了:
channel.basicPublish( "notice", "", null, message.getBytes());
對於消費者,我們需要為每一個消費者創建一個獨立的隊列,然后將隊列綁定到剛才指定的交換機上即可:
// 該方法會創建一個名稱隨機的臨時隊列
String queueName = channel.queueDeclare().getQueue();
// 將隊列綁定到指定的交換機("notice")上
channel.queueBind(queueName, "notice", "");
以下完整的代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice", "fanout");
channel.basicPublish( "notice", "", null, "Hello China".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice", "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}
首先運行兩次receive
方法,讓兩個消費者等待接收消息,然后可以在Web端查看此時的隊列情況,如下圖所示:
可以看到圖中有兩個名稱隨機的隊列。接着運行send
方法發送一條消息,最終我們會看到兩個消費者都打印出了Hello China
。然后停止虛擬機讓消費者斷開連接,再次在Web端查看隊列情況,會發現剛才的兩個隊列被自動刪除了。
2.4 Routing
以上是Publish/Subscribe模式,它已經能讓我們的通知(notice)系統正常運轉了。現在再考慮這樣一個新需求:對於一些機密通知,我們只想讓部分人看到。這就要求交換機對綁定在其上的隊列進行篩選,於是引出了又一個新的模型:Routing。
之前我們說過,對於direct
類型的交換機,它會根據routing key進行路由,因此我們可以借助它來實現我們的需求,模型結構如下圖:
下面用代碼來實現。先看生產者。
首先要聲明一個direct
類型的交換機:
// 這里名稱改為notice2
channel.exchangeDeclare("notice2", "direct");
需要注意的是,因為我們之前聲明了一個fanout
類型的名叫notice
的交換機,因此不能再聲明一個同名的類型卻不一樣的交換機。
然后可以發送消息了,我們發送10條消息,其中偶數條消息是秘密消息,只能被routing key 為s的隊列接受,其余的消息所有的隊列都能接受。
for (int i = 0; i < 10; i++) {
String routingKey = "n"; // normal
if (i % 2 == 0) {
routingKey = "s"; // secret
}
channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
接下來是消費者:
// 聲明一個名稱隨機的臨時的隊列
String queueName = channel.queueDeclare().getQueue();
// 綁定交換機,同時帶上routing key
channel.queueBind(queueName, "notice2", "n");
// 消費者2號運行時,打開以下注釋
// channel.queueBind(queueName, "notice2", "s");
注意,我們可以多次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不同,這樣可以使一個隊列帶有多個routing key。
以下是完整代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice2", "direct");
for (int i = 0; i < 10; i++) {
String routingKey = "n"; // normal
if (i % 2 == 0) {
routingKey = "s"; // secret
}
channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice2", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice2", "n");
// channel.queueBind(queueName, "notice2", "s");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}
測試時,我們可以先運行一個receive
,然后打開channel.queueBind(queueName, "notice2", "s")
注釋,再運行一次receive
,這樣就有兩個消費者綁定到notice2交換機上,其中消費者1只能收到normal類型的消息,而消費者2既能收到normal類型的消息,又能收到secret類型的消息。接着可以運行send方法。如不出意外,可以看到如下打印結果:
// 消費者1
1
3
5
7
9
// 消費者2
0
1
2
3
4
5
6
7
8
9
2.5 Topic
有了以上的改進,我們的notice
系統基本ok了。但有些時候,我們還需要更加靈活的消息刷選方式。比如我們對於電影信息,我們可能需要對它的地區,類型,限制級進行篩選。這時候就要借助Topics模型了。
在Topics模型中,我們“升級”了routing key,它可以由多個關鍵詞組成,詞與詞之間由點號(.
)隔開。特別地,規定*
表示任意的一個詞;#
號表示任意的0個或多個詞。
假設我們現在需要接收電影信息,每條電影消息附帶的routingKey有地區、類型、限制級3個關鍵字,即:district.type.age
。現在想實現的功能如下圖:
如上圖所示,隊列Q1只關心美國適合13歲以上的電影信息,隊列Q2對動作片感興趣,而隊列Q3喜歡中國電影。
下面用Java代碼去實現上述功能,相較於之前基本上沒有什么改變,下面直接給出代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("movie", "topic");
channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());
channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());
channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());
channel.basicPublish("movie", "Chinese.action.13", null, "卧虎藏龍".getBytes());
channel.basicPublish("movie", "Chinese.comedy.13", null, "大話西游".getBytes());
channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯與祝英台".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("movie", "topic");
// 隊列1
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "movie", "American.*.13");
// 隊列2
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, "movie", "*.action.*");
// 隊列3
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, "movie", "Chinese.#");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}
運行3次receive
方法,注意打開或關閉相應的注釋;再運行send
方法,可以看到控制台輸出如下內容:
// 消費者1
The Bourne Ultimatum
Titanic
// 消費者2
The Bourne Ultimatum
卧虎藏龍
// 消費者3
卧虎藏龍
大話西游
梁山伯與祝英台
2.6 RPC
第6種模型是用來做RPC(Remote procedure call, 遠程程序調用)的。這里直接貼上代碼,就不做解釋了,想要了解更多細節,可參考這里。代碼演示的是,客戶端調用服務端的fib
方法,得到返回結果。
RPCServer.java
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Description:
*
* @author derker
* @Time 2016-10-26 18:24
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
AMQP.BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
RPCClient.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;
/**
* Description:
*
* @author derker
* @Time 2016-10-26 18:36
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(10)");
response = fibonacciRpc.call("10");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}