Spring AMQP 源碼分析 07 - MessageListenerAdapter


### 准備

## 目標

了解 Spring AMQP 如何用 POJO 處理消息

## 前置知識

《Spring AMQP 源碼分析 04 - MessageListener

## 相關資源

Sample code:< https://github.com/gordonklg/study>,rabbitmq module
源碼版本:Spring AMQP 1.7.3.RELEASE

## 測試代碼

gordon.study.rabbitmq.springamqp.AsyncConsumerWithAdapter.java
 

### 分析

## MessageListenerAdapter

MessageListenerAdapter 利用反射機制使普通的 POJO 就能處理消息。
 
MessageListenerAdapter 本身實現了  ChannelAwareMessageListener 接口,整個邏輯的核心就在 onMessage 方法中。
 
 
第269行獲取實際處理消息的對象 delegate,本例中即為  CommonPrintBean 實例。
 
接下來判斷 delegate 是否為  MessageListener  或  ChannelAwareMessageListener  接口,如果是,則調用 onMessage 方法處理。也就是說,MessageListenerAdapter 的委托實例可以 是 MessageListener 或 ChannelAwareMessageListener
對於本例這種 POJO 委托類,第288行先抽取消息。 extractMessage 方法會嘗試獲取 MessageConverterMessageListenerAdapter   默認的消息轉化器是 SimpleMessageConverter。如果存在 MessageConverter,則調用其 fromMessage 方法將消息轉化為對象。否則直接返回 Message 本身。注意,Spring AMQP 默認的 SimpleMessageConverter 很容易坑人,請在腦海中留下印象:消息在被對應的方法消費前,會被 MessageConverter 做一次轉換!
 
第289行,根據原始的 message 信息,通過 getListenerMethodName 方法確定該消息應該被哪個方法消費。核心屬性是  MessageListenerAdapter 的  Map<String, String> queueOrTagToMethodName,其 key 為隊列名或 consumer tag,值為方法名。也就是說, 我們可以為不同的隊列設置不同的方法,也可以為不同的 Consumer 設置不同的方法如果沒有匹配的方法,則使用默認方法 handleMessage
 
第297行,根據 convertedMessage 創建參數列表。 MessageListenerAdapter  創建的參數列表永遠是長度為1的數組,也就是說,POJO 中合理的消息處理方法必然都是只有一個參數的
 
第298行,利用反射機制調用對應方法消費消息。顯然, convertedMessage   的類型決定了反射會調用哪個同名方法。
 

## 示例代碼分析

示例代碼中  CommonPrintBean 提供了三個不同的 printMessage 方法。考慮到默認使用  SimpleMessageConverterconvertedMessage 類型為 String,所以會調用 String 參數版本的   printMessage 方法。
 
如果打開第22行注釋,將 MessageConverter 設置為 null,則會調用 Message 參數版本的  printMessage 方法。
 
一般來說,不會用到 Object 參數版本的   printMessage 方法,但是提供這個方法可以確保在 MessageListenerAdapter 的委托 POJO 中一定能夠找到消息處理方法(打個錯誤日志也好)。
 

## 異常分析

業務異常與直接使用 MessageListener 接口完全一致。代碼第45行拋出的 AmqpRejectAndDontRequeueException 異常會引導框架拒絕消息並使之不重新入隊。
 
如果期望的消息消費方法不存在,會拋出被 ListenerExecutionFailedException 包裝的 NoSuchMethodException,由於  NoSuchMethodException  是 DefaultExceptionStrategy 的 fatal 異常,因此異常會被  AmqpRejectAndDontRequeueException 再次包裝。AsyncMessageProcessingConsumer 的 run 方法循環消費消息邏輯中,遇到  AsyncMessageProcessingConsumer  直接靜默處理。所以,如果沒有對應的方法,框架最終會把所有的消息都轉到死信隊列中去
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM