RabbitMQ 消息隊列入門


前言

中間件

消息隊列

  • 異步處理,注冊完發短信
  • 應用解耦,訂單接口調用扣庫存接口,失敗了怎么辦?
  • 流量削峰,大量請求到達業務接口,這不行!
  • 日志處理,每個業務代碼都調用一下寫日志的方法嗎?結合AOP思想,業務程序為什么要關心寫日志的事情?
  • 消息通訊等,ABC處在聊天室里面,一起聊天?foreach嗎?

官網有7個入門教程,過了一遍,做個筆記。

正文

HelloWorld

概述

RabbitMQ,是個消息代理人message broker。它接收存儲轉發消息。

幾個常用的術語:

  1. 生產者Producer,生產發送消息。
  2. 消費者Consumer,接收消息。
  3. 隊列Queue,只受系統內存和硬盤大小限制。存儲消息,生產者往隊列里面發送,消費者監聽讀取。

這幾個對象可以分布在不同的機器。

img

使用Client

P和C的角色。maven倉庫包為amqp-clientslf4j-nop

<dependencies> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.30</version> </dependency> <endencies> 

發送

也就是Producer.java

public class Send { private static final String QUEUE_NAME = "hello1"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xxx.xxx.xxx.xxx"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(Charset.forName("utf-8"))); System.out.println(" [x] Sent '" + message + "'"); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } 

Connection 和Channel 都實現了ICloseable接口,所以可以使用try(...)接口自動釋放資源。Channel是我們要經常使用的API對象。channel.queueDeclare是冪等的,只有在沒有的情況下才會創建。然后調用basicPublish方法,往隊列發送字節數組消息。

接收

Consumer,Receiver.java

public class Receiver { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xxx.xxx.xxx.xxx"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = ((consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" [x] Received [" + msg + "]"); }); channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } } 

connection和 channel 對象沒有使用try-with-resource自動釋放,factory.newConnection()之后程序就會保持運行,調用basicConsume方法來消費得到的消息。該方法第二個autoAck參數寫了false,這樣,消息就屬於未確認的狀態,每次啟動都會重復收到。

Work Queue 工作隊列

消息產生的速度大於消費的速度,該怎么辦?

每個http請求的時間不宜過長,所以可以把內部耗時的方法做成異步,然后用回調callback的方式實現。換個角度說就是consumer里面有比較耗時的任務,可以用thread.sleep()模擬一下。

DeliverCallback deliverCallback = ((consumerTag, message) -> {
    String msg = new String(message.getBody(), "UTF-8"); int i = new Random().nextInt(5); try { Thread.sleep(i * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(msg + "休眠了" + i + "秒"); }); 

不過這不是這節的重點,這里的重點是幾個參數。

消息確認

首先設置一下每次接收的消息數,每次一個channel.basicQos(1);。在客戶端沒有確認之前不會接收新的消息。channel.basicConsume方法的第二個參數autoAck表示自動確認。消息有兩種狀態,ready和unacked的。消息發送到queue→ready→consumer消費,但不確認→unacked→確認→結束,等待下一個。

public class NewTask { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1);//一次接收一個消息 DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); int i = new Random().nextInt(2); try { Thread.sleep(i * 1000); channel.basicAck(message.getEnvelope().getDeliveryTag(), false);//只確認這個tag對應的消息 System.out.println(msg + "執行了" + i + "秒,consumerTag=" + consumerTag + "並發送了確認"); } catch (InterruptedException e) { e.printStackTrace(); } }; boolean autoAck = false;//不自動確認 String str = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { System.out.println(consumerTag); }); } } 

Message Durablity

Redis類似。

確保消息不丟。也就是確保未消費的消息在服務器意外宕機重啟之后消息不丟。RabbitMQ會以一定間隔把消息寫入磁盤,但不是實時【所以還是有一個短的時間間隔會產生消息的丟失情況】。為了解決這個問題,需要兩個配置

  1. 定義queue的時候設置durable參數為true。rabbitmq不允許queue name相同其他參數不同的兩個隊列,所以可以先刪以前的。

    boolean durable = true; channel.queueDeclare("hello", durable, false, false, null); 
  2. 發送的時候設置MessageProperties屬性。

    channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN,//持久化為文本 message.getBytes()); 

Fair Dispatch 公平分發

RabbitMQ的默認推送策略是把第N個消息推送給第N個客戶端,他不會管一個客戶端是否還有沒確認的消息,所以可能會導致某個客戶端非常的忙。解決方案:

調用basicOps設置prefetchCount為1,這樣一個客戶端在沒有確認當前消息之前不會收到下一個消息。

Publish/Subscribe 發布/訂閱

一次性給所有的Consumer發送消息

回顧一下前面的例子,基本的代碼流程是

  1. 創建ConnectionFactory→設置參數→創建Connection→創建Channel
  2. Producer聲明QueueName,往Exchanges=”“發送消息
  3. Consumer指定相同的QueueName,設置消息處理函數,讀取數據,發送確認。

Exchanges

RabbitMQ中有Exchange的概念。消息實際上不會直接發送給Queue,而是給Exchange,然后通過exchange轉發給queue,然后給Consumer消費。exchange為空字符表示系統內部默認的exchange。

X

[root@test]~# rabbitmqctl list_exchanges -p test_host1 Listing exchanges for vhost test_host1 ... name type amq.fanout fanout amq.direct direct amq.match headers amq.rabbitmq.trace topic direct amq.headers headers amq.topic topic 

amp.* 為系統自帶的exchange。

管理界面查看

pic

ExchangeType

Type決定一個Exchange怎么處理接收到的消息,廣播到所有隊列或者推送到特定的隊列或者直接丟棄消息。

內置的ExchangeType枚舉

public enum BuiltinExchangeType { DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers"); //...省略其他 } 

fanout

顧名思義,是一種廣播的處理方式,會發送到所有的queue。看個demo。

先看Send

//Send.java public class Send { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//Exchange聲明 final String msg = String.valueOf(LocalDateTime.now()); channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(Charset.defaultCharset()));//第二個routingKey留空待定 System.out.println("發送" + msg); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } 

定義一個FANOUT類型的Exchange,沒有定義Queue。調用basicPublish發送消息。

再看Receiver.java

//Receiver.java public class Receiver { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); final String queueName = channel.queueDeclare().getQueue();//獲取自動生成的queue, channel.queueBind(queueName, EXCHANGE_NAME, "");//綁定,最后一個參數待定 channel.basicConsume(queueName, true, (consumerTag, message) -> { final String s = new String(message.getBody(), Charset.defaultCharset()); System.out.println("收到" + s); }, consumerTag -> { }); } } 

定義一個和Send相同的Exchange。獲取創建的channel對應的系統自動生成的queue(結束之后會自動刪除,避免系統有太多隊列)。綁定queue和exchange。RabbitMQ會丟棄消息如果這個exchange下面沒有綁定queue的話。

queues

運行多個Receiver實例。因為ExchangeType是fanout,所以,每個實例都會收到廣播的消息。

對比前面例子中的默認Exchange,一個消息,發送到一個Exchange(默認的空字符串),因為queuename指定了是同一個,所以,只會有一個client收到消息。

而這個例子中,queue是自動生成的,所以會有多個自動刪除的queue,一個queue對應一個client。ExchangeType是fanout,所以,每個client都會收到。

Routing

有選擇性的接收消息

前面例子使用了fanout廣播的方式來發布消息,一條消息會被推送到所有的隊列,又因為隊列是自動生成的,一個隊列對應一個consumer,所以所有的consumer都會收到所有的消息。這無法實現某個consumer只關心某種類型的消息的需求。所以,這里引入exchangetype=direct的例子。

name 相同,type不同的exchange不合法,可以先在rabbitmq的管理平台界面刪除原先的exchange。

Binding

回顧前面的代碼。publish和queue綁定的時候都留空了routingKey參數。

Send.java

pic1

Consumer.java

pic2

Consumer的queueBind 和 Producer的basicPublish中routingKey需要匹配。fanout類型的exchange會忽略routingKey參數,所以我們直接留空。

direct Exchange

fanout的消息分發不太靈活,所以這里使用direct的Exchange。看下圖,如果Producer產生的routingKey為orange,那么只會發送給Q1,那么只有C1會收到消息。如果routingKey為black或者green,那么C2會收到消息。

pic

Multiple Bindings

多個隊列綁定同一個routingKey也是合理。下面的例子Q1和Q2都會收到black的消息,這種綁定本質上就退化成了一種前面的fanout Exchange。

pic

Demo

場景:Producer產生3種routingKey的message,Info,Error,Fault。定義兩個Consumer,C1接收Info的message,C2接收Error和Fault。

//Send.java public class Send { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.x.x.x");//省略其他設置 try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//指定exchange的類型 String messageType = args[0];//傳入routingKey String msg = messageType + " message" + LocalDateTime.now(); channel.basicPublish(EXCHANGE_NAME, messageType, null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("發送了" + msg); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } 
//Receiver.java public class Receive { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.x.x.x"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); final String queueName = channel.queueDeclare().getQueue(); for (String arg : args) {//遍歷所有的routingKey,綁定所有當前queue System.out.println("綁定routingKey:" + arg); channel.queueBind(queueName, EXCHANGE_NAME, arg); } channel.basicConsume(queueName, true, ((consumerTag, message) -> { final String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("收到" + msg); }), consumerTag -> { }); } } 

創建5個啟動配置,3個為Send,分別發送Info,Fault,Error消息;2個Receiv,第一個接收Info,第二個接收Error和Fault。

All Configuration

RevErrorFault

最終效果

pic

pic

pic

Topic

通過pattern模式來指定接收的消息。

前面例子使用的ExchangeType為direct,相對於fanout,是靈活了一些,但是還是有一些缺點,比如無法組合條件。比如有個consumer關心所有的error消息以及和a相關的info消息。這里就可以使用Topic的Exchange。然后都是通過routingKey參數來指定。

通配符

* 星號,代表一個詞

# 井號,代表0個或多個詞(包括一個)

以點分隔,組成routingKey。比如*.a.b.#

如果設置BuiltinExchangeType.TOPIC的exchangeType,但是沒有使用通配符,那么就和BuiltinExchangeType.DIRECT是一樣的。

未匹配任何模式的消息會被丟棄。

關鍵代碼

聲明exchange

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

發送消息

if (args[0].equals("info")) { s = "a's info message"; channel.basicPublish(EXCHANGE_NAME, "a.info", null, s.getBytes(StandardCharsets.UTF_8)); } else { s = "xxx.yyy's error message"; channel.basicPublish(EXCHANGE_NAME, "xxx.yyy.error", null, s.getBytes(StandardCharsets.UTF_8)); } 

使用通配符接收消息

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
final String queueName = channel.queueDeclare().getQueue();//臨時的queue String routingKey = args[0];//傳入的參數 比如*.info 或 #.error來匹配 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println("綁定" + routingKey + "的隊列"); channel.basicConsume(queueName, true, ((consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("收到消息" + msg); }), consumerTag -> {}); 

RPC 遠程調用

遠程過程調用。

Client 調用 Server的服務。Client發送消息,Server消費消息。Server計算結果,發布一個消息到對應的隊列。Client消費隊列里面的消息。這一個過程Client和Server都是雙重身份。這個是和其他最主要的區別。

pic

關於RPC

RPC是一種常見的模式,但也存在一些爭議,這主要體現在如果開發者有意或無意的不去注意這是一個本地的方法還是比較耗時的遠程方法。RPC也增加了系統的調試復雜度。

開發RPC的幾個建議:

  1. 確保方法容易辨識是遠程還是本地
  2. 做好文檔
  3. 處理調用時候的異常

回調隊列

Client需要Server的計算結果,所以需要在消息里面帶上CallbackQueueName。根據AMQP 0-9-1協議,定義了14個屬性,除了4個比較常用,其他都很少用。

  • deliveryMode 設置消息的持久化,第二個例子中用過。
  • contentType 設置內容的mime-type ,建議application/json
  • replyTo 回復隊列名
  • correlationId 關聯id 因為消息是異步的,所以可以給每個消息帶上個id,用來關聯發送的消息。
final AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() .correlationId("uuid") .replyTo("xxx") .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); 

簡易版Client

public class Client { private static final String RPC_QUEUE_NAME = "rpc_queue";//rpc調用的queue,往里面發rpc調用參數 public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.xx.x.x"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String msg = String.valueOf(3);//模擬調用參數 final String replyQueueName = channel.queueDeclare().getQueue(); String corrId = UUID.randomUUID().toString(); final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(replyQueueName)//回復的隊列 .correlationId(corrId)//當前消息的uuid .build(); channel.basicPublish("", RPC_QUEUE_NAME, properties, msg.getBytes(StandardCharsets.UTF_8));//廣播的方式往rpc queue發布消息 System.out.println("發送計算[" + msg + "]的消息"); //等待消息回復 channel.basicConsume(replyQueueName, true, (consumerTag, message) -> { String revCorrId = message.getProperties().getCorrelationId(); if (corrId.equals(revCorrId)) {//拿到了回復 final String result = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("發送" + msg + "得到回復" + result); } else { System.out.println("收到correlationId:" + revCorrId); } }, consumerTag -> { }); } } 

channel.queueDeclare()用來聲明一個臨時隊列,為接收返回結果的隊列。代碼中只發布了一個計算請求,所以basicConsume中corrId判斷其實沒有必要。正常情況下可以在當前臨時隊列發布多個計算請求,每個的計算結果都傳入到當前的臨時隊列,所以需要判斷corrId的匹配情況。

簡易版Server

public class Server { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("hello from server"); final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("x.x.x.x"); factory.setPort(5672); factory.setUsername("full_access"); factory.setPassword("111111"); factory.setVirtualHost("test_host1"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);//聲明非排他的隊列,用來消費rpc_queue里面的計算請求。 channel.basicConsume(RPC_QUEUE_NAME, true,//自動回復,后面就不需要手動ack (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); String replyMsg = new String(msg + "Result");//簡單模擬計算結果。 System.out.println("收到" + msg + "開始計算,計算完成結果為:[" + replyMsg + "]"); //拿到需要回復properties String replyQueueName = message.getProperties().getReplyTo(); String correlationId = message.getProperties().getCorrelationId(); AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(correlationId)//correlationId返回去。 .build(); // 把計算結果發回去 channel.basicPublish("", replyQueueName, replyProps, replyMsg.getBytes(StandardCharsets.UTF_8)); }, consumerTag -> { }); } } 

消費RPC_QUEUE_NAME的計算請求,然后根據消息里面帶的getReplyTo()的值返回給客戶端。

Publisher Confirm 發布確認

可靠發布

啟用生產者確認

根據AMQP 0.9.1協議,這個確認默認是沒有啟用的,可以通過confirmSelect方法啟用。

Channel channel = connection.createChannel();
channel.confirmSelect();

這個方法是針對channel的,不是針對每個消息,所以,只要 在開啟channel之后調用一次就好。

確認每個消息

先是一個簡單的例子,每發完一個消息,都讓系統確認等待一下。

while (thereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.basicPublish(exchange, queue, properties, body); // 5秒超時 channel.waitForConfirmsOrDie(5_000); } 

每次發完一個消息,都等待最多5秒鍾的一遍確認。這個很明顯會極大的影響系統的吞吐率。

批量確認

發送一個確認一個明顯會比較low,所以這里引入一種批量確認的方式。不過這只是一種自己業務代碼的確認機制,不是rabbitmq提供的。

int batchSize = 100;// int outstandingMessageCount = 0; while (thereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.basicPublish(exchange, queue, properties, body); outstandingMessageCount++;//發送一個加1 if (outstandingMessageCount == batchSize) { ch.waitForConfirmsOrDie(5_000);//到達batchSize之后確認 outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { ch.waitForConfirmsOrDie(5_000);//確認剩下的 } 

這種確認吞吐量是上來了,不過最大的問題是當confirm出問題了之后是無法定位到具體哪個有問題。

ConcurrentSkipListMap 和 channel.getNextPublishSeqNo()

channel.getNextPublishSeqNo可以獲取發布的消息的下一個序號,有序遞增。ConcurrentSkipListMap有一個heapMap方法,可以返回key小於等於param的map子集。在發布消息之前先獲取序號,作為key放到map里面。

map.put(nextPublishSeqNo, byteMsg);
channel.basicPublish("", queueName, null, msgStr.substring(i, i + 1).getBytes(StandardCharsets.UTF_8)); 

異步確認

Producer只管發消息,然后注冊一個異步回調函數。rabbitmq提供了兩個回調函數。一個是發送成功的回調,一個是發送失敗的回調。兩個函數的參數是一樣的,兩個。

  • sequence number 序號。表示成功/失敗的消息編號
  • multiple 布爾值。false表示只有一個被確認。true表示小於等於當前序號的消息發送成功/失敗
channel.confirmSelect();//啟用消息確認 channel.addConfirmListener( (deliveryTag, multiple) -> { if (multiple) { System.out.println("序號" + deliveryTag + "的信息發送成功"); map.remove(deliveryTag); } else { System.out.println("序號小於" + deliveryTag + "的信息發送成功"); final ConcurrentNavigableMap<Long, Byte> confirmed = map.headMap(deliveryTag, true); confirmed.clear(); } }, (deliveryTag, multiple) -> { if (!multiple) { System.out.println("發送失敗的信息sequence number:" + deliveryTag); } else { System.out.println("序號小於" + deliveryTag + "的消息發送失敗"); } });


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM