一、rabbitmq實現rpc調用的原理
·rabbitmq實現rpc的原理是:客戶端向一個隊列中發送消息,並注冊一個回調的隊列用於接收服務端返回的消息,該消息需要聲明一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到消息(在需要時可以驗證correaltionId)后,處理消息,並將消息發送到客戶端注冊的回調隊列中。原理圖如下:
二、代碼實現
下面我們將模擬實現一個rpc客戶端和rpc服務端。客戶端給服務端發送message,服務端收到后處理message,再將處理后的消息返給客戶端
rpc客戶端
/** * rpc客戶端 */ public class RpcClient { //發送消息的隊列名稱 private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //創建回調隊列 String callbackQueue = channel.queueDeclare().getQueue(); //創建回調隊列,消費者從回調隊列中接收服務端傳送的消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(callbackQueue,true,consumer); //創建消息帶有correlationId的消息屬性 String correlationId = UUID.randomUUID().toString(); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build(); String message = "hello rabbitmq"; channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes()); System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId); //接收回調消息 while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String receivedCorrelationId = delivery.getProperties().getCorrelationId(); if(correlationId.equals(receivedCorrelationId)){ System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId); break; } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
rpc服務端
/** * rpc服務器 */ public class RpcServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static String format(String message){ return "......" + message + "......"; } public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; try { connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null); QueueingConsumer consumer = new QueueingConsumer(channel); //聲明消費者預取的消息數量 channel.basicQos(1); channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手動回復消息 System.out.println("RpcServer waitting for receive message"); while (true){ //接收並處理消息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println("RpcServer receive message " + message); String response = format(message); //確認收到消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); //取出消息的correlationId AMQP.BasicProperties properties = delivery.getProperties(); String correlationId = properties.getCorrelationId(); //創建具有與接收消息相同的correlationId的消息屬性 AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build(); channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes()); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
先運行服務端,再運行客戶端,結果如下:
RpcClient
RpcServer
代碼gitbu地址:https://github.com/wutianqi/rabbitmq-learn.git
參考資料:https://www.cnblogs.com/LipeiNet/p/5980802.html