簡介
session類圖
Mina每建立一個連接同時會創建一個session對象,用於保存這次讀寫需要用到的所有信息。從抽象類AbstractIoSession中可以看出session具有如下功能:
1、從attributes成員可以看出session可以存放用戶關心的鍵值對
2、注意到WriteRequestQueue,這是一個寫請求隊列,processor中調用flush或者flushNow方法時會將用戶寫入的數據包裝成一個writeRequest對象,並加入這個隊列中。
3、提供了大量的統計功能,比如接收到了多少消息、最后讀取時間等
在代碼中一般是這樣使用session的
// 創建服務器監聽 IoAcceptor acceptor = new NioSocketAcceptor(); // 設置buffer的長度 acceptor.getSessionConfig().setReadBufferSize(2048); // 設置連接超時時間 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
session做為一個連接的具體對象,緩存當前連接用戶的一些信息。
創建好acceptor或者connector之后,通過IoSessionConfig對session對行配置。
用得最多的是通過session寫入數據,這是調用了IoSession的write方法
WriteFuture write(Object message);
WriteFuture write(Object message, SocketAddress destination);
下面着重分析創建過程以及session的狀態
創建與初始化
每建立一個連接,就會創建一個session,IoAcceptor的accept方法的返回值正是一個session。見
NioSocketAcceptor
.accept(
IoProcessor
<
NioSession
> processor,
ServerSocketChannel
handle)方法:
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch); }
由以上代碼可知,
session包含了對眾多對象的引用,比如processor,socketChannel,SelectionKey,IoFilter等。
session在創建好后,緊接着就會對其進行初始化。
AbstractIoService
.initSession(
IoSession
session,
IoFuture
future,
IoSessionInitializer
sessionInitializer)方法如下:
protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) { // Update lastIoTime if needed. if (stats.getLastReadTime() == 0) { stats.setLastReadTime(getActivationTime()); } if (stats.getLastWriteTime() == 0) { stats.setLastWriteTime(getActivationTime()); } // Every property but attributeMap should be set now. // Now initialize the attributeMap. The reason why we initialize // the attributeMap at last is to make sure all session properties // such as remoteAddress are provided to IoSessionDataStructureFactory. try { ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory() .getAttributeMap(session)); } catch (IoSessionInitializationException e) { throw e; } catch (Exception e) { throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e); } try { ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory() .getWriteRequestQueue(session)); } catch (IoSessionInitializationException e) { throw e; } catch (Exception e) { throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e); } if ((future != null) && (future instanceof ConnectFuture)) { // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now). session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future); } if (sessionInitializer != null) { sessionInitializer.initializeSession(session, future); } finishSessionInitialization0(session, future); }
設置上次讀寫時間,初始化屬性map和寫請求隊列等。
session被初始化好之后會加入到processor中,processor中有一個隊列專門存放session:
例如:AbstractPollingIoProcessor.java中有:
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>(); /** A queue used to store the sessions to be removed */ private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>(); /** A queue used to store the sessions to be flushed */ private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>(); /** * A queue used to store the sessions which have a trafficControl to be * updated */ private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>(); /** The processor thread : it handles the incoming messages */ private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
加入隊列之后,processor就會從隊列中取出session,以下是processor的run方法關鍵代碼:
-
private class Processor implements Runnable { public void run() { for (;;) { long t0 = System.currentTimeMillis(); int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = (t1 - t0); if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) { if (isBrokenConnection()) { wakeupCalled.getAndSet(false); continue; } else { registerNewSelector(); } wakeupCalled.getAndSet(false); continue; } nSessions += handleNewSessions(); updateTrafficMask(); if (selected > 0) { process(); } nSessions -= removeSessions(); } } }
2、從newSessions隊列中取出這些session,並將其負責的通道注冊到selector上
3、處理准備就緒的session
AbstractPollingIoProcessor.java
private void process(S session) { // Process Reads if (isReadable(session) && !session.isReadSuspended()) { read(session); } // Process writes if (isWritable(session) && !session.isWriteSuspended()) { // add the session to the queue, if it's not already there if (session.setScheduledForFlush(true)) { flushingSessions.add(session); } } }
總結一下創建與初始化過程:連接到來創建一個session,初始化好之后加入到processor負責的一個隊列中。processor線程會把隊列中的session對應的通道都注冊到它自己的selector上,然后這個selector輪詢這些通道是否准備就緒,一旦准備就緒就調用對應方法進行處理(read or flushNow)。
IoFilter與IoHandler就是在這些狀態上面加以干預,下面重點看一下IDLE狀態,它分三種:
狀態
Mina中的session具有狀態,且狀態之間是可以相互轉化的
Connected:session被創建時處於這種狀態
Idle:沒有請求可以處理(可配置)
Closing:正處於關閉狀態(可能正在做一些清理工作)
Closed:關閉狀態
下圖是這幾種狀態之間的轉化圖:

Idle for read:在規定時間內沒有數據可讀
Idle for write:在規定時間內沒有數據可寫
Idle for both:在規定時間內沒有數據可讀和可寫
這三種狀態分別對應IdleStatus類的三個常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE
前面session的用法中有如下設置:
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor的run方法中調用了notifyIdleSessions
-
private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime) { if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) { session.getFilterChain().fireSessionIdle(status); } }
public void fireSessionIdle(IdleStatus status) { session.increaseIdleCount(status, System.currentTimeMillis()); Entry head = this.head; callNextSessionIdle(head, session, status); }