
IoConnector
NioDatagramConnector : 無阻塞的Socket 傳輸Connector,針對UDP
Idle : session至少在一個空閑周期(見配置)內沒有處理過任何請求
一、斷線重連的方式;
1. 在創建Mina客戶端時增加一個監聽器,或者增加一個攔截器,當檢測到Session關閉時,自動進行重連。
2. 在第1種方式的基礎上,增加客戶端的讀寫通道空閑檢查,當發生Session關閉或者讀寫空閑時,進行重連。


package com.yitop.feng.service; import org.apache.mina.core.filterchain.IoFilterAdapter; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.service.IoService; import org.apache.mina.core.service.IoServiceListener; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.SocketSessionConfig; import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.nio.charset.Charset; /** * @author fengzp * @date 16/8/23 * @email fengzp@gzyitop.com * @company 廣州易站通計算機科技有限公司 */ public class MinaClient { private static final Logger LOGGER = LoggerFactory.getLogger(MinaClient.class); private NioSocketConnector connector; private IoSession session; private String hostname = "127.0.0.1"; private int port = 8899; public MinaClient(){ init(); } private void init(){ connector = new NioSocketConnector(); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"), "]", "]"))); /** * 一、使用監聽器或攔截器實現斷線重連 * 二、在第1種方式的基礎上,增加客戶端的讀寫通道空閑檢查,當發生Session關閉或者讀寫空閑時,進行重連 */ //使用攔截器實現斷線重連 connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter(){ @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { reconnect(); } }); //使用監聽器實現斷線重連 connector.addListener(new MyIoServiceListener()); connector.getSessionConfig().setReceiveBufferSize(10240); // 設置接收緩沖區的大小 connector.getSessionConfig().setSendBufferSize(10240);// 設置輸出緩沖區的大小 connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000); //讀寫都空閑時間:30秒 connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//讀(接收通道)空閑時間:40秒 connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//寫(發送通道)空閑時間:50秒 //在數據處理器IoHandler中sessionIdle方法中加入Session會話關閉的代碼,這樣session關閉就能傳遞到攔截器或者監聽器中,然后實現重連。 connector.setHandler(new MyIoHandlerConnector()); connect(); } private void connect(){ ConnectFuture future = connector.connect(new InetSocketAddress(hostname, port)); future.awaitUninterruptibly(10000); session = future.getSession(); } private void reconnect(){ while (true){ try { Thread.sleep(3000); connect(); if (session != null && session.isConnected()) { LOGGER.info("斷線重連成功"); break; } }catch (Exception e){ LOGGER.error("斷線重連失敗", e); } } } class MyIoServiceListener implements IoServiceListener{ public void serviceActivated(IoService ioService) throws Exception { } public void serviceIdle(IoService ioService, IdleStatus idleStatus) throws Exception { } public void serviceDeactivated(IoService ioService) throws Exception { } public void sessionCreated(IoSession ioSession) throws Exception { } public void sessionDestroyed(IoSession ioSession) throws Exception { reconnect(); } } class MyIoHandlerConnector extends IoHandlerAdapter{ @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { if(session != null){ //這里關閉session就會觸發攔截器或者監聽器,從而實現讀寫空閑時重連 session.close(true); } } @Override public void messageSent(IoSession session, Object message) throws Exception { } @Override public void messageReceived(IoSession session, Object message) throws Exception { String req = message==null?"":message.toString(); System.err.println("Client receive: "+req); } @Override public void sessionCreated(IoSession session) throws Exception { IoSessionConfig config = session.getConfig(); if (config instanceof SocketSessionConfig) { SocketSessionConfig sessionConfig = (SocketSessionConfig) config; sessionConfig.setKeepAlive(true);// 長連接 } System.out.println("Client session create"); } @Override public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); System.out.println("Client session create"); } } public static void main(String[] args) { new MinaClient(); } }
實現mina心跳驗證
簡單介紹下keepAlive的機制:
首先,需要搞清楚TCP keepalive是干什么用的。從名字理解就能夠知道,keepalive就是用來檢測一個tcp connection是否還連接正常。當一個tcp connection建立好之后,如果雙方都不發送數據的話,tcp協議本身是不會發送其它的任何數據的,也就是說,在一個idle的connection上,兩個socket之間不產生任何的數據交換。從另一個方面講,當一個connection建立之后,鏈接雙方可以長時間的不發送任何數據,比如幾天,幾星期甚至幾個月,但該connection仍然存在。
所以,這就可能出現一個問題。舉例來說,server和client建立了一個connection,server負責接收client的request。當connection建立好之后,client由於某種原因機器停機了。但server端並不知道,所以server就會一直監聽着這個connection,但其實這個connection已經失效了。
keepalive就是為這樣的場景准備的。當把一個socket設置成了keepalive,那么這個socket空閑一段時間后,它就會向對方發送數據來確認對方仍然存在。放在上面的例子中,如果client停機了,那么server所發送的keepalive數據就不會有response,這樣server就能夠確認client完蛋了(至少從表面上看是這樣)。
MINA本身提供了一個過濾器類:KeepAliveFilter ,該過濾器用於在IO空閑的時候發送並且反饋心跳包(keep-alive request/response)。
說到KeepAliveFilter這個類有必要先說一說其構造函數,即實例化該類需要些什么,該類構造函數中參數有三個分別是:
(1)KeepAvlieMessageFactory: 該實例引用用於判斷接受與發送的包是否是心跳包,以及心跳請求包的實現
(2)IdleStatus: 該過濾器所關注的空閑狀態,默認認為讀取空閑。 即當讀取通道空閑的時候發送心跳包
(3)KeepAliveRequestTimeoutHandler: 心跳包請求后超時無反饋情況下的處理機制 默認為CLOSE 即關閉連接
其實我們自己做下這兩個接口的實現類就搞定了KeepAvlieMessageFactory、KeepAliveRequestTimeoutHandler
KeepAvlieMessageFactoryImpl中主要是定義心跳包的內容
KeepAliveRequestTimeoutHandlerImpl 主要是定義超時后的處理方式,通常是多次超時后就斷開
二、測試例子
package com.yitop.feng.service; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.keepalive.KeepAliveFilter; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.SocketSessionConfig; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; /** * @author fengzp * @date 16/8/23 * @email fengzp@gzyitop.com * @company 廣州易站通計算機科技有限公司 */ public class MinaServer { private NioSocketAcceptor acceptor; private int port = 8899; /** 30秒后超時 */ private static final int IDELTIMEOUT = 30; /** 15秒發送一次心跳包 */ private static final int HEARTBEATRATE = 15; public MinaServer() throws IOException { init(); } private void init() throws IOException { acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); KeepAliveFilter kaf = new KeepAliveFilter(new MyKeepAliveMessageFactory(), IdleStatus.BOTH_IDLE); kaf .setRequestInterval(HEARTBEATRATE);//設置心跳頻率 kaf.setRequestTimeout(IDELTIMEOUT);//設置空閑超時時間 kaf.setForwardEvent(false); //idle事件回發, false表示當session進入idle狀態的時候,不在調用handler中的sessionIdle方法 acceptor.getFilterChain().addLast("heart", kaf); acceptor.setHandler(new MyIoHandlerAdapter()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.setReuseAddress(true);// 端口可重用 acceptor.bind(new InetSocketAddress(port)); } class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{ String req = "+"; String resp = "-"; public MyKeepAliveMessageFactory(){ } public boolean isRequest(IoSession ioSession, Object o) { boolean isReq = (o!=null && req.equals(o.toString())); System.out.println(o.toString() + " : isRequest : " + isReq); return isReq; } public boolean isResponse(IoSession ioSession, Object o) { boolean isResp = (o!=null && resp.equals(o.toString())); System.out.println(o.toString() + " : isResponse : " + isResp); return isResp; } public Object getRequest(IoSession ioSession) { System.out.println("getRequest"); return req; } public Object getResponse(IoSession ioSession, Object o) { System.out.println("getResponse"); return resp; } } // class MyKeepAliveRequestTimeoutHandler implements KeepAliveRequestTimeoutHandler { // // public void keepAliveRequestTimedOut(KeepAliveFilter keepAliveFilter, IoSession ioSession) throws Exception { // // } // } class MyIoHandlerAdapter extends IoHandlerAdapter{ @Override public void messageReceived(IoSession session, Object message) throws Exception { String req = message==null?"":message.toString(); System.err.println("Server receive: "+req); session.write("hi client"); } @Override public void messageSent(IoSession session, Object message) throws Exception { String resp = message==null?"":message.toString(); System.err.println("Server sent: "+resp); } @Override public void sessionCreated(IoSession session) throws Exception { IoSessionConfig config = session.getConfig(); if (config instanceof SocketSessionConfig) { SocketSessionConfig sessionConfig = (SocketSessionConfig) config; sessionConfig.setKeepAlive(true);// 長連接 } System.out.println("Server session create"); } @Override public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); System.out.println("Server session close"); } } public static void main(String[] args) throws IOException { new MinaServer(); } }
提供server后,分兩種情況:
1.接受客戶端心跳包,日志輸出:
+ : isRequest : true
getResponse
+ : isResponse : false
+ : isRequest : true
14:14:29.262 [NioProcessor-2] INFO o.a.m.filter.logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
- : isRequest : false
- : isResponse : true
2.主動發送心跳包到客戶端,日志輸出:
發送
getRequest
14:14:44.313 [NioProcessor-2] INFO o.a.m.filter.logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
+ : isRequest : true
收到客戶端響應
- : isRequest : false
- : isResponse : true
- : isRequest : false
- : isResponse : true