JAVA NIO異步通信框架MINA選型和使用的幾個細節(概述入門,UDP, 心跳)


    Apache MINA 2 是一個開發高性能和高可伸縮性網絡應用程序的網絡應用框架。它提供了一個抽象的事件驅動的異步 API,可以使用 TCP/IP、UDP/IP、串口和虛擬機內部的管道等傳輸方式。Apache MINA 2 可以作為開發網絡應用程序的一個良好基礎。

    Apache MINA是非常著名的基於java nio的通信框架,以前都是自己直接使用udp編程,新項目選型中考慮到網絡通信可能會用到多種通信方式,因此使用了MINA。

     本文結構:

     (1)客戶端和服務器代碼 ;雖然是udp的,但是mina的優美的設計使得所有的通信方式能夠以統一的形式使用,perfect。當然注意的是,不同的通信方式,背后的機理和有效的變量、狀態是有區別的,所以要精通,那還是需要經驗積累和學習的。

     (2)超時 和Session的幾個實際問題

     (3)心跳 ,糾正幾個錯誤

 

     既然是使用,廢話少說,直接整個可用的例子。當然了,這些代碼也不是直接可用的,我們應用的邏輯有點復雜,不會這么簡單使用的。

請參考mina的example包和文檔http://mina.apache.org/udp-tutorial.html 。

 

版本2.0 RC1

1.1 服務器端

  1.                 NioDatagramAcceptor acceptor =  new  NioDatagramAcceptor();  
  2.                 acceptor.setHandler(new  MyIoHandlerAdapter()); //你的業務處理,最簡單的,可以extends IoHandlerAdapter   
  3.   
  4. DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();  
  5. chain.addLast("keep-alive" ,  new  HachiKeepAliveFilterInMina());  //心跳   
  6. chain.addLast("toMessageTyep" ,  new  MyMessageEn_Decoder());   
  7.               //將傳輸的數據轉換成你的業務數據格式。比如下面的是將數據轉換成一行行的文本   
  8.                 //acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));    
  9.   
  10. chain.addLast("logger" ,  new  LoggingFilter());  
  11. DatagramSessionConfig dcfg = acceptor.getSessionConfig();  
  12. dcfg.setReuseAddress(true );  
  13. acceptor.bind(new  InetSocketAddress(ClusterContext.getHeartBeatPort()));  

 

 

1.2 客戶端

  1.               NioDatagramConnector connector =  new  NioDatagramConnector();  
  2. connector.setConnectTimeoutMillis(60000L);  
  3. connector.setConnectTimeoutCheckInterval(10000 );  
  4. connector.setHandler(handler);  
  5.   
  6. DefaultIoFilterChainBuilder chain = connector.getFilterChain();  
  7. chain.addLast("keep-alive" ,  new  HachiKeepAliveFilterInMina()); //心跳   
  8. chain.addLast("toMessageTyep" ,  new  MyMessageEn_Decoder());  
  9. chain.addLast("logger" ,  new  LoggingFilter());  
  10. ConnectFuture connFuture = connector.connect(new  InetSocketAddress( "10.1.1.1" , 8001 ));  
  11. connFuture.awaitUninterruptibly();  
  12. IoSession session = connFuture.getSession();  
  13.                 //發送消息長整型 1000   
  14.               IoBuffer buffer = IoBuffer.allocate(8 );  
  15.               buffer.putLong(1000 );  
  16.               buffer.flip();  
  17.               session.write(buffer);  
  18.                  //關閉連接   
  19.                  session.getCloseFuture().awaitUninterruptibly();  
  20.  connector.dispose();  

 

2. 超時的幾個經驗總結:

    udp session默認是60秒鍾超時,此時狀態為closing,數據就發不出去了。

Session的接口是IoSession,udp的最終實現是NioSession。如果交互在60秒內不能處理完成,就需要使用Keep-alive機制,即心跳機制。

 

3. 心跳 機制

    在代碼中已經使用了心跳機制,是通過mina的filter實現的,mina自身帶的心跳機制好處在於,它附加了處理,讓心跳消息不會傳到業務層,在底層就完成了。

    在上面代碼實現中的HachiKeepAliveFilterInMina如下:

 

  1. public   class  HachiKeepAliveFilterInMina  extends  KeepAliveFilter {  
  2.     private   static   final   int  INTERVAL =  30 ; //in seconds   
  3.     private   static   final   int  TIMEOUT =  10 ;  //in seconds   
  4.       
  5.     public  HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) {  
  6.         super (messageFactory, IdleStatus.BOTH_IDLE,  new  ExceptionHandler(), INTERVAL, TIMEOUT);  
  7.     }  
  8.       
  9.     public  HachiKeepAliveFilterInMina() {  
  10.         super ( new  KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE,  new  ExceptionHandler(), INTERVAL, TIMEOUT);  
  11.         this .setForwardEvent( false );  //此消息不會繼續傳遞,不會被業務層看見   
  12.     }  
  13. }  
  14.   
  15. class  ExceptionHandler  implements  KeepAliveRequestTimeoutHandler {     
  16.     public   void  keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session)  throws  Exception {     
  17.         System.out.println("Connection lost, session will be closed" );     
  18.         session.close(true );   
  19.     }     
  20. }  
  21.   
  22. /**  
  23.  * 繼承於KeepAliveMessageFactory,當心跳機制啟動的時候,需要該工廠類來判斷和定制心跳消息  
  24.  * @author Liu Liu  
  25.  *  
  26.  */   
  27. class  KeepAliveMessageFactoryImpl  implements  KeepAliveMessageFactory {  
  28.     private   static   final   byte  int_req = - 1 ;  
  29.     private   static   final   byte  int_rep = - 2 ;   
  30.     private   static   final  IoBuffer KAMSG_REQ = IoBuffer.wrap( new   byte []{int_req});     
  31.     private   static   final  IoBuffer KAMSG_REP = IoBuffer.wrap( new   byte []{int_rep});    
  32.          
  33.     public  Object getRequest(IoSession session) {     
  34.         return  KAMSG_REQ.duplicate();     
  35.     }     
  36.   
  37.     public  Object getResponse(IoSession session, Object request) {     
  38.         return  KAMSG_REP.duplicate();     
  39.     }     
  40.   
  41.     public   boolean  isRequest(IoSession session, Object message) {    
  42.         if (!(message  instanceof  IoBuffer))  
  43.             return   false ;  
  44.         IoBuffer realMessage = (IoBuffer)message;  
  45.         if (realMessage.limit() !=  1 )  
  46.             return   false ;  
  47.           
  48.         boolean  result = (realMessage.get() == int_req);  
  49.         realMessage.rewind();  
  50.         return  result;  
  51.     }     
  52.   
  53.     public   boolean  isResponse(IoSession session, Object message) {      
  54.         if (!(message  instanceof  IoBuffer))  
  55.             return   false ;  
  56.         IoBuffer realMessage = (IoBuffer)message;  
  57.         if (realMessage.limit() !=  1 )  
  58.             return   false ;  
  59.           
  60.         boolean  result = (realMessage.get() == int_rep);     
  61.         realMessage.rewind();  
  62.         return  result;  
  63.     }     
  64. }  

 

  有人說:心跳機制的filter只需要服務器端具有即可——這是錯誤 的,拍着腦袋想一想,看看factory,你就知道了。心跳需要通信兩端的實現 。

  另外,版本2.0 RC1中,經過測試,當心跳的時間間隔INTERVAL設置為60s(Session的存活時間)的時候心跳會失效,所以最好需要小於60s的間隔。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM