Mina框架(實戰詳解)


Apache Mina Server 是一個網絡通信應用框架,為開發高性能和高可用性的網絡應用程序提供了非常便利的框架。

特點:異步的NIO框架,將UDP當成"面向連接"的協議

一、組件管理

Mina的底層依賴的主要是Java NIO庫,上層提供的是基於事件的異步接口
(1)IoService(最底層[起點])
作用:隱藏底層IO的細節,對上提供統一的基於事件的異步IO接口
IOSocketAcceptor和IOSocketChannel,分別對應TCP協議下的服務端和客戶端的IOService,low-level IO經過IOService層后變成IO Event
(2)IoProcessor
作用:負責檢查是否有數據在通道上讀寫
IoProcessor 負責調用注冊在IoService 上的過濾器,並在過濾器鏈之后調用IoHandler
(3)IoFilter(攔截器[關鍵])
作用:日志輸出、黑名單過濾、數據的編碼(write 方向)與解碼(read 方向)等功能
(4)IoHandler(業務邏輯處理[終點])
作用:接收、發送數據
每個IoService都需要指定一個IoHandler
(5)IoSession
作用:是對底層連接(服務器與客戶端的特定連接,該連接由服務器地址、端口以及客戶端地址、端口來決定)的封裝

二、服務端流程:

1、通過SocketAcceptor 同客戶端建立連接;
2、連接建立之后 I/O的讀寫交給了I/O Processor線程,I/O Processor是多線程的;
3、通過I/O Processor 讀取的數據經過IoFilterChain里所有配置的IoFilter,IoFilter進行消息的過濾,格式的轉換,在這個層面可以制定一些自定義的協議;
4、最后IoFilter將數據交給 Handler  進行業務處理,完成了整個讀取的過程;
寫入過程也是類似,只是剛好倒過來,通過IoSession.write 寫出數據,然后Handler進行寫入的業務處理,處理完成后交給IoFilterChain,進行消息過濾和協議的轉換,最后通過 I/O Processor 將數據寫出到 socket 通道

三、服務端代碼實現

1、編寫IoService(主要功能:設置過濾器,設置handler,綁定端口)

public class MinaServer {
    private static Logger logger = Logger.getLogger(MinaServer.class);
    public static void main(String[] args) {
            Configuration config = Configuration.getInstance();
            try {
                config.readConfiguration();
                config.initDataSource();
                config.initServiceDef();

                // 啟動FTP讀文件,PE積分后台執行
                //此處為內部邏輯,通過FTP讀取文件,跳過
            } catch (Exception e1) {
                e1.printStackTrace();
                System.exit(1);
            }
            try {
                config.initInterfaceConfig();
                // 啟動接口調用互斥map維護線程
                InterfaceBusThread interfaceBusThread = new InterfaceBusThread();
                interfaceBusThread.start();
            } catch (Exception e) {
                logger.info("InterfaceBus error!", e);
            }

            NioSocketAcceptor acceptor = null; // 創建連接
            try {
                // 創建一個非阻塞的server端的Socket
                acceptor = new NioSocketAcceptor(Runtime.getRuntime()
                        .availableProcessors() + 1);// ioprocesser線程數,一般為cpu數量+1

                // 建立工作線程池
                Executor threadPool = Executors.newFixedThreadPool(Constant.threadCount);// 設置20個handler線程

                // 解決在LINUX服務器上kill掉了,但是端口仍然被占用,要過一段時間才能釋放
                acceptor.setReuseAddress(true);

                // 添加消息過濾器
                acceptor.getFilterChain().addLast("codec",
                        new ProtocolCodecFilter(new XMLObjectCodecFactory()));
                //
                // 設置服務器能夠接收的最大連接數
                acceptor.setBacklog(Constant.maxSocketConn);
                acceptor.getFilterChain().addLast("executor",
                        new ExecutorFilter(threadPool));
                // 設置讀取數據的緩沖區大小
                acceptor.getSessionConfig().setReadBufferSize(2048);
                // 讀寫通道10秒內無操作進入空閑狀態
                acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
                // 綁定邏輯處理器
                acceptor.setHandler(new MinaServerHandler()); // 添加業務處理
                // 綁定端口
                acceptor.bind(new InetSocketAddress(Constant.minaServerPort));
                logger.info("server start success... port:"
                        + Constant.minaServerPort);

            } catch (Exception e) {
                logger.error("server start error....", e);
                e.printStackTrace();
                System.exit(1);
            }
    }
}

2、編寫過濾器(即上面代碼的添加消息過濾器,這里使用了Mina自帶的換行符編解碼器工廠,注意:要在acceptor.bind()方法之前執行)

3、編寫IoHandler

public class MinaServerHandler extends IoHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(TCPServerHandler.class);

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        //業務代碼在這里編寫處理
        String str = message.toString();
        System.out.println("The message received is [" + str + "]");
        if (str.endsWith("quit")) {
            session.close(true);
            return;
        }
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("server session created");
        super.sessionCreated(session);
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        System.out.println("server session Opened");
        super.sessionOpened(session);
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("server session Closed");
        super.sessionClosed(session);
    }
    
}

4、將IoHandler 注冊到IoService(即上面代碼的綁定邏輯處理器,注意:要在acceptor.bind()方法之前執行)

5、運行MinaServer中的main 方法,可以看到控制台一直處於阻塞狀態,等待客戶端連接

四、Maven的pom.xml配置

<!-- MINA集成 -->
<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-core</artifactId>
    <version>2.0.7</version>
</dependency>
<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-integration-spring</artifactId>
    <version>1.1.7</version>
</dependency>

五、測試類(模擬客戶端請求)

public class AutoTest {
    private final static String IP = "localhost";
    private final static int PORT = 9002;
    public static void main(String[] args) {
        String msg = "<root><check>testEquityInsertRev</check><method>equityinsertrev</method><param><usernumber>13632599010</usernumber><equityid>1</equityid><statedate>20130620</statedate></param></root>";
        System.out.println("testEquityInsertRev request msg:"+msg);
        System.out.println("testEquityInsertRev response msg:"+testApi(msg));
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM