### 准備
## 目標
了解 Spring AMQP 如何實現異步消息投遞(推模式)
## 前置知識
《RabbitMQ入門_05_多線程消費同一隊列
》
## 相關資源
Quick Tour for the impatient:<
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#async-consumer>
Sample code:<
https://github.com/gordonklg/study>,rabbitmq module
源碼版本:Spring AMQP 1.7.3.RELEASE
## 測試代碼
gordon.study.rabbitmq.springamqp.AsyncConsumer.java

### 分析
## MessageListener
org.springframework.amqp.core.MessageListener 是 Spring AMQP 異步消息投遞的監聽器接口,它只有一個方法 onMessage,用於處理消息隊列推送來的消息。
MessageListener 大概對應 amqp client 中的 Consumer 類。onMessage 方法大概對應 Consumer 類的 handleDelivery 方法。
從這也可以看出,Spring AMQP 的 Message 類至少包含 consumer tag、envelope、basic properties 和 message body 等信息
## MessageListenerContainer
org.springframework.amqp.rabbit.listener.MessageListenerContainer 可以看作 message linstener 的容器。但這個 Container 的語義並不是指它包含多個 message listener,實際上從方法注釋和實現代碼可以看出,
MessageListenerContainer 只包含一個 MessageListener 。那 Container 的語義是什么呢?
一方面,Container 是指
雖然只有一個 MessageListener 指定消息消費的邏輯,但是可以生成多個線程使用相同的
MessageListener
同時消費消息。代碼第19行 setConcurrentConsumers 方法就是用來指定並發消費者的數量。可以把 MessageListenerContainer 看成下圖中的 Subscriber group
另一方面,Container
代表生命周期管理的職責。MessageListener 僅僅實現消息消費邏輯,而整個消息消費何時開始、何時結束、如何設置、狀態怎樣等等問題全都是由 MessageListenerContainer(及其實現類)負責的。實際上,
MessageListenerContainer
繼承自
SmartLifecycle 接口,該接口是 Spring 容器提供的與生命周期管理相關的接口,實現該接口的類一般情況下會由 Spring 容器負責啟動與停止。由於本例沒有啟用 Spring 容器環境,所以代碼第26行需要主動調用 start 方法,消息消費才會開始執行。
## 內部實現思路
我們知道,amqp client 中的 Consumer 接口實際上只定義了回調方法,我們在回調方法(主要是
handleDelivery
方法
)中實現自己的業務邏輯(對消息的消費)。Consumer 接口的回調方法實際上是在一個獨立線程中執行的,當我們調用 Channel 的 basicConsume 方法時,amqp client 會創建線程處理消息、創建隊列緩存從 broker 推送來的消息。然而這些內部實現並沒有暴露出來,導致 Spring AMQP 必須自己重新編寫一套類似的實現以獲得最大的靈活度。
按照前面的分析,我們可以想象 Spring AMQP 為了實現自己的 message listener,需要哪些組件:
- MessageListenerContainer 的實現類,即 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer。它作為整個異步消息投遞的核心類存在。
- 因為 MessageListenerContainer 實際上管理了一個消費者線程組,因此需要相關線程類與線程調度類。Spring AMQP 中該線程類為 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer,調度類當然就是 SimpleMessageListenerContainer,其 start 方法會啟動線程
- 消息隊列推送過來的消息需要一個本地隊列緩存。
- 需要實現 amqp client 的 Consumer 接口。在該接口實現類中,我們簡單的把消息放到本地隊列中。org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer 負責這件事
- 根據單一職責原則,線程類只負責異步消費者的創建與(無限循環)消息消費;InternalConsumer 只負責實現 amqp client 的 Consumer 接口,與 amqp client 原生的異步消息投遞實現對接,將消息放入本地隊列。那么,我們還需要一個真正的異步消費者模型,用來管理消費行為與狀態。org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 承擔這部分責任。從名字可以看出,BlockingQueueConsumer 采用 BlockingQueue 作為本地隊列緩存消息。
- 用戶的業務邏輯是在 MessageListener 接口中實現的,框架的主要處理過程為:創建合適的連接與信道,從 amqp client 中獲取消息暫存到本地緩存,從本地緩存讀取消息並調用 MessageListener 接口的 onMessage 方法消費消息。
## 內部流程分析
SimpleMessageListenerContainer 的
start 方法會根據
int concurrentConsumers 的值創建對應數量的
BlockingQueueConsumer 實例,並放入
Set<BlockingQueueConsumer> consumers 中。接着為每一個
BlockingQueueConsumer
創建對應的消息處理線程
AsyncMessageProcessingConsumer(實現了 Runnable 接口),並通過 Executor taskExecutor = new SimpleAsyncTaskExecutor()
這個自實現的線程池啟動每一個 AsyncMessageProcessingConsumer 線程。最后通過判斷每一個 AsyncMessageProcessingConsumer 的 FatalListenerStartupException startupException 屬性是否有值來判斷 SimpleMessageListenerContainer 是否正常啟動了所有的消息監聽器。
構建 BlockingQueueConsumer 的構造函數參數很多,其中 ConnectionFactory 是代碼第17行創建的 Caching
ConnectionFactory,AcknowledgeMode 默認為 AUTO
。
org.springframework.amqp.core.AcknowledgeMode
定義了三種確認模式:
- NONE:不確認,相當於 amqp client 中 Channel.basicConsume 方法中 autoAck 參數值設為 true
- MANUAL:用戶通過 channel aware listener 手動控制消息確認
- AUTO:Spring AMQP 框架根據 message listener 的 onMessage 執行過程中是否拋出異常來決定是否確認消息消費
AsyncMessageProcessingConsumer 的 run 方法比較復雜,粗略解讀一下
- 調用 BlockingQueueConsumer 的 start 方法(不是 Runnable 接口)。
- start 方法先通過 ConnectionFactoryUtils.getTransactionalResourceHolder 靜態方法創建出供該線程使用的 channel,該方法返回類型是 RabbitResourceHolder。這部分代碼涉及到事務,很復雜,但是本文的測試代碼不涉及事務,目前只要了解多個 AsyncMessageProcessingConsumer 會生成多個 RabbitResourceHolder 實例,但是由於使用了 CachingConnectionFactory 的默認緩存模式,所以這些 RabbitResourceHolder 實例共用同一個(AMQP)連接,每個 AsyncMessageProcessingConsumer 獨享該連接創建的一個(AMQP)信道即可
- start 方法接着創建 InternalConsumer 實例,並調用剛創建的 AMQP 信道的 basicQos 和 basicConsume 方法開始接受消息。這樣,當隊列接受到消息時,amqp client 會主動調用 InternalConsumer 的 handleDelivery 方法。該方法調用 BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body)); 將消息放到 BlockingQueueConsumer 的 BlockingQueue<Delivery> queue 中。org.springframework.amqp.rabbit.support.Delivery 類封裝了 amqp client 通過 handleDelivery 方法回送過來的所有參數。
有兩個細節值得說一下:第一,BlockingQueueConsumer 可以同時消費多個隊列,對每個隊列,都會調用 basicConsume 方法讓 InternalConsumer 監聽當前隊列(即同一個信道,同一個 Consumer ,不同的隊列);其二,可以通過 ConsumerTagStrategy tagStrategy 設定 Tag 命名規則。 - 接着,在 while 循環中調用 SimpleMessageListenerContainer 的 receiveAndExecute 方法,不停的嘗試從 queue 中獲取 Delivery 實例,將之轉化為 Message,然后執行 MessageListener 的 onMessage 回調方法。
- 如果執行成功,則調用 AMQP 信道的 basicAck 方法確認消息消費成功。
- 如果執行過程中發生異常,則將異常轉化為 ListenerExecutionFailedException 拋出。默認情況下,Spring AMQP 處理用戶自定義異常的邏輯非常簡單:調用 AMQP 信道的 basicReject 方法將消息退回隊列,打印 warning 級別的日志,但不會打破 AsyncMessageProcessingConsumer 線程的 while 循環,消息消費繼續進行。這部分內容下篇文章分析。
