RabbitMQ六中工作模式-RPC模式


RabbitMQ-RPC模式

如果我們需要在遠程電腦上運行一個方法,並且還要等待一個返回結果該怎么辦?這和前面的例子不太一樣, 這種模式我們通常稱為遠程過程調用,即RPC.

在本節中,我們將會學習使用RabbitMQ去搭建一個RPC系統:一個客戶端和一個可以升級(擴展)的RPC服務器。為了模擬一個耗時任務,我們將創建一個返回斐波那契數列的虛擬的RPC服務。

客戶端

在客戶端定義一個RPCClient類,並定義一個call()方法,這個方法發送一個RPC請求,並等待接收響應結果

RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四個斐波那契數是: " + result);

回調隊列 Callback Queue

使用RabbitMQ去實現RPC很容易。一個客戶端發送請求信息,並得到一個服務器端回復的響應信息。為了得到響應信息,我們需要在請求的時候發送一個“回調”隊列地址。我們可以使用默認隊列。下面是示例代碼:

//定義回調隊列,
//自動生成對列名,非持久,獨占,自動刪除
callbackQueueName = ch.queueDeclare().getQueue();

//用來設置回調隊列的參數對象
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
//發送調用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());

消息屬性 Message Properties

AMQP 0-9-1協議定義了消息的14個屬性。大部分屬性很少使用,下面是比較常用的4個:

deliveryMode:將消息標記為持久化(值為2)或非持久化(任何其他值)。

contentType:用於描述mime類型。例如,對於經常使用的JSON格式,將此屬性設置為:application/json

replyTo:通常用於指定回調隊列。

correlationId:將RPC響應與請求關聯起來非常有用。

關聯id (correlationId):

在上面的代碼中,我們會為每個RPC請求創建一個回調隊列。 這是非常低效的,這里還有一個更好的方法:讓我們為每個客戶端創建一個回調隊列。

這就提出了一個新的問題,在隊列中得到一個響應時,我們不清楚這個響應所對應的是哪一條請求。這時候就需要使用關聯id(correlationId)。我們將為每一條請求設置唯一的的id值。稍后,當我們在回調隊列里收到一條消息的時候,我們將查看它的id屬性,這樣我們就可以匹配對應的請求和響應。如果我們發現了一個未知的id值,我們可以安全的丟棄這條消息,因為它不屬於我們的請求。

小結

RPC的工作方式是這樣的:

  • 對於RPC請求,客戶端發送一條帶有兩個屬性的消息:replyTo,設置為僅為請求創建的匿名獨占隊列,和correlationId,設置為每個請求的惟一id值。
  • 請求被發送到rpc_queue隊列。
  • RPC工作進程(即:服務器)在隊列上等待請求。當一個請求出現時,它執行任務,並使用replyTo字段中的隊列將結果發回客戶機。
  • 客戶機在回應消息隊列上等待數據。當消息出現時,它檢查correlationId屬性。如果匹配請求中的值,則向程序返回該響應數據。

代碼

服務器端

package rabbitmq.rpc;

import java.io.IOException;
import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		/*
		 * 定義隊列 rpc_queue, 將從它接收請求信息
		 * 
		 * 參數:
		 * 1. queue, 對列名
		 * 2. durable, 持久化
		 * 3. exclusive, 排他
		 * 4. autoDelete, 自動刪除
		 * 5. arguments, 其他參數屬性
		 */
		ch.queueDeclare("rpc_queue",false,false,false,null);
		ch.queuePurge("rpc_queue");//清除隊列中的內容
		
		ch.basicQos(1);//一次只接收一條消息
		
		
		//收到請求消息后的回調對象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//處理收到的數據(要求第幾個斐波那契數)
				String msg = new String(message.getBody(), "UTF-8");
				int n = Integer.parseInt(msg);
				//求出第n個斐波那契數
				int r = fbnq(n);
				String response = String.valueOf(r);
				
				//設置發回響應的id, 與請求id一致, 這樣客戶端可以把該響應與它的請求進行對應
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(message.getProperties().getCorrelationId())
						.build();
				/*
				 * 發送響應消息
				 * 1. 默認交換機
				 * 2. 由客戶端指定的,用來傳遞響應消息的隊列名
				 * 3. 參數(關聯id)
				 * 4. 發回的響應消息
				 */
				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
				//發送確認消息
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//消費者開始接收消息, 等待從 rpc_queue接收請求消息, 不自動確認
		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
	}

	protected static int fbnq(int n) {
		if(n == 1 || n == 2) return 1;
		
		return fbnq(n-1)+fbnq(n-2);
	}
}

客戶端

package rabbitmq.rpc;

import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCClient {
	Connection con;
	Channel ch;
	
	public RPCClient() throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		con = f.newConnection();
		ch = con.createChannel();
	}
	
	public String call(String msg) throws Exception {
		//自動生成對列名,非持久,獨占,自動刪除
		String replyQueueName = ch.queueDeclare().getQueue();
		//生成關聯id
		String corrId = UUID.randomUUID().toString();
		
		//設置兩個參數:
		//1. 請求和響應的關聯id
		//2. 傳遞響應數據的queue
		BasicProperties props = new BasicProperties.Builder()
				.correlationId(corrId)
				.replyTo(replyQueueName)
				.build();
		//向 rpc_queue 隊列發送請求數據, 請求第n個斐波那契數
		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
		
		//用來保存結果的阻塞集合,取數據時,沒有數據會暫停等待
		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
		
		//接收響應數據的回調對象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//如果響應消息的關聯id,與請求的關聯id相同,我們來處理這個響應數據
				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
					//把收到的響應數據,放入阻塞集合
					response.offer(new String(message.getBody(), "UTF-8"));
				}
			}
		};

		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//開始從隊列接收響應數據
		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
		//返回保存在集合中的響應數據
		return response.take();
	}
	
	public static void main(String[] args) throws Exception {
		RPCClient client = new RPCClient();
		while (true) {
			System.out.print("求第幾個斐波那契數:");
			int n = new Scanner(System.in).nextInt();
			String r = client.call(""+n);
			System.out.println(r);
		}
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM