目錄
RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld
RabbitMQ學習總結 第三篇:工作隊列Work Queue
RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe
RabbitMQ學習總結 第六篇:Topic類型的exchange
RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)
在http://www.cnblogs.com/leocook/p/mq_rabbitmq_2.html 這篇博文中我們實現了怎么去使用work queue來把比較耗時的任務分散給多個worker。
但是,如果我們想在遠程的機器上的一個函數並等待它返回結果,我們應該怎么辦呢?這就是另外一種模式了,它被稱為RPC(Remote procedure call)。
本篇博文中我們來實現怎么用RabbitMQ來構建一個RPC系統:一個client(客戶端)和一個可擴展的RPC server(服務端)。這里我們來模擬一個返回斐波拉契數的RPC服務。
1、Client端接口
為了說明一個RPC服務時怎么工作的,我們來創建一個簡單的client類。這里來實現一個名字為call的方法來發送RPC請求,並發生阻塞,直到接收到回復:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
RPC注意事項:
雖然RPC是一種常用的模式,但它也有一些缺陷。當無法確定使用本地調用還是使用RPC時,問題就出來了。有的時候不確定程序的運行環境,這樣來做會給程序的調試增加了一定的復雜度。使用RPC並不能夠讓代碼變得更簡潔,濫用的話只會讓代碼變得更不方便維護。
伴隨着上邊的問題,咱們來看看下邊的建議:
- 確定能很明顯的分辨的出哪些調用是本地調用,哪些是遠程調用。
- 完善系統的文檔。清楚的標記出,模塊間的依賴關系。
- 處理錯誤情況。當RPC服務掛了之后,客戶端應該怎么去處理呢?
當有疑問時避免使用RPC。如果可以的話,你可以使用異步管道(不用RPC-阻塞),結果被異步推送到下一個計算環節。
2、回調隊列(Callback queue)
一般用RabbitMQ來實現RPC是很簡單的。客戶端發送一個請求消息然后服務器端回應一個響應消息。為了接收服務端的響應消息,我們需要在請求中發送一個callback queue地址。我們也可以使用一個默認的queue(Java客戶端獨有的)。如下:
callbackQueueName = channel.queueDeclare().getQueue(); //綁定callback queue BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息屬性:
AMQP協議在發送消息時,預定義了14個屬性連同消息一起發送出去。很多屬性都是很少用到的,除了下邊的這些:
消息的投遞模型(deliveryMode):使消息持久化,和work queue里的設置一樣。
上下文類型(contentType):用來描述媒體類型(mime-type)。例如常用的JSON格式,它的mime-type是application/json。
我們需要導包:
import com.rabbitmq.client.AMQP.BasicProperties;
3、Correlation Id
在上邊的方法中建議我們為每個RPC請求都創建一個call queue,這樣效率很低。我們有更好的辦法,為每一個client創建一個call queue。
這樣處理的話又出現了一個新的問題,無法確定接收到的響應是對應哪個請求的。這時候就需要correlationId屬性,我們為每一個請求都設置一個correlationId屬性。當我們從callback queue中接收到一條消息之后,我們將會查看correlationId屬性,這樣就可以用一個請求來與之匹配了。如果從callback queue接收到了一條消息后,發現其中的correlationId未能找到與之匹配的請求,那么將把這條消息丟掉。
你可能會問我們為什么要要在callback queue里忽略掉不知道的message,而不是報錯呢?這是因為服務器端可能會出現的一種情況,雖然可能性很小,但還是有可能性的,有可能在RPC發送了響應之后,在發送確認完成任務的信息之前服務器重啟了。如果這種情況發生了的話,重啟了RPC服務之后,它將會再次接收到之前的請求,這樣的話client將會重復處理響應,RPC服務應該是等冪的。
4、總結

我們的RPC工作原理如下:
- 當Client啟動時,它將會創建一個匿名的callback queue。
- 對於一次RPC請求,client會發送一條含有兩個屬性的消息:replyTo和correlationId。Reply是設置的callback queue,correlationId是設置的當前請求的標示符。
- 請求將會被發送到rpc_queue里。
- RPC的worker(RPC server)等待queue中的請求。當出現一個請求之后,他將會處理任務,並向replyTo隊列中發送消息。
- 客戶端會等待callback queue上的消息。當消息出現時,它將會檢查correlationId屬性是否能與之前發送請求時的屬性一直,若一致的話,client將會處理回復的消息。
5、最終實現
斐波拉契任務:
private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
這里定義計算斐波拉契數的方法,假設傳進去的整數都是正整數。
RPC服務端的代碼實現如下RPCServer.java:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //一次只接收一條消息 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); //開啟消息應答機制 channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //拿到correlationId屬性 BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(),"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e){ System.out.println(" [.] " + e.toString()); response = ""; } finally { //拿到replyQueue,並綁定為routing key,發送消息 channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); //返回消息確認信息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } } }
服務器端代碼實現很簡單的:
- 建立連接,信道,聲明隊列
- 為了能把任務壓力平均的分配到各個worker上,我們在方法channel.basicQos里設置prefetchCount的值。
- 我們使用basicConsume來接收消息,並等待任務處理,然后發送響應。
RPC客戶端代碼實現RPCClient.java:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); //拿到一個匿名(並非真的匿名,拿到了一個隨機生成的隊列名)的隊列,作為replyQueue。 replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString();//拿到一個UUID //封裝correlationId和replyQueue屬性 BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); //推消息,並加上之前封裝好的屬性 channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //檢驗correlationId是否匹配,確定是不是這次的請求 if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (Exception ignore) {} } } } }
參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-six-java.html
