Apache MINA 2 是一個開發高性能和高可伸縮性網絡應用程序的網絡應用框架。它提供了一個抽象的事件驅動的異步 API,可以使用 TCP/IP、UDP/IP、串口和虛擬機內部的管道等傳輸方式。Apache MINA 2 可以作為開發網絡應用程序的一個良好基礎。
Apache MINA是非常著名的基於java nio的通信框架,以前都是自己直接使用udp編程,新項目選型中考慮到網絡通信可能會用到多種通信方式,因此使用了MINA。
本文結構:
(1)客戶端和服務器代碼 ;雖然是udp的,但是mina的優美的設計使得所有的通信方式能夠以統一的形式使用,perfect。當然注意的是,不同的通信方式,背后的機理和有效的變量、狀態是有區別的,所以要精通,那還是需要經驗積累和學習的。
(2)超時 和Session的幾個實際問題
(3)心跳 ,糾正幾個錯誤
既然是使用,廢話少說,直接整個可用的例子。當然了,這些代碼也不是直接可用的,我們應用的邏輯有點復雜,不會這么簡單使用的。
請參考mina的example包和文檔http://mina.apache.org/udp-tutorial.html 。
版本2.0 RC1
- NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
- acceptor.setHandler(new MyIoHandlerAdapter()); //你的業務處理,最簡單的,可以extends IoHandlerAdapter
- DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
- chain.addLast("keep-alive" , new HachiKeepAliveFilterInMina()); //心跳
- chain.addLast("toMessageTyep" , new MyMessageEn_Decoder());
- //將傳輸的數據轉換成你的業務數據格式。比如下面的是將數據轉換成一行行的文本
- //acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
- chain.addLast("logger" , new LoggingFilter());
- DatagramSessionConfig dcfg = acceptor.getSessionConfig();
- dcfg.setReuseAddress(true );
- acceptor.bind(new InetSocketAddress(ClusterContext.getHeartBeatPort()));
1.2 客戶端
- NioDatagramConnector connector = new NioDatagramConnector();
- connector.setConnectTimeoutMillis(60000L);
- connector.setConnectTimeoutCheckInterval(10000 );
- connector.setHandler(handler);
- DefaultIoFilterChainBuilder chain = connector.getFilterChain();
- chain.addLast("keep-alive" , new HachiKeepAliveFilterInMina()); //心跳
- chain.addLast("toMessageTyep" , new MyMessageEn_Decoder());
- chain.addLast("logger" , new LoggingFilter());
- ConnectFuture connFuture = connector.connect(new InetSocketAddress( "10.1.1.1" , 8001 ));
- connFuture.awaitUninterruptibly();
- IoSession session = connFuture.getSession();
- //發送消息長整型 1000
- IoBuffer buffer = IoBuffer.allocate(8 );
- buffer.putLong(1000 );
- buffer.flip();
- session.write(buffer);
- //關閉連接
- session.getCloseFuture().awaitUninterruptibly();
- connector.dispose();
udp session默認是60秒鍾超時,此時狀態為closing,數據就發不出去了。
Session的接口是IoSession,udp的最終實現是NioSession。如果交互在60秒內不能處理完成,就需要使用Keep-alive機制,即心跳機制。
在代碼中已經使用了心跳機制,是通過mina的filter實現的,mina自身帶的心跳機制好處在於,它附加了處理,讓心跳消息不會傳到業務層,在底層就完成了。
在上面代碼實現中的HachiKeepAliveFilterInMina如下:
- public class HachiKeepAliveFilterInMina extends KeepAliveFilter {
- private static final int INTERVAL = 30 ; //in seconds
- private static final int TIMEOUT = 10 ; //in seconds
- public HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) {
- super (messageFactory, IdleStatus.BOTH_IDLE, new ExceptionHandler(), INTERVAL, TIMEOUT);
- }
- public HachiKeepAliveFilterInMina() {
- super ( new KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE, new ExceptionHandler(), INTERVAL, TIMEOUT);
- this .setForwardEvent( false ); //此消息不會繼續傳遞,不會被業務層看見
- }
- }
- class ExceptionHandler implements KeepAliveRequestTimeoutHandler {
- public void keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session) throws Exception {
- System.out.println("Connection lost, session will be closed" );
- session.close(true );
- }
- }
- /**
- * 繼承於KeepAliveMessageFactory,當心跳機制啟動的時候,需要該工廠類來判斷和定制心跳消息
- * @author Liu Liu
- *
- */
- class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {
- private static final byte int_req = - 1 ;
- private static final byte int_rep = - 2 ;
- private static final IoBuffer KAMSG_REQ = IoBuffer.wrap( new byte []{int_req});
- private static final IoBuffer KAMSG_REP = IoBuffer.wrap( new byte []{int_rep});
- public Object getRequest(IoSession session) {
- return KAMSG_REQ.duplicate();
- }
- public Object getResponse(IoSession session, Object request) {
- return KAMSG_REP.duplicate();
- }
- public boolean isRequest(IoSession session, Object message) {
- if (!(message instanceof IoBuffer))
- return false ;
- IoBuffer realMessage = (IoBuffer)message;
- if (realMessage.limit() != 1 )
- return false ;
- boolean result = (realMessage.get() == int_req);
- realMessage.rewind();
- return result;
- }
- public boolean isResponse(IoSession session, Object message) {
- if (!(message instanceof IoBuffer))
- return false ;
- IoBuffer realMessage = (IoBuffer)message;
- if (realMessage.limit() != 1 )
- return false ;
- boolean result = (realMessage.get() == int_rep);
- realMessage.rewind();
- return result;
- }
- }
有人說:心跳機制的filter只需要服務器端具有即可——這是錯誤 的,拍着腦袋想一想,看看factory,你就知道了。心跳需要通信兩端的實現 。
另外,版本2.0 RC1中,經過測試,當心跳的時間間隔INTERVAL設置為60s(Session的存活時間)的時候心跳會失效,所以最好需要小於60s的間隔。