RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)


目錄

RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld

RabbitMQ學習總結 第三篇:工作隊列Work Queue

RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe

RabbitMQ學習總結 第五篇:路由Routing

RabbitMQ學習總結 第六篇:Topic類型的exchange

RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)

 

http://www.cnblogs.com/leocook/p/mq_rabbitmq_2.html 這篇博文中我們實現了怎么去使用work queue來把比較耗時的任務分散給多個worker。

但是,如果我們想在遠程的機器上的一個函數並等待它返回結果,我們應該怎么辦呢?這就是另外一種模式了,它被稱為RPC(Remote procedure call)。

本篇博文中我們來實現怎么用RabbitMQ來構建一個RPC系統:一個client(客戶端)和一個可擴展的RPC server(服務端)。這里我們來模擬一個返回斐波拉契數的RPC服務。

1、Client端接口

為了說明一個RPC服務時怎么工作的,我們來創建一個簡單的client類。這里來實現一個名字為call的方法來發送RPC請求,並發生阻塞,直到接收到回復:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

RPC注意事項:

雖然RPC是一種常用的模式,但它也有一些缺陷。當無法確定使用本地調用還是使用RPC時,問題就出來了。有的時候不確定程序的運行環境,這樣來做會給程序的調試增加了一定的復雜度。使用RPC並不能夠讓代碼變得更簡潔,濫用的話只會讓代碼變得更不方便維護。

伴隨着上邊的問題,咱們來看看下邊的建議:

  • 確定能很明顯的分辨的出哪些調用是本地調用,哪些是遠程調用。
  • 完善系統的文檔。清楚的標記出,模塊間的依賴關系。
  • 處理錯誤情況。當RPC服務掛了之后,客戶端應該怎么去處理呢?

當有疑問時避免使用RPC。如果可以的話,你可以使用異步管道(不用RPC-阻塞),結果被異步推送到下一個計算環節。

2、回調隊列(Callback queue)

一般用RabbitMQ來實現RPC是很簡單的。客戶端發送一個請求消息然后服務器端回應一個響應消息。為了接收服務端的響應消息,我們需要在請求中發送一個callback queue地址。我們也可以使用一個默認的queue(Java客戶端獨有的)。如下:

callbackQueueName = channel.queueDeclare().getQueue();

//綁定callback queue
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息屬性:

AMQP協議在發送消息時,預定義了14個屬性連同消息一起發送出去。很多屬性都是很少用到的,除了下邊的這些:

消息的投遞模型(deliveryMode):使消息持久化,和work queue里的設置一樣。

上下文類型(contentType):用來描述媒體類型(mime-type)。例如常用的JSON格式,它的mime-type是application/json。

我們需要導包:

import com.rabbitmq.client.AMQP.BasicProperties;

3、Correlation Id

在上邊的方法中建議我們為每個RPC請求都創建一個call queue,這樣效率很低。我們有更好的辦法,為每一個client創建一個call queue。

這樣處理的話又出現了一個新的問題,無法確定接收到的響應是對應哪個請求的。這時候就需要correlationId屬性,我們為每一個請求都設置一個correlationId屬性。當我們從callback queue中接收到一條消息之后,我們將會查看correlationId屬性,這樣就可以用一個請求來與之匹配了。如果從callback queue接收到了一條消息后,發現其中的correlationId未能找到與之匹配的請求,那么將把這條消息丟掉。

你可能會問我們為什么要要在callback queue里忽略掉不知道的message,而不是報錯呢?這是因為服務器端可能會出現的一種情況,雖然可能性很小,但還是有可能性的,有可能在RPC發送了響應之后,在發送確認完成任務的信息之前服務器重啟了。如果這種情況發生了的話,重啟了RPC服務之后,它將會再次接收到之前的請求,這樣的話client將會重復處理響應,RPC服務應該是等冪的。

4、總結

我們的RPC工作原理如下:

  • 當Client啟動時,它將會創建一個匿名的callback queue。
  • 對於一次RPC請求,client會發送一條含有兩個屬性的消息:replyTocorrelationId。Reply是設置的callback queue,correlationId是設置的當前請求的標示符。
  • 請求將會被發送到rpc_queue里。
  • RPC的worker(RPC server)等待queue中的請求。當出現一個請求之后,他將會處理任務,並向replyTo隊列中發送消息。
  • 客戶端會等待callback queue上的消息。當消息出現時,它將會檢查correlationId屬性是否能與之前發送請求時的屬性一直,若一致的話,client將會處理回復的消息。

5、最終實現

斐波拉契任務:

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

這里定義計算斐波拉契數的方法,假設傳進去的整數都是正整數。

RPC服務端的代碼實現如下RPCServer.java:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
  
public class RPCServer {
  private static final String RPC_QUEUE_NAME = "rpc_queue";

  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();
      
      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  
      //一次只接收一條消息
      channel.basicQos(1);
  
      QueueingConsumer consumer = new QueueingConsumer(channel);

      //開啟消息應答機制
      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
      System.out.println(" [x] Awaiting RPC requests");
  
      while (true) {
        String response = null;
        
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        
        //拿到correlationId屬性
        BasicProperties props = delivery.getProperties();
        BasicProperties replyProps = new BasicProperties
                                         .Builder()
                                         .correlationId(props.getCorrelationId())
                                         .build();
        
        try {
          String message = new String(delivery.getBody(),"UTF-8");
          int n = Integer.parseInt(message);
  
          System.out.println(" [.] fib(" + message + ")");
          response = "" + fib(n);
        } catch (Exception e){
          System.out.println(" [.] " + e.toString());
          response = "";
        }
        finally {  
          //拿到replyQueue,並綁定為routing key,發送消息
          channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
          //返回消息確認信息
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
      }
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }                    
  }
}

服務器端代碼實現很簡單的:

  • 建立連接,信道,聲明隊列
  • 為了能把任務壓力平均的分配到各個worker上,我們在方法channel.basicQos里設置prefetchCount的值。
  • 我們使用basicConsume來接收消息,並等待任務處理,然后發送響應。

RPC客戶端代碼實現RPCClient.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

public class RPCClient {
  private Connection connection;
  private Channel channel;
  private String requestQueueName = "rpc_queue";
  private String replyQueueName;
  private QueueingConsumer consumer;

  public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    //拿到一個匿名(並非真的匿名,拿到了一個隨機生成的隊列名)的隊列,作為replyQueue。
    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
  }
  
  public String call(String message) throws Exception {     
    String response = null;
    String corrId = UUID.randomUUID().toString();//拿到一個UUID
    
    //封裝correlationId和replyQueue屬性
    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();
    //推消息,並加上之前封裝好的屬性
    channel.basicPublish("", requestQueueName, props, message.getBytes());
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      //檢驗correlationId是否匹配,確定是不是這次的請求
      if (delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody(),"UTF-8");
        break;
      }
    }

    return response; 
  }
    
  public void close() throws Exception {
    connection.close();
  }
  
  public static void main(String[] argv) {
    RPCClient fibonacciRpc = null;
    String response = null;
    try {
      fibonacciRpc = new RPCClient();
  
      System.out.println(" [x] Requesting fib(30)");   
      response = fibonacciRpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (fibonacciRpc!= null) {
        try {
          fibonacciRpc.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}

 

參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-six-java.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM