RabbitMQ的原理和使用


轉載:RabbitMQ從入門到精通

轉載:輕松搞定RabbitMQ

轉載:RabbitMQ Java入門教程

一、RabbitMQ

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

二、RabbitMQ的使用場景

對於一個大型的軟件系統來說,它會有很多的組件或者說模塊或者說子系統或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統的IPC有很大的區別。傳統的IPC很多都是在單一系統上的,模塊耦合性很大,不適合擴展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:
 1)信息的發送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數據如何方式丟失?
 2)如何降低發送者和接收者的耦合度?
 3)如何讓Priority高的接收者先接到數據?
 4)如何做到load balance?有效均衡接收者的負載?
 5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不同的數據,如何做有效的filter。
 6)如何做到可擴展,甚至將這個通信模塊發到cluster上?
 7)如何保證接收者接收到了完整,正確的數據?
  AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。

三、RabbitMQ的結構

RabbitMQ的應用場景架構圖如下:


  1. Broker:簡單來說就是消息隊列服務器實體。
  2. Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
  3. Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  4. Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
  5. Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  6. vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
  7. producer:消息生產者,就是投遞消息的程序。
  8. consumer:消息消費者,就是接受消息的程序。
  9. channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

四、RabbitMQ的使用過程

AMQP模型中,消息在producer中產生,發送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊列的使用過程大概如下:

  1. 客戶端連接到消息隊列服務器,打開一個channel。
  2. 客戶端聲明一個exchange,並設置相關屬性。
  3. 客戶端聲明一個queue,並設置相關屬性。
  4. 客戶端使用routing key,在exchange和queue之間建立好綁定關系。
  5. 客戶端投遞消息到exchange。
exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。 exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。

4.0 安裝和配置

RabbitMQ使用Erlang語言實現,因此在使用時首先要安裝和配置erlang環境,並安裝服務器后進行相關配置,由於不是本文主要內容所以忽略,詳見RabbitMQ簡介

RabbitMQ的客戶端使用時需要添加相關依賴。

4.1 點對點


消息生產者的代碼如下:


   
   
  
  
          
  1. package com.zenhobby.rabbit.demo;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class Send
  6. {
  7. //隊列名稱
  8. private final static String QUEUE_NAME = "hello";
  9. public static void main(String[] argv) throws java.io.IOException
  10. {
  11. /**
  12. * 創建連接連接到MabbitMQ
  13. */
  14. ConnectionFactory factory = new ConnectionFactory();
  15. //設置MabbitMQ所在主機ip或者主機名
  16. factory.setHost( "localhost");
  17. //創建一個連接
  18. Connection connection = factory.newConnection();
  19. //創建一個頻道
  20. Channel channel = connection.createChannel();
  21. //指定一個隊列
  22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  23. //發送的消息
  24. String message = "hello world!";
  25. //往隊列中發出一條消息
  26. channel.basicPublish( "", QUEUE_NAME, null, message.getBytes());
  27. System.out.println( " [x] Sent '" + message + "'");
  28. //關閉頻道和連接
  29. channel.close();
  30. connection.close();
  31. }
  32. }
消息消費者的代碼如下:


   
   
  
  
          
  1. package com.zenhobby.rabbit.demo;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv
  7. {
  8. //隊列名稱
  9. private final static String QUEUE_NAME = "hello";
  10. public static void main(String[] argv) throws java.io.IOException,
  11. java.lang.InterruptedException
  12. {
  13. //打開連接和創建頻道,與發送端一樣
  14. ConnectionFactory factory = new ConnectionFactory();
  15. factory.setHost( "localhost");
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. System.out.println( " [*] Waiting for messages. To exit press CTRL+C");
  21. //創建隊列消費者
  22. QueueingConsumer consumer = new QueueingConsumer(channel);
  23. //指定消費隊列,關閉默認的消息應答
  24. channel.basicConsume(QUEUE_NAME, true, consumer);
  25. while ( true)
  26. {
  27. //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
  28. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  29. String message = new String(delivery.getBody());
  30. System.out.println( " [x] Received '" + message + "'");
  31. }
  32. }
  33. }
隊列分別在生產者和消費者處創建,主要是為了防止有一端未建立起來的時候丟失消息。

4.2 工作隊列

工作隊列的主要任務是:避免立刻執行資源密集型任務,然后必須等待其完成。相反地,我們進行任務調度:我們把任務封裝為消息發送給隊列。工作進行在后台運行並不斷的從隊列中取出任務然后執行。當你運行了多個工作進程時,任務隊列中的任務將會被工作進程共享執行。這樣的概念在web應用中極其有用,當在很短的HTTP請求間需要執行復雜的任務。

1.消息分發機制

默認的,RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的消息。這樣分發消息的方式叫做round-robin。

默認的任務分發雖然看似公平但存在弊端。比如:現在有2個消費者,所有的奇數的消息都是繁忙的,而偶數則是輕松的。按照輪詢的方式,奇數的任務交給了第一個消費者,所以一直在忙個不停。偶數的任務交給另一個消費者,則立即完成任務,然后閑得不行。而RabbitMQ則是不了解這些的。這是因為當消息進入隊列,RabbitMQ就會分派消息。它不看消費者為應答的數目,只是盲目的將第n條消息發給第n個消費者。

為了解決這個問題,我們使用basicQos( prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢后,有了反饋,才會進行第二次發送。


   
   
  
  
          
  1. int prefetchCount = 1;
  2. channel.basicQos(prefetchCount);
使用公平分發,必須關閉自動應答,改為手動應答。

2. 消息確認

每個Consumer可能需要一段時間才能處理完收到的數據。如果在這個過程中,Consumer出錯了,異常退出了,而數據還沒有處理完成,那么非常不幸,這段數據就丟失了。因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數據后,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。
為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。這里並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。
默認情況下,消息確認是打開的(enabled):


   
   
  
  
          
  1. boolean autoAck = false;
  2. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
修改消費者如下:


   
   
  
  
          
  1. channel.basicQos( 1); //保證一次只分發一個
  2. // 創建隊列消費者
  3. final Consumer consumer = new DefaultConsumer(channel) {
  4. @Override
  5. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  6. String message = new String(body, "UTF-8");
  7. System.out.println( " [x] Received '" + message + "'");
  8. System.out.println( " [x] Proccessing... at " + new Date().toLocaleString());
  9. try {
  10. for ( char ch: message.toCharArray()) {
  11. if (ch == '.') {
  12. Thread.sleep( 1000);
  13. }
  14. }
  15. } catch (InterruptedException e) {
  16. } finally {
  17. System.out.println( " [x] Done! at " + new Date().toLocaleString());
  18. channel.basicAck(envelope.getDeliveryTag(), false);
  19. }
  20. }
  21. };
其中:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
  
  
 
 
         
用於在消息處理完畢時返回應答狀態。如果MQ服務器未收到應答則在消費者掛掉之后重新把消息放入到隊列中以供其他消費者使用。如果關閉了自動消息應答,手動也未設置應答,這是一個很簡單的錯誤,但是后果卻是極其嚴重的。消息在分發出去以后,得不到回應,所以不會在內存中刪除,結果RabbitMQ會越來越占用內存,導致服務器掛掉。

3. 消息持久化

為了保證在RabbitMQ退出或者crash了數據仍沒有丟失,需要將queueMessage都要持久化。

queue的持久化需要在聲明時指定durable=True:

channel.queue_declare(queue='hello', durable=True)  
  
  
 
 
         
message的持久化需要在發送時指定property:

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 
  
  
 
 
         

修改后的生產者如下所示:


   
   
  
  
          
  1. static void Main(string[] args)
  2. {
  3. var factory = new ConnectionFactory() { HostName = "localhost" };
  4. using (var connection = factory.CreateConnection())
  5. {
  6. using (var channel = connection.CreateModel())
  7. {
  8. bool durable = true;
  9. channel.QueueDeclare( "task_queue", durable, false, false, null); //queue的持久化需要在聲明時指定durable=True
  10. var message = GetMessage(args);
  11. var body = Encoding.UTF8.GetBytes(message);
  12. var properties = channel.CreateBasicProperties();
  13. properties.SetPersistent( true); //需要持久化Message,即在Publish的時候指定一個properties,
  14. channel.BasicPublish( "", "task_hello", properties, body);
  15. }
  16. }
  17. }

4.3 Publish/Subscribe

1. 交換器

在工作隊列一節中使用的分發如下:

channel.basicPublish("", "hello", null, message.getBytes());
  
  
 
 
         
其中第一個入參為空即為默認的交換器,交換器是RabbitMQ中的概念,其主要工作是接受生產者發出的消息,並推送到消息隊列中(生產者並沒有直接向queue中發送任何消息,而是發給交換器由交換器轉交)。

這里寫圖片描述
交換器的規則有:

  1. direct (直連):
  2. topic (主題)
  3. headers (標題)
  4. fanout (分發)

Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。 


   
   
  
  
          
  1. Channel channel = connection.createChannel();
  2. channel.exchangeDeclare( "exchangeName", "direct"); //direct fanout topic
  3. channel.queueDeclare( "queueName");
  4. channel.queueBind( "queueName", "exchangeName", "routingKey");
  5. byte[] messageBodyBytes = "hello world".getBytes();
  6. //需要綁定路由鍵
  7. channel.basicPublish( "exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);


Fanout Exchange – 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。 Fanout交換機轉發消息是最快的。

   
   
  
  
          
  1. Channel channel = connection.createChannel();
  2. channel.exchangeDeclare( "exchangeName", "fanout"); //direct fanout topic
  3. channel.queueDeclare( "queueName");
  4. channel.queueBind( "queueName", "exchangeName", "routingKey");
  5. channel.queueDeclare( "queueName1");
  6. channel.queueBind( "queueName1", "exchangeName", "routingKey1");
  7. byte[] messageBodyBytes = "hello world".getBytes();
  8. //路由鍵需要設置為空
  9. channel.basicPublish( "exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);


Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。


   
   
  
  
          
  1. Channel channel = connection.createChannel();
  2. channel.exchangeDeclare( "exchangeName", "topic"); //direct fanout topic
  3. channel.queueDeclare( "queueName");
  4. channel.queueBind( "queueName", "exchangeName", "routingKey.*");
  5. byte[] messageBodyBytes = "hello world".getBytes();
  6. channel.basicPublish( "exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Header Exchange

Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。

Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。

all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。

fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型

消息生產者如下:


   
   
  
  
          
  1. package cn.slimsmart.rabbitmq.demo.headers;
  2. import java.util.Date;
  3. import java.util.Hashtable;
  4. import java.util.Map;
  5. import org.springframework.amqp.core.ExchangeTypes;
  6. import com.rabbitmq.client.AMQP;
  7. import com.rabbitmq.client.AMQP.BasicProperties;
  8. import com.rabbitmq.client.AMQP.BasicProperties.Builder;
  9. import com.rabbitmq.client.Channel;
  10. import com.rabbitmq.client.Connection;
  11. import com.rabbitmq.client.ConnectionFactory;
  12. public class Producer {
  13. private final static String EXCHANGE_NAME = "header-exchange";
  14. @SuppressWarnings( "deprecation")
  15. public static void main(String[] args) throws Exception {
  16. // 創建連接和頻道
  17. ConnectionFactory factory = new ConnectionFactory();
  18. factory.setHost( "192.168.36.102");
  19. // 指定用戶 密碼
  20. factory.setUsername( "admin");
  21. factory.setPassword( "admin");
  22. // 指定端口
  23. factory.setPort(AMQP.PROTOCOL.PORT);
  24. Connection connection = factory.newConnection();
  25. Channel channel = connection.createChannel();
  26. //聲明轉發器和類型headers
  27. channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
  28. String message = new Date().toLocaleString() + " : log something";
  29. Map<String,Object> headers = new Hashtable<String, Object>();
  30. headers.put( "aaa", "01234");
  31. Builder properties = new BasicProperties.Builder();
  32. properties.headers(headers);
  33. // 指定消息發送到的轉發器,綁定鍵值對headers鍵值對
  34. channel.basicPublish(EXCHANGE_NAME, "",properties.build(),message.getBytes());
  35. System.out.println( "Sent message :'" + message + "'");
  36. channel.close();
  37. connection.close();
  38. }
  39. }

消息消費者如下:


   
   
  
  
          
  1. package cn.slimsmart.rabbitmq.demo.headers;
  2. import java.util.Hashtable;
  3. import java.util.Map;
  4. import org.springframework.amqp.core.ExchangeTypes;
  5. import com.rabbitmq.client.AMQP;
  6. import com.rabbitmq.client.Channel;
  7. import com.rabbitmq.client.Connection;
  8. import com.rabbitmq.client.ConnectionFactory;
  9. import com.rabbitmq.client.QueueingConsumer;
  10. public class Consumer {
  11. private final static String EXCHANGE_NAME = "header-exchange";
  12. private final static String QUEUE_NAME = "header-queue";
  13. public static void main(String[] args) throws Exception {
  14. // 創建連接和頻道
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost( "192.168.36.102");
  17. // 指定用戶 密碼
  18. factory.setUsername( "admin");
  19. factory.setPassword( "admin");
  20. // 指定端口
  21. factory.setPort(AMQP.PROTOCOL.PORT);
  22. Connection connection = factory.newConnection();
  23. Channel channel = connection.createChannel();
  24. //聲明轉發器和類型headers
  25. channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
  26. channel.queueDeclare(QUEUE_NAME, false, false, true, null);
  27. Map<String, Object> headers = new Hashtable<String, Object>();
  28. headers.put( "x-match", "any"); //all any
  29. headers.put( "aaa", "01234");
  30. headers.put( "bbb", "56789");
  31. // 為轉發器指定隊列,設置binding 綁定header鍵值對
  32. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
  33. QueueingConsumer consumer = new QueueingConsumer(channel);
  34. // 指定接收者,第二個參數為自動應答,無需手動應答
  35. channel.basicConsume(QUEUE_NAME, true, consumer);
  36. while ( true) {
  37. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  38. String message = new String(delivery.getBody());
  39. System.out.println(message);
  40. }
  41. }
  42. }


Default Exchange

其實除了上面四種以外還有一種Default Exchange,它是一種特別的Direct Exchange
當你手動創建一個隊列時,后台會自動將這個隊列綁定到一個名稱為空的Direct類型交換機上,綁定路由名稱與隊列名稱相同。有了這個默認的交換機和綁定,我們就可以像其他輕量級的隊列,如Redis那樣,直接操作隊列來處理消息。不過只是看起來是,實際上在RabbitMQ里直接操作是不可能的。消息始終都是先發送到交換機,由交換級經過路由傳送給隊列,消費者再從隊列中獲取消息的。不過由於這個默認交換機和路由的關系,使我們只關心隊列這一層即可,這個比較適合做一些簡單的應用,畢竟沒有發揮RabbitMQ的最大功能,如果都用這種方式去使用的話就真是殺雞用宰牛刀了。

2. 臨時隊列

如果要在生產者和消費者之間創建一個新的隊列,又不想使用原來的隊列,臨時隊列就是為這個場景而生的:
首先,每當我們連接到RabbitMQ,我們需要一個新的空隊列,我們可以用一個隨機名稱來創建,或者說讓服務器選擇一個隨機隊列名稱給我們。
一旦我們斷開消費者,隊列應該立即被刪除。Java客戶端提供queuedeclare()為我們創建一個非持久化、獨立、自動刪除的隊列名稱。

String queueName = channel.queueDeclare().getQueue();
  
  
 
 
         
通過上面的代碼就能獲取到一個隨機隊列名稱。 例如:它可能是:amq.gen-jzty20brgko-hjmujj0wlg。

3. 綁定

這里寫圖片描述

如果我們已經創建了一個分發交換器和隊列,現在我們就可以就將我們的隊列跟交換器進行綁定。

channel.queueBind(queueName, "logs", "");
  
  
 
 
         
執行完這段代碼后,日志交換器會將消息添加到我們的隊列中。


五、RabbitMQ實現RPC

RabbitMQ可以用於實現RPC,兩者有相像之處,使用RabbitMQ實現RPC分為如下幾個步驟:

1. Client interface(客戶端接口)

為了說明RPC服務可以使用,我們創建一個簡單的客戶端類。暴露一個方法——發送RPC請求,然后阻塞直到獲得結果。


   
   
  
  
          
  1. FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
  2. String result = fibonacciRpc.call( "4");
  3. System.out.println( "fib(4) is " + result);

2. Callback queue(回調隊列)

一般在RabbitMQ中做RPC是很簡單的。客戶端發送請求消息,服務器回復響應的消息。為了接受響應的消息,我們需要在請求消息中發送一個回調隊列。可以用默認的隊列:


   
   
  
  
          
  1. BasicProperties props = new BasicProperties
  2. .Builder()
  3. .replyTo(callbackQueueName)
  4. .build();
  5. channel.basicPublish( "", "rpc_queue", props, message.getBytes());
  6. // ... then code to read a response message from the callback_queue ...

3. Message properties(消息屬性)

AMQP協議為消息預定義了一組14個屬性。大部分的屬性是很少使用的。除了一下幾種:

  1. deliveryMode:標記消息傳遞模式,2-消息持久化,其他值-瞬態。在第二篇文章中還提到過。
  2. contentType:內容類型,用於描述編碼的mime-type。例如經常為該屬性設置JSON編碼。
  3. replyTo:應答,通用的回調隊列名稱
  4. correlationId:關聯ID,方便RPC響應與請求關聯
我們需要添加一個新的導入:

import com.rabbitmq.client.AMQP.BasicProperties;  
  
  
 
 
         

4. Correlation Id

在上述方法中為每個RPC請求創建一個回調隊列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端創建一個單一的回調隊列
新的問題被提出,隊列收到一條回復消息,但是不清楚是那條請求的回復。這是就需要使用correlationId屬性了。我們要為每個請求設置唯一的值。然后,在回調隊列中獲取消息,看看這個屬性,關聯response和request就是基於這個屬性值的。如果我們看到一個未知的correlationId屬性值的消息,可以放心的無視它——它不是我們發送的請求。
你可能問道,為什么要忽略回調隊列中未知的信息,而不是當作一個失敗?這是由於在服務器端競爭條件的導致的。雖然不太可能,但是如果RPC服務器在發送給我們結果后,發送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的消息。如果發生了這種情況,重啟RPC服務器將會重新處理該請求。這就是為什么在客戶端必須很好的處理重復響應,RPC應該是冪等的

5. 實現


我們的RPC的處理流程:

  1. 當客戶端啟動時,創建一個匿名的回調隊列
  2. 客戶端為RPC請求設置2個屬性:replyTo:設置回調隊列名字;correlationId:標記request
  3. 請求被發送到rpc_queue隊列中。
  4. RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理並且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
  5. 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。

RPC服務器端(RPCServer.java)


   
   
  
  
          
  1. /**
  2. * RPC服務器端
  3. *
  4. * @author arron
  5. * @date 2015年9月30日 下午3:49:01
  6. * @version 1.0
  7. */
  8. public class RPCServer {
  9. private static final String RPC_QUEUE_NAME = "rpc_queue";
  10. public static void main(String[] args) throws Exception {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. // 設置MabbitMQ所在主機ip或者主機名
  13. factory.setHost( "127.0.0.1");
  14. // 創建一個連接
  15. Connection connection = factory.newConnection();
  16. // 創建一個頻道
  17. Channel channel = connection.createChannel();
  18. //聲明隊列
  19. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  20. //限制:每次最多給一個消費者發送1條消息
  21. channel.basicQos( 1);
  22. //為rpc_queue隊列創建消費者,用於處理請求
  23. QueueingConsumer consumer = new QueueingConsumer(channel);
  24. channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
  25. System.out.println( " [x] Awaiting RPC requests");
  26. while ( true) {
  27. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  28. //獲取請求中的correlationId屬性值,並將其設置到結果消息的correlationId屬性中
  29. BasicProperties props = delivery.getProperties();
  30. BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
  31. //獲取回調隊列名字
  32. String callQueueName = props.getReplyTo();
  33. String message = new String(delivery.getBody(), "UTF-8");
  34. System.out.println( " [.] fib(" + message + ")");
  35. //獲取結果
  36. String response = "" + fib(Integer.parseInt(message));
  37. //先發送回調結果
  38. channel.basicPublish( "", callQueueName, replyProps,response.getBytes());
  39. //后手動發送消息反饋
  40. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  41. }
  42. }
  43. /**
  44. * 計算斐波列其數列的第n項
  45. *
  46. * @param n
  47. * @return
  48. * @throws Exception
  49. */
  50. private static int fib(int n) throws Exception {
  51. if (n < 0)
  52. throw new Exception( "參數錯誤,n必須大於等於0");
  53. if (n == 0)
  54. return 0;
  55. if (n == 1)
  56. return 1;
  57. return fib(n - 1) + fib(n - 2);
  58. }
  59. }
RPC客戶端(RPCClient.java):


   
   
  
  
          
  1. /**
  2. *
  3. * @author arron
  4. * @date 2015年9月30日 下午3:44:43
  5. * @version 1.0
  6. */
  7. public class RPCClient {
  8. private static final String RPC_QUEUE_NAME = "rpc_queue";
  9. private Connection connection;
  10. private Channel channel;
  11. private String replyQueueName;
  12. private QueueingConsumer consumer;
  13. public RPCClient() throws Exception {
  14. ConnectionFactory factory = new ConnectionFactory();
  15. // 設置MabbitMQ所在主機ip或者主機名
  16. factory.setHost( "127.0.0.1");
  17. // 創建一個連接
  18. connection = factory.newConnection();
  19. // 創建一個頻道
  20. channel = connection.createChannel();
  21. //聲明隊列
  22. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  23. //為每一個客戶端獲取一個隨機的回調隊列
  24. replyQueueName = channel.queueDeclare().getQueue();
  25. //為每一個客戶端創建一個消費者(用於監聽回調隊列,獲取結果)
  26. consumer = new QueueingConsumer(channel);
  27. //消費者與隊列關聯
  28. channel.basicConsume(replyQueueName, true, consumer);
  29. }
  30. /**
  31. * 獲取斐波列其數列的值
  32. *
  33. * @param message
  34. * @return
  35. * @throws Exception
  36. */
  37. public String call(String message) throws Exception{
  38. String response = null;
  39. String corrId = java.util.UUID.randomUUID().toString();
  40. //設置replyTo和correlationId屬性值
  41. BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
  42. //發送消息到rpc_queue隊列
  43. channel.basicPublish( "", RPC_QUEUE_NAME, props, message.getBytes());
  44. while ( true) {
  45. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  46. if (delivery.getProperties().getCorrelationId().equals(corrId)) {
  47. response = new String(delivery.getBody(), "UTF-8");
  48. break;
  49. }
  50. }
  51. return response;
  52. }
  53. public static void main(String[] args) throws Exception {
  54. RPCClient fibonacciRpc = new RPCClient();
  55. String result = fibonacciRpc.call( "4");
  56. System.out.println( "fib(4) is " + result);
  57. }
  58. }

這里的例子只是RabbitMQ中RPC服務的一個實現,你也可以根據業務需要實現更多。rpc有一個優點,如果一個RPC服務器處理不來,可以再增加一個、兩個、三個。我們的例子中的代碼還比較簡單,還有很多問題沒有解決:
  • 如果沒有發現服務器,客戶端如何處理?
  • 如果客戶端的RPC請求超時了怎么辦?
  • 如果服務器出現了故障,發生了異常,是否將異常發送到客戶端?
  • 在處理消息前,怎樣防止無效的消息?檢查范圍、類型?

以上內容轉載自網絡,詳見開頭轉載聲明。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM