Mina學習


 
     Apache MINA 2 是一個開發高性能和高可伸縮性網絡應用程序的網絡應用框架。它提供了一個抽象的事件驅動的異步 API,可以使用 TCP/IP、UDP/IP、串口和虛擬機內部的管道等傳輸方式。Apache MINA 2 可以作為開發網絡應用程序的一個良好基礎
優點:
– 異步 
– 無阻塞 
– 事件驅動 
– 支持TCP, UDP, APR, 串口… 
– 通過 過濾器(Filters)實現擴展性 
– 同時提供協議框架
 

 

 
 
Server端應用
對socket通信來說,使用比較廣泛的是基於Server端的應用,尤其是並發規模達到一定程度后,頗具挑戰性。那么我們來看一下,基於MINA框架的Server端應用:
1、IOAcceptor 監聽指定的端口,處理新的網絡連接;一旦一個新的連接到達后,IOAcceptor 就產生一個session,后續所有從這個IP和端口發送過來的請求就將通過這個Session被處理。
2、Session創建后,后續所有的數據包都被人到過濾器鏈中,通過過濾器將原始的字節碼轉變成高層的對象,這個環節PacketEncoder/Decoder就十分有用。
3、最后數據包或對象被傳送給Handler做業務邏輯處理;
 
 
IoAcceptor
主要用於創建新的連接。MINA提供了多種實現,所以幾乎不需要我們自己再去實現:
NioSocketAcceptor:無阻塞的Socket 傳輸Acceptor,針對TCP
NioDatagramAcceptor : 無阻塞的Socket 傳輸Acceptor,針對UDP
AprSocketAcceptor : 阻塞的Socket 傳輸Acceptor,基於 APR
VmPipeSocketAcceptor : the in-VM Acceptor
 

IoConnector

針對Client端的Socket連接,有多種實現:
NioSocketConnector : 無阻塞的Socket 傳輸Connector,針對TCP 
NioDatagramConnector : 無阻塞的Socket 傳輸Connector,針對UDP 
AprSocketConnector : 阻塞的Socket 傳輸Connector,基於 APR 
ProxyConnector : 一個支持代理服務的 Connector ,通過截取連接的請求,並將終端指向代理設置的地址。
SerialConnector : 針對串口傳輸的Connector
VmPipeConnector : the in-VM * Connector*
 
 
Session
任何時候只要有新的連接到來,都會生成一個Session對象,並且一致保存在內存中,只到連接斷開;
 
Session有一系列狀態,如下:
Connected : session被創建,並有效 
Idle : session至少在一個空閑周期(見配置)內沒有處理過任何請求 
Idle for read : 在一個空閑周期內沒有做實際的讀操作
Idle for write : 在一個空閑周期內沒有做實際的寫操作
Idle for both : 在一個空閑周期內沒有做實際的讀和寫操作 
Closing :session正在被關閉
Closed : session已經被關閉
 
 
實現斷線重連 

一、斷線重連的方式

    1. 在創建Mina客戶端時增加一個監聽器,或者增加一個攔截器,當檢測到Session關閉時,自動進行重連。

 
    

 

 

    2. 在第1種方式的基礎上,增加客戶端的讀寫通道空閑檢查,當發生Session關閉或者讀寫空閑時,進行重連。

 
    
 
 
第一種方式比較傳統,優點是簡單方便,適合網絡穩定、數據量不大(1M帶寬以下)的環境;不過缺點是不能對系統級的連接斷開阻塞進行捕獲。
第二種方式更加精細,基本上能捕獲到應用、網絡、系統級的斷連。
 
 
二、重連目的:
        在使用Mina做為客戶端時,往往因為網絡、服務器、應用程序出現問題而導致連接斷開,而自動重連,就是解決連接斷開的唯一方式。如果網線斷開、服務器宕機、應用程序掛了,都是斷線的原因,這個時候,通過增加一個監聽器或者攔截器,就能實現重連。但是生產環境中,斷線的原因可能更復雜:網絡不穩定、延時、服務器負載高、服務器或者應用程序的發送或者接收緩沖區滿等等問題都可能導致數據傳輸過程出現類似於斷線的情況,這個時候,光檢測Session關閉是遠遠不夠的,這個時候就需要一種重連機制,比如讀寫空閑超過30秒,就進行重連。對於數據不間斷、實時性高、數據量大的應用場景,更是實用。  
 
三、代碼實現
package com.yitop.feng.service;

import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
 * @author fengzp
 * @date 16/8/23
 * @email fengzp@gzyitop.com
 * @company 廣州易站通計算機科技有限公司
 */
public class MinaClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(MinaClient.class);

    private NioSocketConnector connector;
    private IoSession session;
    private String hostname = "127.0.0.1";
    private int port = 8899;

    public MinaClient(){
        init();
    }

    private void init(){
        connector = new NioSocketConnector();

        connector.getFilterChain().addLast("logger", new LoggingFilter());
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"), "]", "]")));

        /**
         * 一、使用監聽器或攔截器實現斷線重連
         * 二、在第1種方式的基礎上,增加客戶端的讀寫通道空閑檢查,當發生Session關閉或者讀寫空閑時,進行重連
         */
        //使用攔截器實現斷線重連
        connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter(){
            @Override
            public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
                reconnect();
            }
        });
        //使用監聽器實現斷線重連
        connector.addListener(new MyIoServiceListener());


        connector.getSessionConfig().setReceiveBufferSize(10240);   // 設置接收緩沖區的大小
        connector.getSessionConfig().setSendBufferSize(10240);// 設置輸出緩沖區的大小
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000);  //讀寫都空閑時間:30秒
        connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//讀(接收通道)空閑時間:40秒
        connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//寫(發送通道)空閑時間:50秒

        //在數據處理器IoHandler中sessionIdle方法中加入Session會話關閉的代碼,這樣session關閉就能傳遞到攔截器或者監聽器中,然后實現重連。
        connector.setHandler(new MyIoHandlerConnector());

        connect();
    }

    private void connect(){
        ConnectFuture future = connector.connect(new InetSocketAddress(hostname, port));

        future.awaitUninterruptibly(10000);

        session = future.getSession();
    }

    private void reconnect(){
        while (true){
            try {
                Thread.sleep(3000);
                connect();
                if (session != null && session.isConnected()) {
                    LOGGER.info("斷線重連成功");
                    break;
                }
            }catch (Exception e){
                LOGGER.error("斷線重連失敗", e);
            }
        }
    }

    class MyIoServiceListener implements IoServiceListener{

        public void serviceActivated(IoService ioService) throws Exception {

        }

        public void serviceIdle(IoService ioService, IdleStatus idleStatus) throws Exception {

        }

        public void serviceDeactivated(IoService ioService) throws Exception {

        }

        public void sessionCreated(IoSession ioSession) throws Exception {

        }

        public void sessionDestroyed(IoSession ioSession) throws Exception {
            reconnect();
        }
    }

    class MyIoHandlerConnector extends IoHandlerAdapter{

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            if(session != null){
                //這里關閉session就會觸發攔截器或者監聽器,從而實現讀寫空閑時重連
                session.close(true);
            }
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String req = message==null?"":message.toString();
            System.err.println("Client receive: "+req);
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            IoSessionConfig config = session.getConfig();
            if (config instanceof SocketSessionConfig) {
                SocketSessionConfig sessionConfig = (SocketSessionConfig) config;
                sessionConfig.setKeepAlive(true);// 長連接
            }
            System.out.println("Client session create");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
            System.out.println("Client session create");
        }
    }

    public static void main(String[] args) {
        new MinaClient();
    }
}
View Code

 

實現mina心跳驗證
一、心跳機制

簡單介紹下keepAlive的機制:

首先,需要搞清楚TCP keepalive是干什么用的。從名字理解就能夠知道,keepalive就是用來檢測一個tcp connection是否還連接正常。當一個tcp connection建立好之后,如果雙方都不發送數據的話,tcp協議本身是不會發送其它的任何數據的,也就是說,在一個idle的connection上,兩個socket之間不產生任何的數據交換。從另一個方面講,當一個connection建立之后,鏈接雙方可以長時間的不發送任何數據,比如幾天,幾星期甚至幾個月,但該connection仍然存在。

所以,這就可能出現一個問題。舉例來說,server和client建立了一個connection,server負責接收client的request。當connection建立好之后,client由於某種原因機器停機了。但server端並不知道,所以server就會一直監聽着這個connection,但其實這個connection已經失效了。

keepalive就是為這樣的場景准備的。當把一個socket設置成了keepalive,那么這個socket空閑一段時間后,它就會向對方發送數據來確認對方仍然存在。放在上面的例子中,如果client停機了,那么server所發送的keepalive數據就不會有response,這樣server就能夠確認client完蛋了(至少從表面上看是這樣)。

 

MINA本身提供了一個過濾器類:KeepAliveFilter ,該過濾器用於在IO空閑的時候發送並且反饋心跳包(keep-alive request/response)。 

說到KeepAliveFilter這個類有必要先說一說其構造函數,即實例化該類需要些什么,該類構造函數中參數有三個分別是: 
(1)KeepAvlieMessageFactory:   該實例引用用於判斷接受與發送的包是否是心跳包,以及心跳請求包的實現 
(2)IdleStatus:                              該過濾器所關注的空閑狀態,默認認為讀取空閑。 即當讀取通道空閑的時候發送心跳包 
(3)KeepAliveRequestTimeoutHandler: 心跳包請求后超時無反饋情況下的處理機制  默認為CLOSE  即關閉連接 

 

其實我們自己做下這兩個接口的實現類就搞定了KeepAvlieMessageFactory、KeepAliveRequestTimeoutHandler

KeepAvlieMessageFactoryImpl中主要是定義心跳包的內容

KeepAliveRequestTimeoutHandlerImpl 主要是定義超時后的處理方式,通常是多次超時后就斷開

 

二、測試例子

package com.yitop.feng.service;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
 * @author fengzp
 * @date 16/8/23
 * @email fengzp@gzyitop.com
 * @company 廣州易站通計算機科技有限公司
 */
public class MinaServer {

    private NioSocketAcceptor acceptor;
    private int port = 8899;

    /** 30秒后超時 */
    private static final int IDELTIMEOUT = 30;
    /** 15秒發送一次心跳包 */
    private static final int HEARTBEATRATE = 15;

    public MinaServer() throws IOException {
        init();
    }

    private void init() throws IOException {
        acceptor = new NioSocketAcceptor();

        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));

        KeepAliveFilter kaf = new KeepAliveFilter(new MyKeepAliveMessageFactory(), IdleStatus.BOTH_IDLE);
        kaf .setRequestInterval(HEARTBEATRATE);//設置心跳頻率
        kaf.setRequestTimeout(IDELTIMEOUT);//設置空閑超時時間
        kaf.setForwardEvent(false); //idle事件回發, false表示當session進入idle狀態的時候,不在調用handler中的sessionIdle方法
        acceptor.getFilterChain().addLast("heart", kaf);

        acceptor.setHandler(new MyIoHandlerAdapter());

        acceptor.getSessionConfig().setReadBufferSize(2048);
        acceptor.setReuseAddress(true);// 端口可重用
        acceptor.bind(new InetSocketAddress(port));
    }

    class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{

        String req = "+";
        String resp = "-";

        public MyKeepAliveMessageFactory(){
        }

        public boolean isRequest(IoSession ioSession, Object o) {
            boolean isReq = (o!=null && req.equals(o.toString()));

            System.out.println(o.toString() + " : isRequest : " + isReq);

            return isReq;
        }

        public boolean isResponse(IoSession ioSession, Object o) {
            boolean isResp = (o!=null && resp.equals(o.toString()));

            System.out.println(o.toString() + " : isResponse : " + isResp);

            return isResp;
        }

        public Object getRequest(IoSession ioSession) {
            System.out.println("getRequest");
            return req;
        }

        public Object getResponse(IoSession ioSession, Object o) {
            System.out.println("getResponse");
            return resp;
        }
    }

//    class MyKeepAliveRequestTimeoutHandler implements KeepAliveRequestTimeoutHandler {
//
//        public void keepAliveRequestTimedOut(KeepAliveFilter keepAliveFilter, IoSession ioSession) throws Exception {
//
//        }
//    }

    class MyIoHandlerAdapter extends IoHandlerAdapter{

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String req = message==null?"":message.toString();
            System.err.println("Server receive: "+req);

            session.write("hi client");
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            String resp = message==null?"":message.toString();
            System.err.println("Server sent: "+resp);
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            IoSessionConfig config = session.getConfig();
            if (config instanceof SocketSessionConfig) {
                SocketSessionConfig sessionConfig = (SocketSessionConfig) config;
                sessionConfig.setKeepAlive(true);// 長連接
            }
            System.out.println("Server session create");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
            System.out.println("Server session close");
        }
    }

    public static void main(String[] args) throws IOException {
        new MinaServer();
    }
}

提供server后,分兩種情況:

1.接受客戶端心跳包,日志輸出:

+ : isRequest : true
getResponse
+ : isResponse : false
+ : isRequest : true
14:14:29.262 [NioProcessor-2] INFO o.a.m.filter.logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
- : isRequest : false
- : isResponse : true

2.主動發送心跳包到客戶端,日志輸出:

發送

getRequest

14:14:44.313 [NioProcessor-2] INFO o.a.m.filter.logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
+ : isRequest : true

 

收到客戶端響應

- : isRequest : false
- : isResponse : true
- : isRequest : false
- : isResponse : true

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM