在mina的源碼,整個框架最核心的幾個包是 :
- org.apache.mina.core.service :IoService、IoProcessor、IoHandler、IoAcceptor、IoConnector
- org.apache.mina.core.session
- org.apache.mina.core.polling
- org.apache.mina.transport.socket
IoService :Base interface for all IoAcceptors and IoConnectors that provide I/O service and manage IoSessions.
它是所有 IoAcceptor 和 IoConnector 的基接口,對於一個 IoService,有哪些信息需要我們關注呢?
- 底層的元數據信息 TransportMetadata,比如底層的網絡服務提供者(NIO,ARP,RXTX等)。(除此方法之外,其它方法在 AbstractIoService 得以實現。)
- 通過這個服務創建一個新會話時,新會話的默認配置 IoSessionConfig。
- 此服務所管理的所有會話。
- 與這個服務相關所產生的事件所對應的監聽者(IoServiceListener)。
- 處理這個服務所管理的所有連接的處理器(IoHandler)。
- 每個會話都有一個過濾器鏈(IoFilterChain),每個過濾器鏈通過其對應的 IoFilterChainBuilder來負責構建。
- 由於此服務管理了一系列會話,因此可以通過廣播的方式向所有會話發送消息,返回結果是一個WriteFuture集,后者是一種表示未來預期結果的數據結構。
- 服務創建的會話(IoSession)相關的數據通過 IoSessionDataStructureFactory來提供。
- 發送消息時有一個寫緩沖隊列。
- 服務的閑置狀態有三種:讀端空閑,寫端空閑,雙端空閑。
- 還提供服務的一些統計信息,比如時間,數據量等。
IoProcessor<S extends IoSession> :An internal interface to represent an 'I/O processor' that performs actual I/O operations for IoSessions. It abstracts existing reactor frameworks such as Java NIO once again to simplify transport implementations.
一個內部接口代表一個I/O處理器,它為 IoSession 執行實際的I/O操作。它抽象現有反應器框架,如 Java NIO 再一次簡化傳輸實現。
1、IoAcceptor :服務器端接口,接受客戶端訪問的請求。
2、AbstractIoService :設定處理函數、Filter、IoServiceListener,實現了 IoService 所有方法,除了 getTransportMetadata() 。
3、IoServiceListener :可以用於打印日志,主要有Service啟動、停止、空閑等監聽方法。
4、處理函數是一個Executor或者是Executor的包裝。
5、AbstractIoAcceptor完成綁定監聽端口。
6、AbstractPollingIoAcceptor執行具體的監聽連接以及監聽I/O事件,但真正實現的是NioSocketAcceptor open() 方法。
7、NioSocketAcceptor用JAVA NIO的方式實現了具體的連接方法ServerSocketChannel,例如open,accept等
以下代碼的流程:
IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.setHandler(new IoHandlerAdapter()); acceptor.bind(new InetSocketAddress(6969));
1)NioSocketAcceptor 構造函數:
public NioSocketAcceptor() { super(new DefaultSocketSessionConfig(), NioProcessor.class); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); }
定義了SessionConfig,並把this傳遞到SessionConfig當中,指定NioProcessor。
AbstractPollingIoAcceptor 構造函數:
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true); }
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, boolean createdProcessor) { super(sessionConfig, executor); if (processor == null) { throw new IllegalArgumentException("processor"); } this.processor = processor; this.createdProcessor = createdProcessor; try { // Initialize the selector init(); // The selector is now ready, we can switch the // flag to true so that incoming connection can be accepted selectable = true; } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to initialize.", e); } finally { if (!selectable) { try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } }
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) { this(processorType, executor, DEFAULT_SIZE); // int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; }
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size) { if (processorType == null) { throw new IllegalArgumentException("processorType"); } if (size <= 0) { throw new IllegalArgumentException("size: " + size + " (expected: positive integer)"); } // Create the executor if none is provided createdExecutor = (executor == null); if (createdExecutor) { this.executor = Executors.newCachedThreadPool(); // Set a default reject handler ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } else { this.executor = executor; } pool = new IoProcessor[size]; boolean success = false; Constructor<? extends IoProcessor<S>> processorConstructor = null; boolean usesExecutorArg = true; try { // We create at least one processor try { try { processorConstructor = processorType.getConstructor(ExecutorService.class); pool[0] = processorConstructor.newInstance(this.executor); } catch (NoSuchMethodException e1) { // To the next step... try { processorConstructor = processorType.getConstructor(Executor.class); pool[0] = processorConstructor.newInstance(this.executor); } catch (NoSuchMethodException e2) { // To the next step... try { processorConstructor = processorType.getConstructor(); usesExecutorArg = false; pool[0] = processorConstructor.newInstance(); } catch (NoSuchMethodException e3) { // To the next step... } } } } catch (RuntimeException re) { LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage()); throw re; } catch (Exception e) { String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage(); LOGGER.error(msg, e); throw new RuntimeIoException(msg, e); } if (processorConstructor == null) { // Raise an exception if no proper constructor is found. String msg = String.valueOf(processorType) + " must have a public constructor with one " + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one " + Executor.class.getSimpleName() + " parameter or a public default constructor."; LOGGER.error(msg); throw new IllegalArgumentException(msg); } // Constructor found now use it for all subsequent instantiations for (int i = 1; i < pool.length; i++) { try { if (usesExecutorArg) { pool[i] = processorConstructor.newInstance(this.executor); } else { pool[i] = processorConstructor.newInstance(); } } catch (Exception e) { // Won't happen because it has been done previously } } success = true; } finally { if (!success) { dispose(); } } }
public NioProcessor(Executor executor) { super(executor); try { // Open a new selector selector = Selector.open(); } catch (IOException e) { throw new RuntimeIoException("Failed to open a selector.", e); } }
new SimpleIoProcessorPool<S>(processorClass) :是把NioProcessor包裝成了pool.看類圖IoProcessor就很好理解了,這是一個組成模式。
init()的調用實際上是NioSocketAcceptor的,init() 源碼:selector = Selector.open(); 值得注意的是,服務端在創建NioSocketAcceptor實現時,會生成一個線程池(AbstractIoService.executor),此線程池用來執行一個接受請求的任務,這個任務是AbstractPollingIoAcceptor的Acceptor,Acceptor會開一個Selector,用來監聽NIO中的ACCEPT事件。任務初始化時並沒有執行,而在調用NioSocketAcceptor實例的bind方法時,則會啟動對指定端口的ACCEPT事件的監聽。
SimpleIoProcessorPool是在NioSocketAcceptor實例化時創建的,其上有N+1(N=CPU的個數)個NIOProcessor(IoProcessor<S>[] pool)來處理實際IO的讀寫事件,每個pool都是從NioSocketAcceptor構造函數傳過去的NioProcessor實例,並在NioProcessor構造函數傳入一個線程池。
每個NIOProcessor都會對應一個Selector和 NioSocketAcceptor.init() 中的Selector 一起構成了Mina獨有的雙Selector模型,這種設計的優點是不會導致阻塞),來監聽Socket中的讀寫事件。實際對讀寫的操作也是在一個SimpleIoProcessorPool實例化好的一個線程池中以任務的形式執行,這個任務叫Processor(可以在AbstractPollingIoProcessor類中找到其實現)
AbstractIoAcceptor 構造函數:
protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) { super(sessionConfig, executor); defaultLocalAddresses.add(null); }
AbstractIoAcceptor主要用來綁定監聽端口。這個構造函數沒有干其他的事情。
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { if (sessionConfig == null) { throw new IllegalArgumentException("sessionConfig"); } if (getTransportMetadata() == null) { throw new IllegalArgumentException("TransportMetadata"); } if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " + getTransportMetadata().getSessionConfigType() + ")"); } // Create the listeners, and add a first listener : a activation listener // for this service, which will give information on the service state. listeners = new IoServiceListenerSupport(this); listeners.add(serviceActivationListener); // Stores the given session configuration this.sessionConfig = sessionConfig; // Make JVM load the exception monitor before some transports // change the thread context class loader. ExceptionMonitor.getInstance(); if (executor == null) { this.executor = Executors.newCachedThreadPool(); createdExecutor = true; } else { this.executor = executor; createdExecutor = false; } threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); }
這個構造函數還有一個監聽器,或者叫監聽池(可以包含多個監聽器)。用來監聽service創建、連接、斷開等動作,當上述動作發生地時候,會調用listener。里面可以寫自己的一些方法。
一次請求的過程如下 :
Client通過Socket連接服務器,先是由Acceptor接收到請求連接的事件(即ACCEPT事件)。此事件由Acceptor進行處理,會創建一條Socket連接,並將此連接和一個NIOProcessor關聯,這個過程通過圖中的連接分配器進行,連接分配器會均衡的將Socket和不同的NIOProcessor綁定(輪流分配),綁定完成后,會在NIOProcessor上進行讀寫事件的監聽,而讀寫的實際處理則分配給Processor任務完成。當有讀寫事件發生時,就會通知到對應的Processor進行數據處理。