### 准备
## 目标
了解 CachingConnectionFactory 在默认缓存模式下的工作原理
## 前置知识
《Spring AMQP 源码分析 01 - Impatient》
## 测试代码
同 《
Spring AMQP 源码分析 01 - Impatient》
### 分析
## 流程分析
从 《
Spring AMQP 源码分析 01》 可知,在 RabbitTemplate 的
execute(ChannelCallback action, ConnectionFactory connectionFactory)
方法中需要创建与销毁连接和信道。execute 方法调用 doExecute 方法完成相关逻辑,代码如下:

核心逻辑很简单,第1430行通过
CachingConnectionFactory 的 createConnection 方法创建 org.springframework.amqp.rabbit.connection.Connection,第1435行通过 Connection 的 createChannel 方法创建 com.rabbitmq.client.Channel,第1455行将创建的 channel 回传给回调函数,执行业务操作。最后在 finally 块中释放信道和连接(不在截图中)。
## 创建连接
在看代码前先了解一下
CachingConnectionFactory
的功能。默认情况下(缓存模式是
CacheMode.CHANNEL
),
CachingConnectionFactory
的
createConnection 方法
总是返回同一个连接
。
通过连接获取的信道也是会被缓存的
,但是缓存的细节与文档描述不一致,以实际代码为准。
CachingConnectionFactory
有一个
属性 ChannelCachingConnectionProxy connection,在缓存模式为
CacheMode.CHANNEL 时,用于缓存唯一的连接。
ChannelCachingConnectionProxy
包含两个属性
org.springframework.amqp.rabbit.connection.Connection target 和 AtomicBoolean closeNotified,target 代表真实的连接。ChannelCachingConnectionProxy 是 org.springframework.amqp.rabbit.connection.Connection 的代理类,根据代理模式的定义,它也实现了 Connection 接口。

CachingConnectionFactory
的 createConnection 方法会返回
ChannelCachingConnectionProxy connection
(Line 573)。
返回的代理连接需要保证
connection.target
不为 null(Line 564)。第一次调用 createConnection 方法时 connection.target 值为 null,因此会调用 createBareConnection 方法创建出 org.springframework.amqp.rabbit.connection.SimpleConnection
赋值给 connection.target(Line 565)
。SimpleConnection 拥有
com.rabbitmq.client.Connection delegate 属性,持有真正的 RabbitMQ 连接(com.rabbitmq.client.impl.AMQConnection)。
createBareConnection
方法先通过
com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory
创建出
AMQConnection
,再创建一个 SimpleConnection 实例,将
AMQConnection 赋值给
delegate。

代码第568行为当前 connection 绑定了一个 Semaphore,放在 Map<Connection, Semaphore> checkoutPermits 中。Semaphore 是非公平同步信号量,允许有 channelCacheSize(默认为25)个访问许可。这和后面的信道缓存逻辑相关。
代码第571行向所有 ConnectionListener 发布 onCreate 事件。
CachingConnectionFactory 拥有属性
CompositeConnectionListener connectionListener ,是 ConnectionListener 的注册中心,同时它也是事件源,这部分代码是 Listener 模式的一个很好的例子。特别注意源码中使用 CopyOnWriteArrayList 保存所有的 ConnectionListener,值得学习一下。
## 创建信道
创建信道是通过 ChannelCachingConnectionProxy 类的 createChannel 方法。首先判断
channelCheckoutTimeout 参数值是否大于0,只有大于0的情况下才会通过 Semaphore 限制当前
连接下
可用的信道数量(不超过
Semaphore
的 permits 值,也就是
channelCacheSize
值
),由于 channelCheckoutTimeout 默认值为0,所以默认情况下不会限制一个连接下可以有多少个信道。
整个信道的复用是通过 LinkedList<ChannelProxy> channelList; 实现的。
CachingConnectionFactory
中有4个相关属性分别用来缓存 CacheMode.CHANNEL 与
CacheMode.CONNECTION 两种缓存模式下支持事务与不支持事务的信道,对于本例,用的是 LinkedList<ChannelProxy> cachedChannelsNonTransactional。
Spring AMQP 的缓存实现很普通:
使用 channelList 作为缓存队列,所有对该队列的操作都通过
channelList
自身作为
对象锁进行同步。
首先尝试从 channelList 中获取可用的缓存信道。
在同步块中,先判断
channelList
是否为空,若不为空,则返回
队列头部缓存的 ChannelProxy(要从队列中移除)。
如果没有可用的缓存信道,则通过 getCachedChannelProxy 方法创建新的 ChannelProxy。创建 ChannelProxy 大致步骤如下:
- 先通过 com.rabbitmq.client.Connection delegate 创建出 com.rabbitmq.client.Channel (com.rabbitmq.client.impl.ChannelN)实例。(Line 492)
- 向所有 ChannelListener 发布 onCreate 事件。(Line 496)
- 创建动态代理。(Line 504)

本文不深入解释动态代理,简单来说,我们通过 Proxy.newProxyInstance 方法凭空创建了一个实例,这个实例实现了 ChannelProxy 接口,所有对该实例的方法调用都会转交给CachedChannelInvocationHandler 的
invoke 方法处理。动态代理可以有效减少普通代理模式的代码量(大量的委托实现不再需要),接口定义发生变化时 InvocationHandler 也可能无需变更。
## ChannelProxy
ChannelProxy 第一次被调用是在业务逻辑中:
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
记住这儿的 channel 是
org.springframework.amqp.rabbit.connection.
ChannelProxy 的动态代理实例。对 queueDeclare 方法的调用,实际上是通过反射调用真正的信道(ChannelN)实例的相同方法完成的:
Object result = method.invoke(this.target, args);
## 关闭信道
关闭信道的代码很直接:
channel.close();
魔法就在于这个 channel 是动态代理实例,close 方法在
CachedChannelInvocationHandler
中被重新实现。

第912行的 channelList 就是
CachingConnectionFactory
的
LinkedList<ChannelProxy> cachedChannelsNonTransactional
,一路被传递到
CachedChannelInvocationHandler
中。第914行判断当前已缓存的信道数量是否已经达到阈值,保证缓存的信道数量不超过 channelCacheSize 设定的值。(第915行代码目的是什么?)。
如果最终需要缓存信道,则让 Semaphore 释放 permits(如果 channelCheckoutTimeout > 0
),将
ChannelProxy
放到 channelList 队尾。如果不需要缓存,则物理关闭信道,并让
Semaphore 释放 permits(如果 channelCheckoutTimeout > 0
)
。
整理一下,默认 channelCacheSize 为25,表示最多为同一个连接缓存25个信道。如果 channelCheckoutTimeout 值为0(默认值),实际上并不限制同一连接下能同时存在的信道数量;如果 channelCheckoutTimeout 值大于0,则通过 Semaphore 机制保证最多只有25个信道能够同时被使用,超出数量的信道创建请求会抛出 AmqpTimeoutException 异常。
## 关闭连接
实际的连接类是
ChannelCachingConnectionProxy,在默认的模式下,实际上关闭连接没有执行任何操作。