### 准備
## 目標
了解 CachingConnectionFactory 在默認緩存模式下的工作原理
## 前置知識
《Spring AMQP 源碼分析 01 - Impatient》
## 測試代碼
同 《
Spring AMQP 源碼分析 01 - Impatient》
### 分析
## 流程分析
從 《
Spring AMQP 源碼分析 01》 可知,在 RabbitTemplate 的
execute(ChannelCallback action, ConnectionFactory connectionFactory)
方法中需要創建與銷毀連接和信道。execute 方法調用 doExecute 方法完成相關邏輯,代碼如下:

核心邏輯很簡單,第1430行通過
CachingConnectionFactory 的 createConnection 方法創建 org.springframework.amqp.rabbit.connection.Connection,第1435行通過 Connection 的 createChannel 方法創建 com.rabbitmq.client.Channel,第1455行將創建的 channel 回傳給回調函數,執行業務操作。最后在 finally 塊中釋放信道和連接(不在截圖中)。
## 創建連接
在看代碼前先了解一下
CachingConnectionFactory
的功能。默認情況下(緩存模式是
CacheMode.CHANNEL
),
CachingConnectionFactory
的
createConnection 方法
總是返回同一個連接
。
通過連接獲取的信道也是會被緩存的
,但是緩存的細節與文檔描述不一致,以實際代碼為准。
CachingConnectionFactory
有一個
屬性 ChannelCachingConnectionProxy connection,在緩存模式為
CacheMode.CHANNEL 時,用於緩存唯一的連接。
ChannelCachingConnectionProxy
包含兩個屬性
org.springframework.amqp.rabbit.connection.Connection target 和 AtomicBoolean closeNotified,target 代表真實的連接。ChannelCachingConnectionProxy 是 org.springframework.amqp.rabbit.connection.Connection 的代理類,根據代理模式的定義,它也實現了 Connection 接口。

CachingConnectionFactory
的 createConnection 方法會返回
ChannelCachingConnectionProxy connection
(Line 573)。
返回的代理連接需要保證
connection.target
不為 null(Line 564)。第一次調用 createConnection 方法時 connection.target 值為 null,因此會調用 createBareConnection 方法創建出 org.springframework.amqp.rabbit.connection.SimpleConnection
賦值給 connection.target(Line 565)
。SimpleConnection 擁有
com.rabbitmq.client.Connection delegate 屬性,持有真正的 RabbitMQ 連接(com.rabbitmq.client.impl.AMQConnection)。
createBareConnection
方法先通過
com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory
創建出
AMQConnection
,再創建一個 SimpleConnection 實例,將
AMQConnection 賦值給
delegate。

代碼第568行為當前 connection 綁定了一個 Semaphore,放在 Map<Connection, Semaphore> checkoutPermits 中。Semaphore 是非公平同步信號量,允許有 channelCacheSize(默認為25)個訪問許可。這和后面的信道緩存邏輯相關。
代碼第571行向所有 ConnectionListener 發布 onCreate 事件。
CachingConnectionFactory 擁有屬性
CompositeConnectionListener connectionListener ,是 ConnectionListener 的注冊中心,同時它也是事件源,這部分代碼是 Listener 模式的一個很好的例子。特別注意源碼中使用 CopyOnWriteArrayList 保存所有的 ConnectionListener,值得學習一下。
## 創建信道
創建信道是通過 ChannelCachingConnectionProxy 類的 createChannel 方法。首先判斷
channelCheckoutTimeout 參數值是否大於0,只有大於0的情況下才會通過 Semaphore 限制當前
連接下
可用的信道數量(不超過
Semaphore
的 permits 值,也就是
channelCacheSize
值
),由於 channelCheckoutTimeout 默認值為0,所以默認情況下不會限制一個連接下可以有多少個信道。
整個信道的復用是通過 LinkedList<ChannelProxy> channelList; 實現的。
CachingConnectionFactory
中有4個相關屬性分別用來緩存 CacheMode.CHANNEL 與
CacheMode.CONNECTION 兩種緩存模式下支持事務與不支持事務的信道,對於本例,用的是 LinkedList<ChannelProxy> cachedChannelsNonTransactional。
Spring AMQP 的緩存實現很普通:
使用 channelList 作為緩存隊列,所有對該隊列的操作都通過
channelList
自身作為
對象鎖進行同步。
首先嘗試從 channelList 中獲取可用的緩存信道。
在同步塊中,先判斷
channelList
是否為空,若不為空,則返回
隊列頭部緩存的 ChannelProxy(要從隊列中移除)。
如果沒有可用的緩存信道,則通過 getCachedChannelProxy 方法創建新的 ChannelProxy。創建 ChannelProxy 大致步驟如下:
- 先通過 com.rabbitmq.client.Connection delegate 創建出 com.rabbitmq.client.Channel (com.rabbitmq.client.impl.ChannelN)實例。(Line 492)
- 向所有 ChannelListener 發布 onCreate 事件。(Line 496)
- 創建動態代理。(Line 504)

本文不深入解釋動態代理,簡單來說,我們通過 Proxy.newProxyInstance 方法憑空創建了一個實例,這個實例實現了 ChannelProxy 接口,所有對該實例的方法調用都會轉交給CachedChannelInvocationHandler 的
invoke 方法處理。動態代理可以有效減少普通代理模式的代碼量(大量的委托實現不再需要),接口定義發生變化時 InvocationHandler 也可能無需變更。
## ChannelProxy
ChannelProxy 第一次被調用是在業務邏輯中:
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
記住這兒的 channel 是
org.springframework.amqp.rabbit.connection.
ChannelProxy 的動態代理實例。對 queueDeclare 方法的調用,實際上是通過反射調用真正的信道(ChannelN)實例的相同方法完成的:
Object result = method.invoke(this.target, args);
## 關閉信道
關閉信道的代碼很直接:
channel.close();
魔法就在於這個 channel 是動態代理實例,close 方法在
CachedChannelInvocationHandler
中被重新實現。

第912行的 channelList 就是
CachingConnectionFactory
的
LinkedList<ChannelProxy> cachedChannelsNonTransactional
,一路被傳遞到
CachedChannelInvocationHandler
中。第914行判斷當前已緩存的信道數量是否已經達到閾值,保證緩存的信道數量不超過 channelCacheSize 設定的值。(第915行代碼目的是什么?)。
如果最終需要緩存信道,則讓 Semaphore 釋放 permits(如果 channelCheckoutTimeout > 0
),將
ChannelProxy
放到 channelList 隊尾。如果不需要緩存,則物理關閉信道,並讓
Semaphore 釋放 permits(如果 channelCheckoutTimeout > 0
)
。
整理一下,默認 channelCacheSize 為25,表示最多為同一個連接緩存25個信道。如果 channelCheckoutTimeout 值為0(默認值),實際上並不限制同一連接下能同時存在的信道數量;如果 channelCheckoutTimeout 值大於0,則通過 Semaphore 機制保證最多只有25個信道能夠同時被使用,超出數量的信道創建請求會拋出 AmqpTimeoutException 異常。
## 關閉連接
實際的連接類是
ChannelCachingConnectionProxy,在默認的模式下,實際上關閉連接沒有執行任何操作。