Apache Mina Server 是一個網絡通信應用框架,為開發高性能和高可用性的網絡應用程序提供了非常便利的框架。
特點:異步的NIO框架,將UDP當成"面向連接"的協議
一、組件管理
Mina的底層依賴的主要是Java NIO庫,上層提供的是基於事件的異步接口
(1)IoService(最底層[起點])
作用:隱藏底層IO的細節,對上提供統一的基於事件的異步IO接口
IOSocketAcceptor和IOSocketChannel,分別對應TCP協議下的服務端和客戶端的IOService,low-level IO經過IOService層后變成IO Event
(2)IoProcessor
作用:負責檢查是否有數據在通道上讀寫
IoProcessor 負責調用注冊在IoService 上的過濾器,並在過濾器鏈之后調用IoHandler
(3)IoFilter(攔截器[關鍵])
作用:日志輸出、黑名單過濾、數據的編碼(write 方向)與解碼(read 方向)等功能
(4)IoHandler(業務邏輯處理[終點])
作用:接收、發送數據
每個IoService都需要指定一個IoHandler
(5)IoSession
作用:是對底層連接(服務器與客戶端的特定連接,該連接由服務器地址、端口以及客戶端地址、端口來決定)的封裝
二、服務端流程:
1、通過SocketAcceptor 同客戶端建立連接;
2、連接建立之后 I/O的讀寫交給了I/O Processor線程,I/O Processor是多線程的;
3、通過I/O Processor 讀取的數據經過IoFilterChain里所有配置的IoFilter,IoFilter進行消息的過濾,格式的轉換,在這個層面可以制定一些自定義的協議;
4、最后IoFilter將數據交給 Handler 進行業務處理,完成了整個讀取的過程;
寫入過程也是類似,只是剛好倒過來,通過IoSession.write 寫出數據,然后Handler進行寫入的業務處理,處理完成后交給IoFilterChain,進行消息過濾和協議的轉換,最后通過 I/O Processor 將數據寫出到 socket 通道
三、服務端代碼實現
1、編寫IoService(主要功能:設置過濾器,設置handler,綁定端口)
public class MinaServer { private static Logger logger = Logger.getLogger(MinaServer.class); public static void main(String[] args) { Configuration config = Configuration.getInstance(); try { config.readConfiguration(); config.initDataSource(); config.initServiceDef(); // 啟動FTP讀文件,PE積分后台執行 //此處為內部邏輯,通過FTP讀取文件,跳過 } catch (Exception e1) { e1.printStackTrace(); System.exit(1); } try { config.initInterfaceConfig(); // 啟動接口調用互斥map維護線程 InterfaceBusThread interfaceBusThread = new InterfaceBusThread(); interfaceBusThread.start(); } catch (Exception e) { logger.info("InterfaceBus error!", e); } NioSocketAcceptor acceptor = null; // 創建連接 try { // 創建一個非阻塞的server端的Socket acceptor = new NioSocketAcceptor(Runtime.getRuntime() .availableProcessors() + 1);// ioprocesser線程數,一般為cpu數量+1 // 建立工作線程池 Executor threadPool = Executors.newFixedThreadPool(Constant.threadCount);// 設置20個handler線程 // 解決在LINUX服務器上kill掉了,但是端口仍然被占用,要過一段時間才能釋放 acceptor.setReuseAddress(true); // 添加消息過濾器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new XMLObjectCodecFactory())); // // 設置服務器能夠接收的最大連接數 acceptor.setBacklog(Constant.maxSocketConn); acceptor.getFilterChain().addLast("executor", new ExecutorFilter(threadPool)); // 設置讀取數據的緩沖區大小 acceptor.getSessionConfig().setReadBufferSize(2048); // 讀寫通道10秒內無操作進入空閑狀態 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); // 綁定邏輯處理器 acceptor.setHandler(new MinaServerHandler()); // 添加業務處理 // 綁定端口 acceptor.bind(new InetSocketAddress(Constant.minaServerPort)); logger.info("server start success... port:" + Constant.minaServerPort); } catch (Exception e) { logger.error("server start error....", e); e.printStackTrace(); System.exit(1); } } }
2、編寫過濾器(即上面代碼的添加消息過濾器,這里使用了Mina自帶的換行符編解碼器工廠,注意:要在acceptor.bind()方法之前執行)
3、編寫IoHandler
public class MinaServerHandler extends IoHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(TCPServerHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { //業務代碼在這里編寫處理 String str = message.toString(); System.out.println("The message received is [" + str + "]"); if (str.endsWith("quit")) { session.close(true); return; } } @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("server session created"); super.sessionCreated(session); } @Override public void sessionOpened(IoSession session) throws Exception { System.out.println("server session Opened"); super.sessionOpened(session); } @Override public void sessionClosed(IoSession session) throws Exception { System.out.println("server session Closed"); super.sessionClosed(session); } }
4、將IoHandler 注冊到IoService(即上面代碼的綁定邏輯處理器,注意:要在acceptor.bind()方法之前執行)
5、運行MinaServer中的main 方法,可以看到控制台一直處於阻塞狀態,等待客戶端連接
四、Maven的pom.xml配置
<!-- MINA集成 --> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.7</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-integration-spring</artifactId> <version>1.1.7</version> </dependency>
五、測試類(模擬客戶端請求)
public class AutoTest { private final static String IP = "localhost"; private final static int PORT = 9002; public static void main(String[] args) { String msg = "<root><check>testEquityInsertRev</check><method>equityinsertrev</method><param><usernumber>13632599010</usernumber><equityid>1</equityid><statedate>20130620</statedate></param></root>"; System.out.println("testEquityInsertRev request msg:"+msg); System.out.println("testEquityInsertRev response msg:"+testApi(msg)); }