Spring AMQP 源碼分析 02 - CachingConnectionFactory


### 准備

## 目標

了解 CachingConnectionFactory 在默認緩存模式下的工作原理
 

## 前置知識

 
《Spring AMQP 源碼分析 01 - Impatient》
 

## 測試代碼

同 《 Spring AMQP 源碼分析 01 - Impatient》
 

### 分析

## 流程分析

從 《 Spring AMQP 源碼分析 01》 可知,在 RabbitTemplate 的    execute(ChannelCallback action, ConnectionFactory connectionFactory)  方法中需要創建與銷毀連接和信道。execute 方法調用 doExecute 方法完成相關邏輯,代碼如下:
 
 
核心邏輯很簡單,第1430行通過  CachingConnectionFactory 的 createConnection 方法創建 org.springframework.amqp.rabbit.connection.Connection,第1435行通過 Connection 的 createChannel 方法創建 com.rabbitmq.client.Channel,第1455行將創建的 channel 回傳給回調函數,執行業務操作。最后在 finally 塊中釋放信道和連接(不在截圖中)。
 

## 創建連接

在看代碼前先了解一下  CachingConnectionFactory  的功能。默認情況下(緩存模式是  CacheMode.CHANNEL ), CachingConnectionFactory  的  createConnection 方法 總是返回同一個連接 通過連接獲取的信道也是會被緩存的 ,但是緩存的細節與文檔描述不一致,以實際代碼為准
 
CachingConnectionFactory  有一個 屬性 ChannelCachingConnectionProxy connection,在緩存模式為  CacheMode.CHANNEL 時,用於緩存唯一的連接。 ChannelCachingConnectionProxy   包含兩個屬性  org.springframework.amqp.rabbit.connection.Connection target 和 AtomicBoolean closeNotified,target 代表真實的連接。ChannelCachingConnectionProxy 是 org.springframework.amqp.rabbit.connection.Connection 的代理類,根據代理模式的定義,它也實現了 Connection 接口
 


CachingConnectionFactory  的 createConnection 方法會返回  ChannelCachingConnectionProxy connection  (Line 573)。 返回的代理連接需要保證  connection.target  不為 null(Line 564)。第一次調用 createConnection 方法時 connection.target 值為 null,因此會調用 createBareConnection 方法創建出 org.springframework.amqp.rabbit.connection.SimpleConnection  賦值給 connection.target(Line 565) 。SimpleConnection 擁有  com.rabbitmq.client.Connection delegate 屬性,持有真正的 RabbitMQ 連接(com.rabbitmq.client.impl.AMQConnection)。 createBareConnection  方法先通過  com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory   創建出  AMQConnection ,再創建一個 SimpleConnection 實例,將  AMQConnection 賦值給 delegate。
 
 
代碼第568行為當前 connection 綁定了一個 Semaphore,放在 Map<Connection, Semaphore> checkoutPermits 中。Semaphore 是非公平同步信號量,允許有 channelCacheSize(默認為25)個訪問許可。這和后面的信道緩存邏輯相關。
 
代碼第571行向所有 ConnectionListener 發布 onCreate 事件。 CachingConnectionFactory 擁有屬性   CompositeConnectionListener connectionListener ,是 ConnectionListener 的注冊中心,同時它也是事件源,這部分代碼是 Listener 模式的一個很好的例子。特別注意源碼中使用 CopyOnWriteArrayList 保存所有的 ConnectionListener,值得學習一下。
 

## 創建信道

創建信道是通過 ChannelCachingConnectionProxy 類的 createChannel 方法。首先判斷  channelCheckoutTimeout 參數值是否大於0,只有大於0的情況下才會通過 Semaphore 限制當前 連接下 可用的信道數量(不超過  Semaphore  的 permits 值,也就是  channelCacheSize  ),由於 channelCheckoutTimeout 默認值為0,所以默認情況下不會限制一個連接下可以有多少個信道
 
整個信道的復用是通過 LinkedList<ChannelProxy> channelList; 實現的。 CachingConnectionFactory  中有4個相關屬性分別用來緩存 CacheMode.CHANNEL 與  CacheMode.CONNECTION 兩種緩存模式下支持事務與不支持事務的信道,對於本例,用的是 LinkedList<ChannelProxy> cachedChannelsNonTransactional Spring AMQP 的緩存實現很普通: 使用 channelList 作為緩存隊列,所有對該隊列的操作都通過  channelList  自身作為 對象鎖進行同步
 
首先嘗試從 channelList 中獲取可用的緩存信道。 在同步塊中,先判斷  channelList  是否為空,若不為空,則返回 隊列頭部緩存的 ChannelProxy(要從隊列中移除) 如果沒有可用的緩存信道,則通過 getCachedChannelProxy 方法創建新的 ChannelProxy。創建 ChannelProxy 大致步驟如下:
 
  1. 先通過 com.rabbitmq.client.Connection delegate 創建出 com.rabbitmq.client.Channelcom.rabbitmq.client.impl.ChannelN)實例。(Line 492)
  2. 向所有 ChannelListener 發布 onCreate 事件。(Line 496)
  3. 創建動態代理。(Line 504)
 
 
 
本文不深入解釋動態代理,簡單來說,我們通過 Proxy.newProxyInstance 方法憑空創建了一個實例,這個實例實現了 ChannelProxy 接口,所有對該實例的方法調用都會轉交給CachedChannelInvocationHandler 的  invoke 方法處理。動態代理可以有效減少普通代理模式的代碼量(大量的委托實現不再需要),接口定義發生變化時 InvocationHandler 也可能無需變更
 

## ChannelProxy

ChannelProxy 第一次被調用是在業務邏輯中:
        DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),  queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
記住這兒的 channel 是 org.springframework.amqp.rabbit.connection. ChannelProxy 的動態代理實例。對 queueDeclare 方法的調用,實際上是通過反射調用真正的信道(ChannelN)實例的相同方法完成的:
        Object result = method.invoke(this.target, args);
 

## 關閉信道

關閉信道的代碼很直接:
        channel.close();
 
魔法就在於這個 channel 是動態代理實例,close 方法在  CachedChannelInvocationHandler  中被重新實現。
 
 
第912行的 channelList 就是  CachingConnectionFactory  的  LinkedList<ChannelProxy> cachedChannelsNonTransactional ,一路被傳遞到  CachedChannelInvocationHandler  中。第914行判斷當前已緩存的信道數量是否已經達到閾值,保證緩存的信道數量不超過 channelCacheSize 設定的值。(第915行代碼目的是什么?)。 如果最終需要緩存信道,則讓 Semaphore 釋放 permits(如果 channelCheckoutTimeout > 0 ),將  ChannelProxy  放到 channelList 隊尾。如果不需要緩存,則物理關閉信道,並讓  Semaphore 釋放 permits(如果 channelCheckoutTimeout > 0
 
整理一下,默認 channelCacheSize 為25,表示最多為同一個連接緩存25個信道。如果 channelCheckoutTimeout 值為0(默認值),實際上並不限制同一連接下能同時存在的信道數量;如果 channelCheckoutTimeout 值大於0,則通過 Semaphore 機制保證最多只有25個信道能夠同時被使用,超出數量的信道創建請求會拋出 AmqpTimeoutException 異常
 

## 關閉連接

實際的連接類是  ChannelCachingConnectionProxy,在默認的模式下,實際上關閉連接沒有執行任何操作。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM