To understand the concepts of connection and session, a good place to start is ConnectionState and SessionState:
// Some code omitted
public class ConnectionState {
    ConnectionInfo info;
    // Transactions opened on this connection, keyed by transaction ID
    private final ConcurrentHashMap<TransactionId, TransactionState> transactions =
            new ConcurrentHashMap<TransactionId, TransactionState>();
    // Sessions created on this connection, keyed by session ID
    private final ConcurrentHashMap<SessionId, SessionState> sessions =
            new ConcurrentHashMap<SessionId, SessionState>();
    // Temporary destinations created through this connection
    private final List<DestinationInfo> tempDestinations =
            Collections.synchronizedList(new ArrayList<DestinationInfo>());
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private boolean connectionInterruptProcessingComplete = true;
    private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers;

    public ConnectionState(ConnectionInfo info) {
        this.info = info;
        // Add the default session id.
        addSession(new SessionInfo(info, -1));
    }
}
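The code shows that a connection holds a collection of transactions, a collection of sessions, and a collection of temporary destinations (such as temporary queues). This tells us:
1. A transaction belongs to a connection.
2. A session belongs to a connection.
3. A temporary queue lives exactly as long as the connection that created it.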
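The ownership described above can be sketched with a small stand-alone model. This is only an illustration under stated assumptions: `ConnectionModel` and all of its methods are hypothetical names, strings stand in for the real state objects, and clearing everything in `shutdown()` is a simplification of connection teardown, not ActiveMQ's actual implementation. The default session ID of -1 mirrors the `ConnectionState` constructor shown earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of connection-level state (not the ActiveMQ class).
class ConnectionModel {
    // Mirrors the default session id (-1) registered by the ConnectionState constructor.
    static final long DEFAULT_SESSION_ID = -1L;

    private final String connectionId;
    private final Map<Long, String> sessions = new ConcurrentHashMap<>();
    private final Map<String, String> transactions = new ConcurrentHashMap<>();
    private final List<String> tempDestinations =
            Collections.synchronizedList(new ArrayList<>());
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    ConnectionModel(String connectionId) {
        this.connectionId = connectionId;
        addSession(DEFAULT_SESSION_ID);
    }

    void addSession(long sessionId) {
        checkOpen();
        sessions.put(sessionId, connectionId + ":" + sessionId);
    }

    void addTransaction(String transactionId) {
        checkOpen();
        transactions.put(transactionId, "in-progress");
    }

    void addTempDestination(String name) {
        checkOpen();
        tempDestinations.add(name);
    }

    boolean hasSession(long sessionId) { return sessions.containsKey(sessionId); }
    int sessionCount() { return sessions.size(); }
    int transactionCount() { return transactions.size(); }
    int tempDestinationCount() { return tempDestinations.size(); }

    // Assumption for this sketch: everything the connection owns goes away with it.
    void shutdown() {
        if (shutdown.compareAndSet(false, true)) {
            sessions.clear();
            transactions.clear();
            tempDestinations.clear();
        }
    }

    private void checkOpen() {
        if (shutdown.get()) {
            throw new IllegalStateException("connection " + connectionId + " is shut down");
        }
    }
}

class ConnectionModelDemo {
    public static void main(String[] args) {
        ConnectionModel connection = new ConnectionModel("conn-1");
        connection.addSession(1L);
        connection.addTransaction("tx-1");
        connection.addTempDestination("temp-queue-1");
        System.out.println("sessions before shutdown: " + connection.sessionCount());
        connection.shutdown();
        System.out.println("temp destinations after shutdown: "
                + connection.tempDestinationCount());
    }
}
```

Keeping the three collections on the connection object makes the lifetime rule easy to see: nothing registered through the connection outlives it.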
// Some code omitted
public class SessionState {
    final SessionInfo info;
    // Producers created on this session, keyed by producer ID
    private final Map<ProducerId, ProducerState> producers =
            new ConcurrentHashMap<ProducerId, ProducerState>();
    // Consumers created on this session, keyed by consumer ID
    private final Map<ConsumerId, ConsumerState> consumers =
            new ConcurrentHashMap<ConsumerId, ConsumerState>();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    public SessionState(SessionInfo info) {
        this.info = info;
    }
}
As shown above, producers and consumers belong to a specific session, and every producer and consumer has a unique ID.
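To make the per-session bookkeeping concrete, here is a minimal sketch of a session that tracks producers and consumers by unique ID. The names `SessionModel` and `EndpointId` are hypothetical, and composing each ID from the connection ID, the session ID, and a per-session counter is an assumption made for this illustration; the excerpt above only shows that such IDs are used as map keys.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical ID type: unique within a connection because it embeds
// the connection id, the session id, and a per-session counter value.
final class EndpointId {
    final String connectionId;
    final long sessionId;
    final long value;

    EndpointId(String connectionId, long sessionId, long value) {
        this.connectionId = connectionId;
        this.sessionId = sessionId;
        this.value = value;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof EndpointId)) return false;
        EndpointId that = (EndpointId) other;
        return sessionId == that.sessionId
                && value == that.value
                && connectionId.equals(that.connectionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(connectionId, sessionId, value);
    }

    @Override
    public String toString() {
        return connectionId + ":" + sessionId + ":" + value;
    }
}

// Hypothetical, simplified model of session-level state (not the ActiveMQ class).
class SessionModel {
    private final String connectionId;
    private final long sessionId;
    // Values are destination names here; the real maps hold state objects.
    private final Map<EndpointId, String> producers = new ConcurrentHashMap<>();
    private final Map<EndpointId, String> consumers = new ConcurrentHashMap<>();
    private final AtomicLong producerSequence = new AtomicLong();
    private final AtomicLong consumerSequence = new AtomicLong();

    SessionModel(String connectionId, long sessionId) {
        this.connectionId = connectionId;
        this.sessionId = sessionId;
    }

    EndpointId addProducer(String destination) {
        EndpointId id = new EndpointId(connectionId, sessionId, producerSequence.incrementAndGet());
        producers.put(id, destination);
        return id;
    }

    EndpointId addConsumer(String destination) {
        EndpointId id = new EndpointId(connectionId, sessionId, consumerSequence.incrementAndGet());
        consumers.put(id, destination);
        return id;
    }

    boolean removeProducer(EndpointId id) { return producers.remove(id) != null; }
    boolean removeConsumer(EndpointId id) { return consumers.remove(id) != null; }
    int producerCount() { return producers.size(); }
    int consumerCount() { return consumers.size(); }
}

class SessionModelDemo {
    public static void main(String[] args) {
        SessionModel session = new SessionModel("conn-1", 1L);
        EndpointId producer = session.addProducer("queue.orders");
        EndpointId consumer = session.addConsumer("queue.orders");
        System.out.println("producer id: " + producer);
        System.out.println("consumer id: " + consumer);
    }
}
```

Because producers and consumers live in separate maps, the two counters can run independently without their IDs ever colliding inside one map.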
// Some code omitted
public class ProducerState {
    final ProducerInfo info;
    private TransactionState transactionState;
}

public class ConsumerState {
    final ConsumerInfo info;
}
ProducerState and ConsumerState are just thin wrappers around their respective info objects.
ConnectionInfo, SessionInfo, ProducerInfo, and ConsumerInfo are all command types, and each of them extends the abstract class BaseCommand.