大並發量socket 通信框架MINA介紹


Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 組織一個較新的項目,它為開發高性能和高可用性的網絡應用程序提供了非常便利的框架。當前發行的 MINA 版本支持基於 Java NIO 技術的TCP/UDP 應用程序開發、串口通訊程序。 

Mina 的應用層: 



一個設計成熟的開源框架,總是會僅可能的減少侵入性,並在整個項目中找到合適的位置,而不應對整個項目的構架設計產生過多的影響,圖 1 就是 MINA 的應用層示意圖。從圖中和上節的 DEMO 中我們可以看到, MINA很好的把業務代碼和底層的通信隔離了開來,我們要做的僅僅是建立好監聽,然后寫上我們需要實現的業務邏輯就OK 了。 

MINA 的內部流程: 



(1) IoService :這個接口在一個線程上負責套接字的建立,擁有自己的 Selector ,監聽是否有連接被建立。 

(2) IoProcessor :這個接口在另一個線程上負責檢查是否有數據在通道上讀寫,也就是說它也擁有自己的Selector ,這是與我們使用 JAVA NIO 編碼時的一個不同之處,通常在 JAVA NIO 編碼中,我們都是使用一個 Selector ,也就是不區分 IoService 與 IoProcessor 兩個功能接口。另外, IoProcessor 也是 MINA 框架的核心組件之一 . 在 MINA 框架啟動時,會用一個線程池來專門生成線程,來負責調用注冊在 IoService 上的過濾器,並在過濾器鏈之后調用 IoHandler 。在默認情況 IoProcessor 會用N+1 個線程來輪流詢問監視的端口是否有數據傳送,其中 n 為 cpu 的內核個數。按一般的多線程設計概念來說,IoProcessor 的線程數是越多越好,但實際上並非如此,因為大家都知道, IO 的操作是非常占用資源的,所以項目中的 IoProcessor 的線程數應該根據實際需要來定,而這個數字可以在生成 IoAcceptor 對象時進行設定。 EgIoAcceptor acceptor = new NioSocketAcceptor( N ); 

(3.) IoFilter :這個接口定義一組攔截器,這些攔截器可以包括日志輸出、黑名單過濾,甚至是在過濾器鏈中利用 AOP 寫上權限控制。數據的編碼( write 方向)與解碼( read 方向)等功能,其中數據的 encode 與 decode 是最為重要的、也是您在使用 Mina 時最主要關注的地方。 

(4.) IoHandler :這個接口負責編寫業務邏輯,也就是接收、發送數據的地方。如果大家把業務處理寫好,並寫好業務接口,真正要用時,只需要在此處替換即可,再次見證了 MINA 分層的徹底。 

其中 IoService 接口會專門起一個線程來輪詢是否有新的連接產生,一旦有連接產生則通知 IoProcessor, 而IoProcessor 則起 n+1 個線程來檢查連接是否有數據在上面讀寫。一旦有連接產生,並有數據讀寫,則通知 decode 或 encode ,進行報文的解碼或編碼,將處理后的報文再交給業務類進行業務處理。其中IoProcessor 是處理請求的分配,包括選擇 Selector ,超時驗證,狀態記錄等。總之這個類和 IoService 一起配合工作,封裝了 NIO 底層的實現以及 MINA 框架內部的功能的支持 . 

結合實例,並根據以上的圖文講解,我們可以很輕易的總結出利用 MINA 編程的幾個大致步驟: 

創建一個實現了 IoService 接口的類 

設置一個實現了 IoFilter 接口的過濾器(如果有需要的情況下) 

設置一個 IoHandler 接口實現的處理類,用於處理事件(必須) 

對 IoService 綁定一個端口開始工作 


注:這一點請特別注意,因 IoProcessor 也是相當於輪詢機制,這導致在報文過長時,或其它原因導致報文不能一次傳輸完畢的情況下,必須保存同一連接 ( 在 MINA 中是以 IoSession 類生成的對象 ) 的上一次狀態,這樣才能截取到一個完成的報文,而這也是 decode( 編碼器 ) 需要做的核心工作 。 


Mina 的使用: 

//Mina TCP 服務端 
package com.mina.test; 

import java.util.Date; 

import org.apache.mina.core.service.IoHandlerAdapter; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.core.session.IoSession; 

public class TimeServerHandler extends IoHandlerAdapter { 
@Override 
public void sessionCreated(IoSession session) throws Exception { 
System.out.println("服務端與客戶端創建連接..."); 
super.sessionCreated(session); 


@Override 
public void sessionOpened(IoSession session) throws Exception { 
System.out.println("服務端與客戶端連接打開..."); 
super.sessionOpened(session); 


@Override 
public void sessionClosed(IoSession session) throws Exception { 
System.out.println("服務端與客戶端連接關閉..."); 
super.sessionClosed(session); 


@Override 
public void messageSent(IoSession session, Object message) throws Exception { 
System.out.println("服務端發送信息成功..."+message.toString()); 
super.messageSent(session, message); 


public void exceptionCaught(IoSession session, Throwable cause) 
throws Exception { 
System.out.println("服務端發送異常..."+cause.getMessage()); 
cause.printStackTrace(); 


public void messageReceived(IoSession session, Object message) 
throws Exception { 
String strMsg = message.toString(); 
System.out.println("服務端接收到的數據為: "+strMsg); 
if (strMsg.trim().equalsIgnoreCase("quit")) { 
session.close(); 
return; 
}
Date date = new Date(); 
session.write(date.toString()); 


public void sessionIdle(IoSession session, IdleStatus status) 
throws Exception { 
System.out.println("服務端進入空閑狀態... " + session.getIdleCount(status)); 




package com.mina.test; 

import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.charset.Charset; 

import org.apache.mina.core.service.IoAcceptor; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.filter.codec.ProtocolCodecFilter; 
import org.apache.mina.filter.codec.textline.TextLineCodecFactory; 
import org.apache.mina.filter.logging.LoggingFilter; 
import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 

public class MinaTimeServer { 

private static final int PORT = 6488; 

public static void main(String[] args) throws IOException { 

//監聽即將到來的TCP連接,建立監控器 
IoAcceptor acceptor = new NioSocketAcceptor(); 
//設置攔截器 
acceptor.getFilterChain().addLast("logger", new LoggingFilter()); 
acceptor.getFilterChain().addLast( 
"codec", 
new ProtocolCodecFilter(new TextLineCodecFactory(Charset 
.forName("GBK")))); 
//設置處理類 
acceptor.setHandler(new TimeServerHandler()); 
    //設置配置 
acceptor.getSessionConfig().setReadBufferSize(2048); 
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 

//綁定的監聽端口,可多次綁定,也可同時綁定多個。 
acceptor.bind(new InetSocketAddress(PORT)); 
System.out.println("服務端啟動成功......端口號為: "+PORT); 





//Mina TCP客戶端 
package com.mina.test; 

import org.apache.mina.core.buffer.IoBuffer; 
import org.apache.mina.core.service.IoHandlerAdapter; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.core.session.IoSession; 
import org.apache.mina.proxy.utils.ByteUtilities; 


public class TimeClientHandler extends IoHandlerAdapter { 

@Override 
public void messageReceived(IoSession iosession, Object message) 
throws Exception { 
IoBuffer bbuf = (IoBuffer) message; 
byte[] byten = new byte[bbuf.limit()]; 
bbuf.get(byten, bbuf.position(), bbuf.limit()); 
System.out.println("客戶端收到消息" + ByteUtilities.asHex(byten)); 

@Override 
public void exceptionCaught(IoSession session, Throwable cause) 
throws Exception { 
System.out.println("客戶端異常"); 
super.exceptionCaught(session, cause); 

@Override 
public void messageSent(IoSession iosession, Object obj) throws Exception { 
System.out.println("客戶端消息發送"); 
super.messageSent(iosession, obj); 

@Override 
public void sessionClosed(IoSession iosession) throws Exception { 
System.out.println("客戶端會話關閉"); 
super.sessionClosed(iosession); 

@Override 
public void sessionCreated(IoSession iosession) throws Exception { 
System.out.println("客戶端會話創建"); 
super.sessionCreated(iosession); 

@Override 
public void sessionIdle(IoSession iosession, IdleStatus idlestatus) 
throws Exception { 
System.out.println("客戶端會話休眠"); 
super.sessionIdle(iosession, idlestatus); 

@Override 
public void sessionOpened(IoSession iosession) throws Exception { 
System.out.println("客戶端會話打開"); 
super.sessionOpened(iosession); 





package com.mina.test; 

import java.net.InetSocketAddress; 

import org.apache.mina.core.buffer.IoBuffer; 
import org.apache.mina.core.future.ConnectFuture; 
import org.apache.mina.core.service.IoConnector; 
import org.apache.mina.core.session.IoSession; 
import org.apache.mina.transport.socket.nio.NioSocketConnector; 

public class MinaTimeClient { 

private static final int PORT = 6488; 
private static IoConnector connector; 
private static IoSession session; 

public static void main(String[] args) throws Exception { 

TimeClientHandler clientHandler=new TimeClientHandler(); 
connector = new NioSocketConnector(); 
//設置處理類 
connector.setHandler(clientHandler); 
ConnectFuture connFuture = connector.connect(new InetSocketAddress("localhost", PORT)); 
connFuture.awaitUninterruptibly(); 
session = connFuture.getSession(); 
clientHandler.sessionOpened(session); 
System.out.println("TCP 客戶端啟動"); 

for(int j=0;j<5;j++){ // 發送兩遍 
byte[] bts = new byte[20]; 
for (int i = 0; i < 20; i++) { 
bts[i] = (byte) i; 

IoBuffer buffer = IoBuffer.allocate(20); 
// 自動擴容 
buffer.setAutoExpand(true); 
// 自動收縮 
buffer.setAutoShrink(true); 
buffer.put(bts); 
buffer.flip(); 
session.write(buffer); 
Thread.sleep(2000); 

// 關閉會話,待所有線程處理結束后 
connector.dispose(true); 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM