rabbitmq學習(四):利用rabbitmq實現遠程rpc調用


一、rabbitmq實現rpc調用的原理

·rabbitmq實現rpc的原理是:客戶端向一個隊列中發送消息,並注冊一個回調的隊列用於接收服務端返回的消息,該消息需要聲明一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到消息(在需要時可以驗證correaltionId)后,處理消息,並將消息發送到客戶端注冊的回調隊列中。原理圖如下:  

  

二、代碼實現

  下面我們將模擬實現一個rpc客戶端和rpc服務端。客戶端給服務端發送message,服務端收到后處理message,再將處理后的消息返給客戶端

  rpc客戶端

  

/**
 * rpc客戶端
 */
public class RpcClient {
    //發送消息的隊列名稱
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
           connection = connectionFactory.newConnection();
           channel = connection.createChannel();
           //創建回調隊列
           String callbackQueue = channel.queueDeclare().getQueue();
           //創建回調隊列,消費者從回調隊列中接收服務端傳送的消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(callbackQueue,true,consumer);

            //創建消息帶有correlationId的消息屬性
            String correlationId = UUID.randomUUID().toString();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build();
            String message = "hello rabbitmq";
            channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes());
            System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId);

            //接收回調消息
            while (true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String receivedCorrelationId = delivery.getProperties().getCorrelationId();
                if(correlationId.equals(receivedCorrelationId)){
                    System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

 

   rpc服務端

  

/**
 * rpc服務器
 */
public class RpcServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static String format(String message){
        return "......" + message + "......";
    }

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //聲明消費者預取的消息數量
            channel.basicQos(1);
            channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手動回復消息
            System.out.println("RpcServer waitting for receive message");

            while (true){
                //接收並處理消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("RpcServer receive message " + message);
                String response = format(message);
                //確認收到消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                //取出消息的correlationId
                AMQP.BasicProperties properties = delivery.getProperties();
                String correlationId = properties.getCorrelationId();

                //創建具有與接收消息相同的correlationId的消息屬性
                AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
                channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   先運行服務端,再運行客戶端,結果如下:

  RpcClient

  

  RpcServer

  

 

代碼gitbu地址:https://github.com/wutianqi/rabbitmq-learn.git

參考資料:https://www.cnblogs.com/LipeiNet/p/5980802.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM