Apache Mina Server 是一個網絡通信應用框架,也就是說,它主要是對基於TCP/IP、UDP/IP協議棧的通信框架(當然,也可以提供JAVA 對象的序列化服務、虛擬機管道通信服務等),Mina 可以幫助我們快速開發高性能、高擴展性的網絡通信應用,Mina 提供了事件驅動、異步(Mina 的異步IO 默認使用的是JAVA NIO 作為底層支持)操作的編程模型。
Mina 主要有1.x 和2.x 兩個分支,這里我們講解最新版本2.0,如果你使用的是Mina 1.x,
那么可能會有一些功能並不適用。學習本文檔,需要你已掌握JAVA IO、JAVA NIO、JAVA
Socket、JAVA 線程及並發庫(java.util.concurrent.*)的知識。
Mina 同時提供了網絡通信的Server 端、Client 端的封裝,無論是哪端,Mina 在整個網通
通信結構中都處於如下的位置:
可見Mina 的API 將真正的網絡通信與我們的應用程序隔離開來,你只需要關心你要發送、
接收的數據以及你的業務邏輯即可。
同樣的,無論是哪端,Mina 的執行流程如下所示:
(1.) IoService:這個接口在一個線程上負責套接字的建立,擁有自己的Selector,監
聽是否有連接被建立。
(2.) IoProcessor:這個接口在另一個線程上負責檢查是否有數據在通道上讀寫,也就是
說它也擁有自己的Selector,這是與我們使用JAVA NIO 編碼時的一個不同之處,
通常在JAVA NIO 編碼中,我們都是使用一個Selector,也就是不區分IoService
與IoProcessor 兩個功能接口。另外,IoProcessor 負責調用注冊在IoService 上
的過濾器,並在過濾器鏈之后調用IoHandler。
(3.) IoFilter:這個接口定義一組攔截器,這些攔截器可以包括日志輸出、黑名單過濾、
數據的編碼(write 方向)與解碼(read 方向)等功能,其中數據的encode 與decode
是最為重要的、也是你在使用Mina 時最主要關注的地方。
(4.) IoHandler:這個接口負責編寫業務邏輯,也就是接收、發送數據的地方。
1. 簡單的TCPServer:
(1.) 第一步:編寫IoService
按照上面的執行流程,我們首先需要編寫IoService,IoService 本身既是服務端,又是客
戶端,我們這里編寫服務端,所以使用IoAcceptor 實現,由於IoAcceptor 是與協議無關的,
因為我們要編寫TCPServer,所以我們使用IoAcceptor 的實現NioSocketAcceptor,實際上
底層就是調用java.nio.channels.ServerSocketChannel 類。當然,如果你使用了Apache 的
APR 庫,那么你可以選擇使用AprSocketAcceptor 作為TCPServer 的實現,據傳說Apache APR庫的性能比JVM 自帶的本地庫高出很多。
那么IoProcessor 是由指定的IoService 內部創建並調用的,我們並不需要關心。
1 public class MyServer { 2 //main方法: 3 IoAcceptor acceptor=new NioSocketAcceptor(); 4 acceptor.getSessionConfig().setReadBufferSize(2048); 5 acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10); 6 acceptor.bind(new InetSocketAddress(9123)); 7 }
這段代碼我們初始化了服務端的TCP/IP 的基於NIO 的套接字,然后調用IoSessionConfig
設置讀取數據的緩沖區大小、讀寫通道均在10 秒內無任何操作就進入空閑狀態。
(2.) 第二步:編寫過濾器
這里我們處理最簡單的字符串傳輸,Mina 已經為我們提供了TextLineCodecFactory 編解碼
器工廠來對字符串進行編解碼處理。
1 acceptor.getFilterChain().addLast("codec", 2 new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), 3 LineDelimeter.WINDOWS.getValue(), 4 LineDelimiter. WINDOWS.getValue())));
這段代碼要在acceptor.bind()方法之前執行,因為綁定套接字之后就不能再做這些准備工
作了。
這里先不用清楚編解碼器是如何工作的,這個是后面重點說明的內容,這里你只需要清楚,
我們傳輸的以換行符為標識的數據,所以使用了Mina 自帶的換行符編解碼器工廠。
(3.) 第三步:編寫IoHandler
這里我們只是簡單的打印Client 傳說過來的數據
1 public class MyIoHandler extends IoHandlerAdapter { 2 // 這里我們使用的SLF4J作為日志門面,至於為什么在后面說明。 3 private final static Logger log = LoggerFactory 4 .getLogger(MyIoHandler.class); 5 @Override 6 public void messageReceived(IoSession session, Object message) 7 throws Exception { 8 String str = message.toString(); 9 log.info("The message received is [" + str + "]"); 10 if (str.endsWith("quit")) { 11 session.close(true); 12 return; 13 } 14 } 15 }
然后我們把這個IoHandler 注冊到IoService:
acceptor.setHandler(new MyIoHandler());
當然這段代碼也要在acceptor.bind()方法之前執行。
然后我們運行MyServer 中的main 方法,你可以看到控制台一直處於阻塞狀態,此時,我們
用telnet 127.0.0.1 9123 訪問,然后輸入一些內容,當按下回車鍵,你會發現數據在
Server 端被輸出,但要注意不要輸入中文,因為Windows 的命令行窗口不會對傳輸的數據
進行UTF-8 編碼。當輸入quit 結尾的字符串時,連接被斷開。
這里注意你如果使用的操作系統,或者使用的Telnet 軟件的換行符是什么,如果不清楚,
可以刪掉第二步中的兩個紅色的參數,使用TextLineCodec 內部的自動識別機制。
_______________________________________________________________________________
2. 簡單的TCPClient:
這里我們實現Mina 中的TCPClient,因為前面說過無論是Server 端還是Client 端,在Mina
中的執行流程都是一樣的。唯一不同的就是IoService 的Client 端實現是IoConnector。
(1.) 第一步:編寫IoService並注冊過濾器
1 public class MyClient { 2 main方法: 3 IoConnector connector=new NioSocketConnector(); 4 connector.setConnectTimeoutMillis(30000); 5 connector.getFilterChain().addLast("codec", 6 new ProtocolCodecFilter( 7 new TextLineCodecFactory( 8 Charset.forName("UTF-8"), 9 LineDelimiter.WINDOWS.getValue(), 10 LineDelimiter.WINDOWS.getValue() 11 ) 12 ) 13 ); 14 connector.connect(new InetSocketAddress("localhost", 9123)); 15 }
(2.) 第三步:編寫IoHandler
1 public class ClientHandler extends IoHandlerAdapter { 2 private final static Logger LOGGER = LoggerFactory 3 .getLogger(ClientHandler.class); 4 private final String values; 5 public ClientHandler(String values) { 6 this.values = values; 7 } 8 @Override 9 public void sessionOpened(IoSession session) { 10 session.write(values); 11 } 12 }
注冊IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));
然后我們運行MyClient,你會發現MyServer 輸出如下語句:
The message received is [你好!]
The message received is [大家好!]
我們看到服務端是按照收到兩條消息輸出的,因為我們用的編解碼器是以換行符判斷數據是
否讀取完畢的。
_______________________________________________________________________________
3. 介紹Mina的TCP的主要接口:
通過上面的兩個示例,你應該對Mina 如何編寫TCP/IP 協議棧的網絡通信有了一些感性的認
識。
(1.)IoService:
這個接口是服務端IoAcceptor、客戶端IoConnector 的抽象,提供IO 服務和管理IoSession
的功能,它有如下幾個常用的方法:
A. TransportMetadata getTransportMetadata():
這個方法獲取傳輸方式的元數據描述信息,也就是底層到底基於什么的實現,譬如:nio、
apr 等。
B. void addListener(IoServiceListener listener):
這個方法可以為IoService 增加一個監聽器,用於監聽IoService 的創建、活動、失效、空
閑、銷毀,具體可以參考IoServiceListener 接口中的方法,這為你參與IoService 的生命
周期提供了機會。
C. void removeListener(IoServiceListener listener):
這個方法用於移除上面的方法添加的監聽器。
D. void setHandler(IoHandler handler):
這個方法用於向IoService 注冊IoHandler,同時有getHandler()方法獲取Handler。
E. Map<Long,IoSession> getManagedSessions():
這個方法獲取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
這個方法用於獲取IoSession 的配置對象,通過IoSessionConfig 對象可以設置Socket 連
接的一些選項。
_______________________________________________________________________________
(2.)IoAcceptor:
這個接口是TCPServer 的接口,主要增加了void bind()監聽端口、void unbind()解除對
套接字的監聽等方法。這里與傳統的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多
次調用bind()方法(或者在一個方法中傳入多個SocketAddress 參數)同時監聽多個端口。
_______________________________________________________________________________
(3.)IoConnector:
這個接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddress
remoteAddress,SocketAddress localAddress)方法,用於與Server 端建立連接,第二個參
數如果不傳遞則使用本地的一個隨機端口訪問Server 端。這個方法是異步執行的,同樣的,
也可以同時連接多個服務端。
_______________________________________________________________________________
(4.)IoSession:
這個接口用於表示Server 端與Client 端的連接,IoAcceptor.accept()的時候返回實例。
這個接口有如下常用的方法:
A. WriteFuture write(Object message):
這個方法用於寫數據,該操作是異步的。
B. CloseFuture close(boolean immediately):
這個方法用於關閉IoSession,該操作也是異步的,參數指定true 表示立即關閉,否則就
在所有的寫操作都flush 之后再關閉。
C. Object setAttribute(Object key,Object value):
這個方法用於給我們向會話中添加一些屬性,這樣可以在會話過程中都可以使用,類似於
HttpSession 的setAttrbute()方法。IoSession 內部使用同步的HashMap 存儲你添加的自
定義屬性。
D. SocketAddress getRemoteAddress():
這個方法獲取遠端連接的套接字地址。
E. void suspendWrite():
這個方法用於掛起寫操作,那么有void resumeWrite()方法與之配對。對於read()方法同
樣適用。
F. ReadFuture read():
這個方法用於讀取數據, 但默認是不能使用的, 你需要調用IoSessionConfig 的
setUseReadOperation(true)才可以使用這個異步讀取的方法。一般我們不會用到這個方法,
因為這個方法的內部實現是將數據保存到一個BlockingQueue,假如是Server 端,因為大
量的Client 端發送的數據在Server 端都這么讀取,那么可能會導致內存泄漏,但對於
Client,可能有的時候會比較便利。
G. IoService getService():
這個方法返回與當前會話對象關聯的IoService 實例。
關於TCP連接的關閉:
無論在客戶端還是服務端,IoSession 都用於表示底層的一個TCP 連接,那么你會發現無論
是Server 端還是Client 端的IoSession 調用close()方法之后,TCP 連接雖然顯示關閉, 但
主線程仍然在運行,也就是JVM 並未退出,這是因為IoSession 的close()僅僅是關閉了TCP
的連接通道,並沒有關閉Server 端、Client 端的程序。你需要調用IoService 的dispose()
方法停止Server 端、Client 端。
_______________________________________________________________________________
(5.)IoSessionConfig:
這個方法用於指定此次會話的配置,它有如下常用的方法:
A. void setReadBufferSize(int size):
這個方法設置讀取緩沖的字節數,但一般不需要調用這個方法,因為IoProcessor 會自動調
整緩沖的大小。你可以調用setMinReadBufferSize()、setMaxReadBufferSize()方法,這
樣無論IoProcessor 無論如何自動調整,都會在你指定的區間。
B. void setIdleTime(IdleStatus status,int idleTime):
這個方法設置關聯在通道上的讀、寫或者是讀寫事件在指定時間內未發生,該通道就進入空
閑狀態。一旦調用這個方法,則每隔idleTime 都會回調過濾器、IoHandler 中的sessionIdle()
方法。
C. void setWriteTimeout(int time):
這個方法設置寫操作的超時時間。
D. void setUseReadOperation(boolean useReadOperation):
這個方法設置IoSession 的read()方法是否可用,默認是false。
_______________________________________________________________________________
(6.)IoHandler:
這個接口是你編寫業務邏輯的地方,從上面的示例代碼可以看出,讀取數據、發送數據基本
都在這個接口總完成,這個實例是綁定到IoService 上的,有且只有一個實例(沒有給一個
IoService 注入一個IoHandler 實例會拋出異常)。它有如下幾個方法:
A. void sessionCreated(IoSession session):
這個方法當一個Session 對象被創建的時候被調用。對於TCP 連接來說,連接被接受的時候
調用,但要注意此時TCP 連接並未建立,此方法僅代表字面含義,也就是連接的對象
IoSession 被創建完畢的時候,回調這個方法。
對於UDP 來說,當有數據包收到的時候回調這個方法,因為UDP 是無連接的。
B. void sessionOpened(IoSession session):
這個方法在連接被打開時調用,它總是在sessionCreated()方法之后被調用。對於TCP 來
說,它是在連接被建立之后調用,你可以在這里執行一些認證操作、發送數據等。
對於UDP 來說,這個方法與sessionCreated()沒什么區別,但是緊跟其后執行。如果你每
隔一段時間,發送一些數據,那么sessionCreated()方法只會在第一次調用,但是
sessionOpened()方法每次都會調用。
C. void sessionClosed(IoSession session) :
對於TCP 來說,連接被關閉時,調用這個方法。
對於UDP 來說,IoSession 的close()方法被調用時才會毀掉這個方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
這個方法在IoSession 的通道進入空閑狀態時調用,對於UDP 協議來說,這個方法始終不會
被調用。
E. void exceptionCaught(IoSession session, Throwable cause) :
這個方法在你的程序、Mina 自身出現異常時回調,一般這里是關閉IoSession。
F. void messageReceived(IoSession session, Object message) :
接收到消息時調用的方法,也就是用於接收消息的方法,一般情況下,message 是一個
IoBuffer 類,如果你使用了協議編解碼器,那么可以強制轉換為你需要的類型。通常我們
都是會使用協議編解碼器的, 就像上面的例子, 因為協議編解碼器是
TextLineCodecFactory,所以我們可以強制轉message 為String 類型。
G. void messageSent(IoSession session, Object message) :
當發送消息成功時調用這個方法,注意這里的措辭,發送成功之后,也就是說發送消息是不
能用這個方法的。
發送消息的時機:
發送消息應該在sessionOpened()、messageReceived()方法中調用IoSession.write()方法
完成。因為在sessionOpened()方法中,TCP 連接已經真正打開,同樣的在messageReceived()
方法TCP 連接也是打開狀態,只不過兩者的時機不同。sessionOpened()方法是在TCP 連接
建立之后,接收到數據之前發送;messageReceived()方法是在接收到數據之后發送,你可
以完成依據收到的內容是什么樣子,決定發送什么樣的數據。
因為這個接口中的方法太多,因此通常使用適配器模式的IoHandlerAdapter,覆蓋你所感
興趣的方法即可。
_______________________________________________________________________________
(7.)IoBuffer:
這個接口是對JAVA NIO 的ByteBuffer 的封裝,這主要是因為ByteBuffer 只提供了對基本
數據類型的讀寫操作,沒有提供對字符串等對象類型的讀寫方法,使用起來更為方便,另外,
ByteBuffer 是定長的,如果想要可變,將很麻煩。IoBuffer 的可變長度的實現類似於
StringBuffer。IoBuffer 與ByteBuffer 一樣,都是非線程安全的。本節的一些內容如果不
清楚,可以參考java.nio.ByteBuffer 接口。
這個接口有如下常用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
這個方法內部通過SimpleBufferAllocator 創建一個實例,第一個參數指定初始化容量,第
二個參數指定使用直接緩沖區還是JAVA 內存堆的緩存區,默認為false。
B. void free():
釋放緩沖區,以便被一些IoBufferAllocator 的實現重用,一般沒有必要調用這個方法,除
非你想提升性能(但可能未必效果明顯)。
C. IoBuffer setAutoExpand(boolean autoExpand):
這個方法設置IoBuffer 為自動擴展容量,也就是前面所說的長度可變,那么可以看出長度
可變這個特性默認是不開啟的。
D. IoBuffer setAutoShrink(boolean autoShrink):
這個方法設置IoBuffer 為自動收縮,這樣在compact()方法調用之后,可以裁減掉一些沒
有使用的空間。如果這個方法沒有被調用或者設置為false,你也可以通過調用shrink()
方法手動收縮空間。
E. IoBuffer order(ByteOrder bo):
這個方法設置是Big Endian 還是Little Endian,JAVA 中默認是Big Endian,C++和其他
語言一般是Little Endian。
F. IoBuffer asReadOnlyBuffer():
這個方法設置IoBuffer 為只讀的。
G. Boolean prefixedDataAvailable(int prefixLength,int maxDataLength):
這個方法用於數據的最開始的1、2、4 個字節表示的是數據的長度的情況,prefixLentgh
表示這段數據的前幾個字節(只能是1、2、4 的其中一個)的代表的是這段數據的長度,
maxDataLength 表示最多要讀取的字節數。返回結果依賴於等式
remaining()-prefixLength>=maxDataLength,也就是總的數據-表示長度的字節,剩下的字
節數要比打算讀取的字節數大或者相等。
H. String getPrefixedString(int prefixLength,CharsetDecoder decoder):
如果上面的方法返回true,那么這個方法將開始讀取表示長度的字節之后的數據,注意要
保持這兩個方法的prefixLength 的值是一樣的。
G、H 兩個方法在后面講到的PrefixedStringDecoder 中的內部實現使用。
IoBuffer 剩余的方法與ByteBuffer 都是差不多的,額外增加了一些便利的操作方法,例如:
IoBuffer putString(String value,CharsetEncoder encoder)可以方便的以指定的編碼方
式存儲字符串、InputStream asInputStream()方法從IoBuffer 剩余的未讀的數據中轉為
輸入流等。
_______________________________________________________________________________
(8.)IoFuture:
在Mina 的很多操作中,你會看到返回值是XXXFuture,實際上他們都是IoFuture 的子類,
看到這樣的返回值,這個方法就說明是異步執行的,主要的子類有ConnectFuture、
CloseFuture 、ReadFuture 、WriteFuture 。這個接口的大部分操作都和
java.util.concurrent.Future 接口是類似的,譬如:await()、awaitUninterruptibly()
等,一般我們常用awaitUninterruptibly()方法可以等待異步執行的結果返回。
這個接口有如下常用的方法:
A. IoFuture addListener(IoFutureListener<?> listener):
這個方法用於添加一個監聽器, 在異步執行的結果返回時監聽器中的回調方法
operationComplete(IoFuture future),也就是說,這是替代awaitUninterruptibly()方
法另一種等待異步執行結果的方法,它的好處是不會產生阻塞。
B. IoFuture removeListener(IoFutureListener<?> listener):
這個方法用於移除指定的監聽器。
C. IoSession getSession():
這個方法返回當前的IoSession。
舉個例子,我們在客戶端調用connect()方法訪問Server 端的時候,實際上這就是一個異
步執行的方法,也就是調用connect()方法之后立即返回,執行下面的代碼,而不管是否連
接成功。那么如果我想在連接成功之后執行一些事情(譬如:獲取連接成功后的IoSession
對象),該怎么辦呢?按照上面的說明,你有如下兩種辦法:
第一種:
ConnectFuture future = connector.connect(new InetSocketAddress(
HOSTNAME, PORT));
// 等待是否連接成功,相當於是轉異步執行為同步執行。
future.awaitUninterruptibly();
// 連接成功后獲取會話對象。如果沒有上面的等待,由於connect()方法是異步的,session
可能會無法獲取。
session = future.getSession();
第二種:
ConnectFuture future = connector.connect(new InetSocketAddress(
HOSTNAME, PORT));
future.addListener(new IoFutureListener<ConnectFuture>() {
@Override
public void operationComplete(ConnectFuture future) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
IoSession session = future.getSession();
System.out.println("++++++++++++++++++++++++++++");
}
});
System.out.println("*************");
為了更好的看清楚使用監聽器是異步的,而不是像awaitUninterruptibly()那樣會阻塞主
線程的執行,我們在回調方法中暫停5 秒鍾,然后輸出+++,在最后輸出***。我們執行代碼
之后,你會發現首先輸出***(這證明了監聽器是異步執行的),然后IoSession 對象Created,
系統暫停5 秒,然后輸出+++,最后IoSession 對象Opened,也就是TCP 連接建立。
_______________________________________________________________________________
4.日志配置:
前面的示例代碼中提到了使用SLF4J 作為日志門面,這是因為Mina 內部使用的就是SLF4J,
你也使用SLF4J 可以與之保持一致性。
Mina 如果想啟用日志跟蹤Mina 的運行細節,你可以配置LoggingFilter 過濾器,這樣你可
以看到Session 建立、打開、空閑等一系列細節在日志中輸出,默認SJF4J 是按照DEBUG
級別輸出跟蹤信息的,如果你想給某一類別的Mina 運行信息輸出指定日志輸出級別,可以
調用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
LoggingFilter lf = new LoggingFilter();
lf.setSessionOpenedLogLevel(LogLevel.ERROR);
acceptor.getFilterChain().addLast("logger", lf);
這里IoSession 被打開的跟蹤信息將以ERROR 級別輸出到日志。
_______________________________________________________________________________
5.過濾器:
前面我們看到了LoggingFilter、ProtocolCodecFilter 兩個過濾器,一個負責日志輸出,
一個負責數據的編解碼,通過最前面的Mina 執行流程圖,在IoProcessor 與IoHandler 之
間可以有很多的過濾器,這種設計方式為你提供可插拔似的擴展功能提供了非常便利的方
式,目前的Apache CXF、Apache Struts2 中的攔截器也都是一樣的設計思路。
Mina 中的IoFilter 是單例的,這與CXF、Apache Struts2 沒什么區別。
IoService 實例上會綁定一個DefaultIoFilterChainBuilder 實例,
DefaultIoFilterChainBuilder 會把使用內部的EntryImpl 類把所有的過濾器按照順序連在
一起,組成一個過濾器鏈。
DefaultIoFilterChainBuilder 類如下常用的方法:
A. void addFirst(String name,IoFilter filter):
這個方法把過濾器添加到過濾器鏈的頭部,頭部就是IoProcessor 之后的第一個過濾器。同
樣的addLast()方法把過濾器添加到過濾器鏈的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
這個方法將過濾器添加到baseName 指定的過濾器的前面,同樣的addAfter()方法把過濾器
添加到baseName 指定的過濾器的后面。這里要注意無論是那種添加方法,每個過濾器的名
字(參數name)必須是唯一的。
C. IoFilter remove(Stirng name):
這個方法移除指定名稱的過濾器,你也可以調用另一個重載的remove()方法,指定要移除
的IoFilter 的類型。
D. List<Entry> getAll():
這個方法返回當前IoService 上注冊的所有過濾器。
默認情況下,過濾器鏈中是空的,也就是getAll()方法返回長度為0 的List,但實際Mina
內部有兩個隱藏的過濾器:HeadFilter、TailFilter,分別在List 的最開始和最末端,很
明顯,TailFilter 在最末端是為了調用過濾器鏈之后,調用IoHandler。但這兩個過濾器對
你來說是透明的,可以忽略它們的存在。
編寫一個過濾器很簡單,你需要實現IoFilter 接口,如果你只關注某幾個方法,可以繼承
IoFilterAdapter 適配器類。IoFilter 接口中主要包含兩類方法,一類是與IoHandler 中的
方法名一致的方法,相當於攔截IoHandler 中的方法,另一類是IoFilter 的生命周期回調
方法,這些回調方法的執行順序和解釋如下所示:
(1.)init()在首次添加到鏈中的時候被調用,但你必須將這個IoFilter 用
ReferenceCountingFilter 包裝起來,否則init()方法永遠不會被調用。
(2.)onPreAdd()在調用添加到鏈中的方法時被調用,但此時還未真正的加入到鏈。
(3.)onPostAdd()在調用添加到鏈中的方法后被調,如果在這個方法中有異常拋出,則過濾
器會立即被移除,同時destroy()方法也會被調用(前提是使用ReferenceCountingFilter
包裝)。
(4.)onPreRemove()在從鏈中移除之前調用。
(5.)onPostRemove()在從鏈中移除之后調用。
(6.)destory()在從鏈中移除時被調用,使用方法與init()要求相同。
無論是哪個方法,要注意必須在實現時調用參數nextFilter 的同名方法,否則,過濾器鏈
的執行將被中斷,IoHandler 中的同名方法一樣也不會被執行,這就相當於Servlet 中的
Filter 必須調用filterChain.doFilter(request,response)才能繼續前進是一樣的道理。
示例:
1 public class MyIoFilter implements IoFilter { 2 @Override 3 public void destroy() throws Exception { 4 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); 5 } 6 @Override 7 public void exceptionCaught(NextFilter nextFilter, IoSession 8 session, 9 Throwable cause) throws Exception { 10 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); 11 nextFilter.exceptionCaught(session, cause); 12 } 13 @Override 14 public void filterClose(NextFilter nextFilter, IoSession session) 15 throws Exception { 16 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); 17 nextFilter.filterClose(session); 18 } 19 @Override 20 public void filterWrite(NextFilter nextFilter, IoSession session, 21 WriteRequest writeRequest) throws Exception { 22 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); 23 nextFilter.filterWrite(session, writeRequest); 24 } 25 @Override 26 27 public void init() throws Exception { 28 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); 29 } 30 31 @Override 32 33 public void messageReceived(NextFilter nextFilter, IoSession 34 session, 35 Object message) throws Exception { 36 37 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); 38 nextFilter.messageReceived(session, message); 39 } 40 41 @Override 42 43 public void messageSent(NextFilter nextFilter, IoSession session, 44 45 WriteRequest writeRequest) throws Exception { 46 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); 47 nextFilter.messageSent(session, writeRequest); 48 49 } 50 51 @Override 52 53 public void onPostAdd(IoFilterChain parent, String name, 54 NextFilter nextFilter) throws Exception { 55 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); 56 } 57 58 @Override 59 60 public void onPostRemove(IoFilterChain parent, String name, 61 NextFilter nextFilter) throws Exception { 62 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); 63 } 64 65 @Override 66 67 public void onPreAdd(IoFilterChain parent, String name, 68 NextFilter nextFilter) throws Exception { 69 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); 70 } 71 72 @Override 73 74 public void onPreRemove(IoFilterChain parent, String name, 75 NextFilter nextFilter) throws Exception { 76 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); 77 } 78 79 @Override 80 public void sessionClosed(NextFilter nextFilter, IoSession session) 81 throws Exception { 82 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); 83 nextFilter.sessionClosed(session); 84 } 85 @Override 86 public void sessionCreated(NextFilter nextFilter, IoSession session) 87 throws Exception { 88 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); 89 nextFilter.sessionCreated(session); 90 } 91 @Override 92 public void sessionIdle(NextFilter nextFilter, IoSession session, 93 IdleStatus status) throws Exception { 94 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); 95 nextFilter.sessionIdle(session, status); 96 } 97 @Override 98 public void sessionOpened(NextFilter nextFilter, IoSession session) 99 throws Exception { 100 System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); 101 nextFilter.sessionOpened(session); 102 } 103 }
我們將這個攔截器注冊到上面的TCPServer 的IoAcceptor 的過濾器鏈中的最后一個:
acceptor.getFilterChain().addLast("myIoFilter",
new ReferenceCountingFilter(new MyIoFilter()));
這里我們將MyIoFilter 用ReferenceCountingFilter 包裝起來,這樣你可以看到init()、
destroy()方法調用。我們啟動客戶端訪問,然后關閉客戶端,你會看到執行順序如下所示:
init onPreAdd onPostAdd sessionCreated sessionOpened messageReceived filt
erClose sessionClosed onPreRemove onPostRemove destroy。
IoHandler 的對應方法會跟在上面的對應方法之后執行,這也就是說從橫向(單獨的看一個
過濾器中的所有方法的執行順序)上看,每個過濾器的執行順序是上面所示的順序;從縱向
(方法鏈的調用)上看,如果有filter1、filter2 兩個過濾器,sessionCreated()方法的
執行順序如下所示:
filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
這里你要注意init、onPreAdd、onPostAdd 三個方法並不是在Server 啟動時調用的,而是
IoSession 對象創建之前調用的,也就是說IoFilterChain.addXXX()方法僅僅負責初始化過
濾器並注冊過濾器,但並不調用任何方法,包括init()初始化方法也是在IoProcessor 開
始工作的時候被調用。
IoFilter 是單例的,那么init()方法是否只被執行一次呢?這個是不一定的,因為IoFilter
是被IoProcessor 調用的,而每個IoService 通常是關聯多個IoProcessor,所以IoFilter
的init()方法是在每個IoProcessor 線程上只執行一次。關於Mina 的線程問題,我們后面
會詳細討論,這里你只需要清楚,init()與destroy()的調用次數與IoProceesor 的個數有
關,假如一個IoService 關聯了3 個IoProcessor,有五個並發的客戶端請求,那么你會看
到三次init()方法被調用,以后將不再會調用。
Mina中自帶的過濾器:
過濾器 說明
BlacklistFilter 設置一些IP 地址為黑名單,不允許訪問。
BufferedWriteFilter 設置輸出時像BufferedOutputStream 一樣進行緩
沖。
CompressionFilter 設置在輸入、輸出流時啟用JZlib 壓縮。
ConnectionThrottleFilter 這個過濾器指定同一個IP 地址(不含端口號)上
的請求在多長的毫秒值內可以有一個請求,如果
小於指定的時間間隔就有連續兩個請求,那么第
二個請求將被忽略(IoSession.close())。正如
Throttle 的名字一樣,調節訪問的頻率。這個過
濾器最好放在過濾器鏈的前面。
FileRegionWriteFilter 如果你想使用File 對象進行輸出,請使用這個過
濾器。要注意,你需要使用WriteFuture 或者在
messageSent() 方法中關閉File 所關聯的
FileChannel 通道。
StreamWriteFilter 如果你想使用InputStream 對象進行輸出,請使
用這個過濾器。要注意,你需要使用WriteFuture
或者在messageSent()方法中關閉File 所關聯的
FileChannel 通道。
NoopFilter 這個過濾器什么也不做,如果你想測試過濾器鏈
是否起作用,可以用它來測試。
ProfilerTimerFilter 這個過濾器用於檢測每個事件方法執行的時間,
所以最好放在過濾器鏈的前面。
ProxyFilter 這個過濾器在客戶端使用ProxyConnector 作為實
現時,會自動加入到過濾器鏈中,用於完成代理
功能。
RequestResponseFilter 暫不知曉。
SessionAttributeInitializingFilter 這個過濾器在IoSession 中放入一些屬性(Map),
通常放在過濾器的前面,用於放置一些初始化的
信息。
MdcInjectionFilter 針對日志輸出做MDC 操作,可以參考LOG4J 的MDC、
NDC 的文檔。
WriteRequestFilter CompressionFilter、RequestResponseFilter 的
基類,用於包裝寫請求的過濾器。
還有一些過濾器,會在各節中詳細討論,這里沒有列出,譬如:前面的LoggingFilger 日志
過濾器。
_______________________________________________________________________________
6.協議編解碼器:
前面說過,協議編解碼器是在使用Mina 的時候你最需要關注的對象,因為在網絡傳輸的數
據都是二進制數據(byte),而你在程序中面向的是JAVA 對象,這就需要你實現在發送數據
時將JAVA 對象編碼二進制數據,而接收數據時將二進制數據解碼為JAVA 對象(這個可不是
JAVA 對象的序列化、反序列化那么簡單的事情)。
Mina 中的協議編解碼器通過過濾器ProtocolCodecFilter 構造,這個過濾器的構造方法需
要一個ProtocolCodecFactory,這從前面注冊TextLineCodecFactory 的代碼就可以看出來。
ProtocolCodecFactory 中有如下兩個方法:
public interface ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
因此,構建一個ProtocolCodecFactory 需要ProtocolEncoder、ProtocolDecoder 兩個實例。
你可能要問JAVA 對象和二進制數據之間如何轉換呢?這個要依據具體的通信協議,也就是
Server 端要和Client 端約定網絡傳輸的數據是什么樣的格式,譬如:第一個字節表示數據
長度,第二個字節是數據類型,后面的就是真正的數據(有可能是文字、有可能是圖片等等),
然后你可以依據長度從第三個字節向后讀,直到讀取到指定第一個字節指定長度的數據。
簡單的說,HTTP 協議就是一種瀏覽器與Web 服務器之間約定好的通信協議,雙方按照指定
的協議編解碼數據。我們再直觀一點兒說,前面一直使用的TextLine 編解碼器就是在讀取
網絡上傳遞過來的數據時,只要發現哪個字節里存放的是ASCII 的10、13 字符(\r、\n),
就認為之前的字節就是一個字符串(默認使用UTF-8 編碼)。
以上所說的就是各種協議實際上就是網絡七層結構中的應用層協議,它位於網絡層(IP)、
傳輸層(TCP)之上,Mina 的協議編解碼器就是讓你實現一套自己的應用層協議棧。
_______________________________________________________________________________
(6-1.)簡單的編解碼器示例:
下面我們舉一個模擬電信運營商短信協議的編解碼器實現,假設通信協議如下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
這里的第一行表示狀態行,一般表示協議的名字、版本號等,第二行表示短信的發送號碼,
第三行表示短信接收的號碼,第四行表示短信的字節數,最后的內容就是短信的內容。
上面的每一行的末尾使用ASC II 的10(\n)作為換行符,因為這是純文本數據,協議要
求雙方使用UTF-8 對字符串編解碼。
實際上如果你熟悉HTTP 協議,上面的這個精簡的短信協議和HTTP 協議的組成是非常像
的,第一行是狀態行,中間的是消息報頭,最后面的是消息正文。
在解析這個短信協議之前,你需要知曉TCP 的一個事項,那就是數據的發送沒有規模性,
所謂的規模性就是作為數據的接收端,不知道到底什么時候數據算是讀取完畢,所以應用層
協議在制定的時候,必須指定數據讀取的截至點。一般來說,有如下三種方式設置數據讀取
的長度:
(1.)使用分隔符,譬如:TextLine 編解碼器。你可以使用\r、\n、NUL 這些ASC II 中的
特殊的字符來告訴數據接收端,你只要遇見分隔符,就表示數據讀完了,不用在那里傻等着
不知道還有沒有數據沒讀完啊?我可不可以開始把已經讀取到的字節解碼為指定的數據類
型了啊?
(2.)定長的字節數,這種方式是使用長度固定的數據發送,一般適用於指令發送,譬如:數
據發送端規定發送的數據都是雙字節,AA 表示啟動、BB 表示關閉等等。
(3.)在數據中的某個位置使用一個長度域,表示數據的長度,這種處理方式最為靈活,上面
的短信協議中的那個L 就是短信文字的字節數,其實HTTP 協議的消息報頭中的
Content-Length 也是表示消息正文的長度,這樣數據的接收端就知道我到底讀到多長的
字節數就表示不用再讀取數據了。
相比較解碼(字節轉為JAVA 對象,也叫做拆包)來說,編碼(JAVA 對象轉為字節,也叫做
打包)就很簡單了,你只需要把JAVA 對象轉為指定格式的字節流,write()就可以了。
下面我們開始對上面的短信協議進行編解碼處理。
第一步,協議對象:
1 public class SmsObject { 2 private String sender;// 短信發送者 3 private String receiver;// 短信接受者 4 private String message;// 短信內容 5 public String getSender() { 6 return sender; 7 } 8 public void setSender(String sender) { 9 this.sender = sender; 10 } 11 public String getReceiver() { 12 return receiver; 13 } 14 public void setReceiver(String receiver) { 15 this.receiver = receiver; 16 } 17 public String getMessage() { 18 return message; 19 } 20 public void setMessage(String message) { 21 this.message = message; 22 } 23 }
第二步,編碼器:
在Mina 中編寫編碼器可以實現ProtocolEncoder,其中有encode()、dispose()兩個方法需
要實現。這里的dispose()方法用於在銷毀編碼器時釋放關聯的資源,由於這個方法一般我
們並不關心,所以通常我們直接繼承適配器ProtocolEncoderAdapter。
1 public class CmccSipcEncoder extends ProtocolEncoderAdapter { 2 private final Charset charset; 3 public CmccSipcEncoder(Charset charset) { 4 this.charset = charset; 5 } 6 @Override 7 public void encode(IoSession session, Object message, 8 ProtocolEncoderOutput out) throws Exception { 9 SmsObject sms = (SmsObject) message; 10 CharsetEncoder ce = charset.newEncoder(); 11 IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 12 String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; 13 String sender = sms.getSender(); 14 String receiver = sms.getReceiver(); 15 String smsContent = sms.getMessage(); 16 buffer.putString(statusLine + '\n', ce); 17 buffer.putString("S: " + sender + '\n', ce); 18 buffer.putString("R: " + receiver + '\n', ce); 19 buffer 20 .putString("L: " + (smsContent.getBytes(charset).length) 21 + "\n", 22 ce); 23 buffer.putString(smsContent, ce); 24 buffer.flip(); 25 out.write(buffer); 26 } 27 }
這里我們依據傳入的字符集類型對message 對象進行編碼,編碼的方式就是按照短信協議拼
裝字符串到IoBuffer 緩沖區,然后調用ProtocolEncoderOutput 的write()方法輸出字節
流。這里要注意生成短信內容長度時的紅色代碼,我們使用String 類與Byte[]類型之間的
轉換方法獲得轉為字節流后的字節數。
解碼器的編寫有以下幾個步驟:
A. 將 encode()方法中的message 對象強制轉換為指定的對象類型;
B. 創建IoBuffer 緩沖區對象,並設置為自動擴展;
C. 將轉換后的message 對象中的各個部分按照指定的應用層協議進行組裝,並put()到
IoBuffer 緩沖區;
D. 當你組裝數據完畢之后,調用flip()方法,為輸出做好准備,切記在write()方法之前,
要調用IoBuffer 的flip()方法,否則緩沖區的position 的后面是沒有數據可以用來輸
出的,你必須調用flip()方法將position 移至0,limit 移至剛才的position。這個
flip()方法的含義請參看java.nio.ByteBuffer。
E. 最后調用ProtocolEncoderOutput 的write()方法輸出IoBuffer 緩沖區實例。
第三步,解碼器:
在Mina 中編寫解碼器,可以實現ProtocolDecoder 接口,其中有decode()、finishDecode()、
dispose()三個方法。這里的finishDecode()方法可以用於處理在IoSession 關閉時剩余的
未讀取數據,一般這個方法並不會被使用到,除非協議中未定義任何標識數據什么時候截止
的約定,譬如:Http 響應的Content-Length 未設定,那么在你認為讀取完數據后,關閉TCP
連接(IoSession 的關閉)后,就可以調用這個方法處理剩余的數據,當然你也可以忽略調
剩余的數據。同樣的,一般情況下,我們只需要繼承適配器ProtocolDecoderAdapter,關
注decode()方法即可。
但前面說過解碼器相對編碼器來說,最麻煩的是數據發送過來的規模,以聊天室為例,一個
TCP 連接建立之后,那么隔一段時間就會有聊天內容發送過來,也就是decode()方法會被往
復調用,這樣處理起來就會非常麻煩。那么Mina 中幸好提供了CumulativeProtocolDecoder
類,從名字上可以看出累積性的協議解碼器,也就是說只要有數據發送過來,這個類就會去
讀取數據,然后累積到內部的IoBuffer 緩沖區,但是具體的拆包(把累積到緩沖區的數據
解碼為JAVA 對象)交由子類的doDecode()方法完成,實際上CumulativeProtocolDecoder
就是在decode()反復的調用暴漏給子類實現的doDecode()方法。
具體執行過程如下所示:
A. 你的doDecode()方法返回true 時,CumulativeProtocolDecoder 的decode()方法會首
先判斷你是否在doDecode()方法中從內部的IoBuffer 緩沖區讀取了數據,如果沒有,
則會拋出非法的狀態異常,也就是你的doDecode()方法返回true 就表示你已經消費了
本次數據(相當於聊天室中一個完整的消息已經讀取完畢),進一步說,也就是此時你
必須已經消費過內部的IoBuffer 緩沖區的數據(哪怕是消費了一個字節的數據)。如果
驗證過通過,那么CumulativeProtocolDecoder 會檢查緩沖區內是否還有數據未讀取,
如果有就繼續調用doDecode()方法,沒有就停止對doDecode()方法的調用,直到有新
的數據被緩沖。
B. 當你的doDecode()方法返回false 時,CumulativeProtocolDecoder 會停止對doDecode()
方法的調用,但此時如果本次數據還有未讀取完的,就將含有剩余數據的IoBuffer 緩
沖區保存到IoSession 中,以便下一次數據到來時可以從IoSession 中提取合並。如果
發現本次數據全都讀取完畢,則清空IoBuffer 緩沖區。
簡而言之,當你認為讀取到的數據已經夠解碼了,那么就返回true,否則就返回false。這
個CumulativeProtocolDecoder 其實最重要的工作就是幫你完成了數據的累積,因為這個工
作是很煩瑣的。
1 public class CmccSipcDecoder extends CumulativeProtocolDecoder { 2 private final Charset charset; 3 public CmccSipcDecoder(Charset charset) { 4 this.charset = charset; 5 } 6 @Override 7 protected boolean doDecode(IoSession session, IoBuffer in, 8 ProtocolDecoderOutput out) throws Exception { 9 IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 10 CharsetDecoder cd = charset.newDecoder(); 11 int matchCount = 0; 12 String statusLine = "", sender = "", receiver = "", length = "", 13 sms = ""; 14 int i = 1; 15 while (in.hasRemaining()) { 16 byte b = in.get(); 17 buffer.put(b); 18 if (b == 10 && i < 5) { 19 matchCount++; 20 if (i == 1) { 21 buffer.flip(); 22 statusLine = buffer.getString(matchCount, cd); 23 statusLine = statusLine.substring(0, 24 statusLine.length() - 1); 25 matchCount = 0; 26 buffer.clear(); 27 } 28 if (i == 2) { 29 buffer.flip(); 30 sender = buffer.getString(matchCount, cd); 31 sender = sender.substring(0, sender.length() -1); 32 matchCount = 0; 33 buffer.clear(); 34 35 } 36 37 if (i == 3) { 38 buffer.flip(); 39 receiver = buffer.getString(matchCount, cd); 40 receiver = receiver.substring(0, receiver.length() 41 42 1); 43 matchCount = 0; 44 buffer.clear(); 45 46 } 47 48 if (i == 4) { 49 buffer.flip(); 50 length = buffer.getString(matchCount, cd); 51 length = length.substring(0, length.length() -1); 52 matchCount = 0; 53 buffer.clear(); 54 55 } 56 i++; 57 58 } else if (i == 5) { 59 matchCount++; 60 if (matchCount == Long.parseLong(length.split(": ")[1])) 61 62 { 63 buffer.flip(); 64 sms = buffer.getString(matchCount, cd); 65 i++; 66 break; 67 68 } 69 } else { 70 matchCount++; 71 72 } 73 } 74 SmsObject smsObject = new SmsObject(); 75 smsObject.setSender(sender.split(": ")[1]); 76 smsObject.setReceiver(receiver.split(": ")[1]); 77 smsObject.setMessage(sms); 78 out.write(smsObject); 79 return false; 80 81 } 82 }
我們的這個短信協議解碼器使用\n(ASCII 的10 字符)作為分解點,一個字節一個字節的
讀取,那么第一次發現\n 的字節位置之前的部分,必然就是短信協議的狀態行,依次類推,
你就可以解析出來發送者、接受者、短信內容長度。然后我們在解析短信內容時,使用獲取
到的長度進行讀取。全部讀取完畢之后, 然后構造SmsObject 短信對象, 使用
ProtocolDecoderOutput 的write()方法輸出,最后返回false,也就是本次數據全部讀取
完畢,告知CumulativeProtocolDecoder 在本次數據讀取中不需要再調用doDecode()方法
了。
這里需要注意的是兩個狀態變量i、matchCount,i 用於記錄解析到了短信協議中的哪一行
(\n),matchCount 記錄在當前行中讀取到了哪一個字節。狀態變量在解碼器中經常被使用,
我們這里的情況比較簡單,因為我們假定短信發送是在一次數據發送中完成的,所以狀態變
量的使用也比較簡單。假如數據的發送被拆成了多次(譬如:短信協議的短信內容、消息報
頭被拆成了兩次數據發送),那么上面的代碼勢必就會存在問題,因為當第二次調用
doDecode()方法時,狀態變量i、matchCount 勢必會被重置,也就是原來的狀態值並沒有被
保存。那么我們如何解決狀態保存的問題呢?
答案就是將狀態變量保存在IoSession 中或者是Decoder 實例自身,但推薦使用前者,因為
雖然Decoder 是單例的,其中的實例變量保存的狀態在Decoder 實例銷毀前始終保持,但
Mina 並不保證每次調用doDecode()方法時都是同一個線程(這也就是說第一次調用
doDecode()是IoProcessor-1 線程,第二次有可能就是IoProcessor-2 線程),這就會產生
多線程中的實例變量的可視性(Visibility,具體請參考JAVA 的多線程知識)問題。IoSession
中使用一個同步的HashMap 保存對象,所以你不需要擔心多線程帶來的問題。
使用IoSession 保存解碼器的狀態變量通常的寫法如下所示:
A. 在解碼器中定義私有的內部類Context,然后將需要保存的狀態變量定義在Context 中
存儲。
B. 在解碼器中定義方法獲取這個Context 的實例,這個方法的實現要優先從IoSession 中
獲取Context。
具體代碼示例如下所示:
// 上下文作為保存狀態的內部類的名字,意思很明顯,就是讓狀態跟隨上下文,在整個調
用過程中都可以被保持。
1 public class XXXDecoder extends CumulativeProtocolDecoder{ 2 private final AttributeKey CONTEXT = 3 new AttributeKey(getClass(), "context" ); 4 public Context getContext(IoSession session){ 5 Context ctx=(Context)session.getAttribute(CONTEXT); 6 if(ctx==null){ 7 ctx=new Context(); 8 session.setAttribute(CONTEXT,ctx); 9 } 10 } 11 private class Context { 12 //狀態變量 13 } 14 }
注意這里我們使用了Mina 自帶的AttributeKey 類來定義保存在IoSession 中的對象的鍵
值,這樣可以有效的防止鍵值重復。另外,要注意在全部處理完畢之后,狀態要復位,譬如:
聊天室中的一條消息讀取完畢之后,狀態變量要變為初始值,以便下次處理時重新使用。
第四步,編解碼工廠:
1 public class CmccSipcCodecFactory implements ProtocolCodecFactory { 2 private final CmccSipcEncoder encoder; 3 private final CmccSipcDecoder decoder; 4 public CmccSipcCodecFactory() { 5 this(Charset.defaultCharset()); 6 } 7 public CmccSipcCodecFactory(Charset charSet) { 8 this.encoder = new CmccSipcEncoder(charSet); 9 this.decoder = new CmccSipcDecoder(charSet); 10 } 11 @Override 12 public ProtocolDecoder getDecoder(IoSession session) throws 13 Exception { 14 return decoder; 15 } 16 @Override 17 public ProtocolEncoder getEncoder(IoSession session) throws 18 Exception { 19 return encoder; 20 } 21 }
實際上這個工廠類就是包裝了編碼器、解碼器,通過接口中的getEncoder()、getDecoder()
方法向ProtocolCodecFilter 過濾器返回編解碼器實例,以便在過濾器中對數據進行編解碼
處理。
第五步,運行示例:
下面我們修改最一開始的示例中的MyServer、MyClient 的代碼,如下所示:
1 acceptor.getFilterChain().addLast( 2 "codec", 3 new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset 4 .forName("UTF-8")))); 5 connector.getFilterChain().addLast( 6 "codec", 7 new ProtocolCodecFilter(new 8 CmccSipcCodecFactory( 9 Charset.forName("UTF-8")))); 10 然后我們在ClientHandler 中發送一條短信: 11 public void sessionOpened(IoSession session) { 12 SmsObject sms = new SmsObject(); 13 sms.setSender("15801012253"); 14 sms.setReceiver("18869693235"); 15 sms.setMessage("你好!Hello World!"); 16 session.write(sms); 17 } 18 最后我們在MyIoHandler 中接收這條短信息: 19 public void messageReceived(IoSession session, Object message) 20 throws Exception { 21 SmsObject sms = (SmsObject) message; 22 log.info("The message received is [" + sms.getMessage() + "]"); 23 } 24 你會看到Server 端的控制台輸出如下信息: 25 The message received is [你好!Hello World!]
(6-2.)復雜的解碼器:
下面我們講解一下如何在解碼器中保存狀態變量,也就是真正的實現上面所說的Context。
我們假設這樣一種情況,有兩條短信:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
他們按照上面的顏色標識發送,也就是說紅色部分、藍色部分、綠色部分分別發送(調用三
次IoSession.write()方法),那么如果你還用上面的CmccSipcDecoder,將無法工作,因為
第一次數據流(紅色部分)發送過取時,數據是不完整的,無法解析出一條短信息,當二次
數據流(藍色部分)發送過去時,已經可以解析出第一條短信息了,但是第二條短信還是不
完整的,需要等待第三次數據流(綠色部分)的發送。
注意:由於模擬數據發送的規模性問題很麻煩,所以這里采用了這種極端的例子說明問題,
雖不具有典型性,但很能說明問題,這就足夠了,所以不要追究這種發送消息是否在真實環
境中存在,更不要追究其合理性。
CmccSispcDecoder 類改為如下的寫法:
1 public class CmccSipcDecoder extends CumulativeProtocolDecoder { 2 private final Charset charset; 3 private final AttributeKey CONTEXT = new AttributeKey(getClass(), 4 "context"); 5 public CmccSipcDecoder(Charset charset) { 6 this.charset = charset; 7 } 8 @Override 9 protected boolean doDecode(IoSession session, IoBuffer in, 10 ProtocolDecoderOutput out) throws Exception { 11 Context ctx = getContext(session); 12 CharsetDecoder cd = charset.newDecoder(); 13 int matchCount = ctx.getMatchCount(); 14 int line = ctx.getLine(); 15 IoBuffer buffer = ctx.innerBuffer; 16 String statusLine = ctx.getStatusLine(), 17 sender = ctx.getSender(), 18 receiver = ctx.getReceiver(), 19 length = ctx.getLength(), 20 sms = ctx.getSms(); 21 while (in.hasRemaining()) { 22 byte b = in.get(); 23 matchCount++; 24 buffer.put(b); 25 if (line < 4 && b == 10) { 26 if (line == 0) { 27 buffer.flip(); 28 statusLine = buffer.getString(matchCount, cd); 29 statusLine = statusLine.substring(0, 30 statusLine.length() - 1); 31 matchCount = 0; 32 buffer.clear(); 33 ctx.setStatusLine(statusLine); 34 } 35 if (line == 1) { 36 buffer.flip(); 37 sender = buffer.getString(matchCount, cd); 38 sender = sender.substring(0, sender.length() - 1); 39 matchCount = 0; 40 buffer.clear(); 41 ctx.setSender(sender); 42 } 43 if (line == 2) { 44 buffer.flip(); 45 receiver = buffer.getString(matchCount, cd); 46 receiver = receiver.substring(0, receiver.length() - 47 1); 48 matchCount = 0; 49 buffer.clear(); 50 ctx.setReceiver(receiver); 51 } 52 if (line == 3) { 53 buffer.flip(); 54 length = buffer.getString(matchCount, cd); 55 length = length.substring(0, length.length() - 1); 56 matchCount = 0; 57 buffer.clear(); 58 ctx.setLength(length); 59 } 60 line++; 61 } else if (line == 4) { 62 if (matchCount == Long.parseLong(length.split(": ")[1])) 63 { 64 buffer.flip(); 65 sms = buffer.getString(matchCount, cd); 66 ctx.setSms(sms); 67 // 由於下面的break,這里需要調用else外面的兩行代碼 68 ctx.setMatchCount(matchCount); 69 ctx.setLine(line); 70 break; 71 } 72 } 73 ctx.setMatchCount(matchCount); 74 ctx.setLine(line); 75 } 76 if (ctx.getLine() == 4 77 && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx 78 .getMatchCount()) { 79 SmsObject smsObject = new SmsObject(); 80 smsObject.setSender(sender.split(": ")[1]); 81 smsObject.setReceiver(receiver.split(": ")[1]); 82 smsObject.setMessage(sms); 83 out.write(smsObject); 84 ctx.reset(); 85 return true; 86 87 } else { 88 return false; 89 } 90 } 91 92 private Context getContext(IoSession session) { 93 Context context = (Context) session.getAttribute(CONTEXT); 94 if (context == null){ 95 96 context = new Context(); 97 98 session.setAttribute(CONTEXT, context); 99 } 100 return context; 101 102 } 103 104 private class Context { 105 106 private final IoBuffer innerBuffer; 107 108 private String statusLine = ""; 109 110 private String sender = ""; 111 112 private String receiver = ""; 113 114 private String length = ""; 115 116 private String sms = ""; 117 118 public Context() { 119 innerBuffer = IoBuffer.allocate(100).setAutoExpand(true); 120 } 121 122 private int matchCount = 0; 123 124 private int line = 0; 125 126 public int getMatchCount() { 127 return matchCount; 128 } 129 130 public void setMatchCount(int matchCount) { 131 this.matchCount = matchCount; 132 133 } 134 135 public int getLine() { 136 return line; 137 138 } 139 140 public void setLine(int line) { 141 this.line = line; 142 143 } 144 145 public String getStatusLine() { 146 return statusLine; 147 148 } 149 150 public void setStatusLine(String statusLine) { 151 this.statusLine = statusLine; 152 153 } 154 155 public String getSender() { 156 return sender; 157 158 } 159 160 public void setSender(String sender) { 161 this.sender = sender; 162 163 } 164 165 public String getReceiver() { 166 return receiver; 167 168 } 169 170 public void setReceiver(String receiver) { 171 this.receiver = receiver; 172 173 } 174 175 public String getLength() { 176 return length; 177 178 } 179 180 public void setLength(String length) { 181 this.length = length; 182 183 } 184 185 public String getSms() { 186 return sms; 187 } 188 public void setSms(String sms) { 189 this.sms = sms; 190 } 191 public void reset() { 192 this.innerBuffer.clear(); 193 this.matchCount = 0; 194 this.line = 0; 195 this.statusLine = ""; 196 this.sender = ""; 197 this.receiver = ""; 198 this.length = ""; 199 this.sms = ""; 200 } 201 } 202 }
這里我們做了如下的幾步操作:
(1.) 所有記錄狀態的變量移到了Context 內部類中,包括記錄讀到短信協議的哪一行的
line。每一行讀取了多少個字節的matchCount,還有記錄解析好的狀態行、發送者、
接受者、短信內容、累積數據的innerBuffer 等。這樣就可以在數據不能完全解碼,
等待下一次doDecode()方法的調用時,還能承接上一次調用的數據。
(2.) 在 doDecode()方法中主要的變化是各種狀態變量首先是從Context 中獲取,然后操
作之后,將最新的值setXXX()到Context 中保存。
(3.) 這里注意doDecode()方法最后的判斷,當認為不夠解碼為一條短信息時,返回
false,也就是在本次數據流解碼中不要再調用doDecode()方法;當認為已經解碼
出一條短信息時,輸出短消息,然后重置所有的狀態變量,返回true,也就是如果
本次數據流解碼中還有沒解碼完的數據,繼續調用doDecode()方法。
下面我們對客戶端稍加改造,來模擬上面的紅、藍、綠三次發送聊天短信息的情況:
1 MyClient: 2 ConnectFuture future = connector.connect(new InetSocketAddress( 3 HOSTNAME, PORT)); 4 future.awaitUninterruptibly(); 5 session = future.getSession(); 6 for (int i = 0; i < 3; i++) { 7 SmsObject sms = new SmsObject(); 8 session.write(sms); 9 System.out.println("****************" + i); 10 } 11 這里我們為了方便演示,不在IoHandler 中發送消息,而是直接在MyClient 中發送,你要 12 注意的是三次發送都要使用同一個IoSession,否則就不是從同一個通道發送過去的了。 13 CmccSipcEncoder: 14 public void encode(IoSession session, Object message, 15 ProtocolEncoderOutput out) throws Exception { 16 SmsObject sms = (SmsObject) message; 17 CharsetEncoder ce = charset.newEncoder(); 18 String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; 19 String sender = "15801012253"; 20 String receiver = "15866332698"; 21 String smsContent = "你好!Hello World!"; 22 IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 23 buffer.putString(statusLine + '\n', ce); 24 buffer.putString("S: " + sender + '\n', ce); 25 buffer.putString("R: " + receiver + '\n', ce); 26 buffer.flip(); 27 out.write(buffer); 28 IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true); 29 buffer2.putString("L: " + (smsContent.getBytes(charset).length) 30 + "\n",ce); 31 buffer2.putString(smsContent, ce); 32 buffer2.putString(statusLine + '\n', ce); 33 buffer2.flip(); 34 out.write(buffer2); 35 IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true); 36 buffer3.putString("S: " + sender + '\n', ce); 37 buffer3.putString("R: " + receiver + '\n', ce); 38 buffer3.putString("L: " + (smsContent.getBytes(charset).length) 39 + "\n",ce); 40 buffer3.putString(smsContent, ce); 41 buffer3.putString(statusLine + '\n', ce); 42 buffer3.flip(); 43 out.write(buffer3); 44 }
上面的這段代碼要配合MyClient來操作,你需要做的是在MyClient中的紅色輸出語句處設置
斷點,然后第一調用時CmccSipcEncoder中注釋掉藍、綠色的代碼,也就是發送兩條短信息的
第一部分(紅色的代碼),依次類推,也就是MyClient的中的三次斷點中,分別執行
CmccSipcEncoder中的紅、藍、綠三段代碼,也就是模擬兩條短信的三段發送。
你會看到Server端的運行結果是:當MyClient第一次到達斷點時,沒有短信息被讀取到,當
MyClient第二次到達斷點時,第一條短信息輸出,當MyClient第三次到達斷點時,第二條短
信息輸出。
Mina中自帶的解碼器:
解碼器 說明
CumulativeProtocolDecoder 累積性解碼器,上面我們重點說明了這個解
碼器的用法。
SynchronizedProtocolDecoder 這個解碼器用於將任何一個解碼器包裝為一
個線程安全的解碼器,用於解決上面說的每
次執行decode()方法時可能線程不是上一次
的線程的問題,但這樣會在高並發時,大大
降低系統的性能。
TextLineDecoder 按照文本的換行符( Windows:\r\n 、
Linux:\n、Mac:\r)解碼數據。
PrefixedStringDecoder 這個類繼承自CumulativeProtocolDecoder
類,用於讀取數據最前端的1、2、4 個字節
表示后面的數據長度的數據。譬如:一個段
數據的前兩個字節表示后面的真實數據的長
度,那么你就可以用這個方法進行解碼。
_______________________________________________________________________________
(6-3.)多路分離的解碼器:
假設一段數據發送過來之后,需要根據某種條件決定使用哪個解碼器,而不是像上面的例子,
固定使用一個解碼器,那么該如何做呢?
幸好Mina 提供了org.apache.mina.filter.codec.demux 包來完成這種多路分離
(Demultiplexes)的解碼工作,也就是同時注冊多個解碼器,然后運行時依據傳入的數據
決定到底使用哪個解碼器來工作。所謂多路分離就是依據條件分發到指定的解碼器,譬如:
上面的短信協議進行擴展,可以依據狀態行來判斷使用1.0 版本的短信協議解碼器還是2.0
版本的短信協議解碼器。
下面我們使用一個簡單的例子,說明這個多路分離的解碼器是如何使用的,需求如下所示:
(1.) 客戶端傳入兩個int 類型的數字,還有一個char 類型的符號。
(2.) 如果符號是+,服務端就是用1 號解碼器,對兩個數字相加,然后把結果返回給客戶
端。
(3.) 如果符號是-,服務端就使用2 號解碼器,將兩個數字變為相反數,然后相加,把結
果返回給客戶端。
Demux 開發編解碼器主要有如下幾個步驟:
A. 定義Client 端、Server 端發送、接收的數據對象。
B. 使用Demux 編寫編碼器是實現MessageEncoder<T>接口,T 是你要編碼的數據對象,這
個MessageEncoder 會在DemuxingProtocolEncoder 中調用。
C. 使用Demux 編寫編碼器是實現MessageDecoder 接口,這個MessageDecoder 會在
DemuxingProtocolDecoder 中調用。
D. 在 DemuxingProtocolCodecFactory 中調用addMessageEncoder()、addMessageDecoder()
方法組裝編解碼器。
MessageEncoder的接口如下所示:
public interface MessageEncoder<T> {
void encode(IoSession session, T message, ProtocolEncoderOutput out)
throws Exception;
}
你注意到消息編碼器接口與在ProtocolEncoder 中沒什么不同,區別就是Object message
被泛型具體化了類型,你不需要手動的類型轉換了。
1 MessageDecoder的接口如下所示: 2 public interface MessageDecoder { 3 static MessageDecoderResult OK = MessageDecoderResult.OK; 4 static MessageDecoderResult NEED_DATA = 5 MessageDecoderResult.NEED_DATA; 6 static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK; 7 MessageDecoderResult decodable(IoSession session, IoBuffer in); 8 MessageDecoderResult decode(IoSession session, IoBuffer in, 9 ProtocolDecoderOutput out) throws Exception; 10 void finishDecode(IoSession session, ProtocolDecoderOutput out) 11 throws Exception; 12 }
(1.)decodable()方法有三個返回值,分別表示如下的含義:
A. MessageDecoderResult.NOT_OK:表示這個解碼器不適合解碼數據,然后檢查其它解碼
器,如果都不滿足會拋異常;
B. MessageDecoderResult.NEED_DATA:表示當前的讀入的數據不夠判斷是否能夠使用這
個解碼器解碼,然后再次調用decodable()方法檢查其它解碼器,如果都是NEED_DATA,
則等待下次輸入;
C. MessageDecoderResult.OK: 表示這個解碼器可以解碼讀入的數據, 然后則調用
MessageDecoder 的decode()方法。
這里注意decodable()方法對參數IoBuffer in 的任何操作在方法結束之后,都會復原,也就是
你不必擔心在調用decode()方法時,position 已經不在緩沖區的起始位置。這個方法相當於
是預讀取,用於判斷是否是可用的解碼器。
(2.)decode()方法有三個返回值,分別表示如下的含義:
A. MessageDecoderResult.NOT_OK:表示解碼失敗,會拋異常;
B. MessageDecoderResult.NEED_DATA:表示數據不夠,需要讀到新的數據后,再次調用
decode()方法。
C. MessageDecoderResult.OK:表示解碼成功。
代碼演示:
(1.)客戶端發送的數據對象:
1 public class SendMessage { 2 private int i = 0; 3 private int j = 0; 4 private char symbol = '+'; 5 public char getSymbol() { 6 return symbol; 7 } 8 public void setSymbol(char symbol) { 9 this.symbol = symbol; 10 } 11 public int getI() { 12 return i; 13 } 14 public void setI(int i) { 15 this.i = i; 16 } 17 public int getJ() { 18 return j; 19 } 20 public void setJ(int j) { 21 this.j = j; 22 } 23 } 24 (2.)服務端發送的返回結果對象: 25 public class ResultMessage { 26 private int result = 0; 27 public int getResult() { 28 return result; 29 } 30 public void setResult(int result) { 31 this.result = result; 32 } 33 } 34 (3.)客戶端使用的SendMessage的編碼器: 35 public class SendMessageEncoder implements MessageEncoder<SendMessage> 36 { 37 @Override 38 public void encode(IoSession session, SendMessage message, 39 ProtocolEncoderOutput out) throws Exception { 40 IoBuffer buffer = IoBuffer.allocate(10); 41 buffer.putChar(message.getSymbol()); 42 buffer.putInt(message.getI()); 43 buffer.putInt(message.getJ()); 44 buffer.flip(); 45 out.write(buffer); 46 } 47 }
這里我們的SendMessage、ResultMessage 中的字段都是用長度固定的基本數據類型,這樣
IoBuffer 就不需要自動擴展了,提高性能。按照一個char、兩個int 計算,這里的IoBuffer
只需要10 個字節的長度就可以了。
(4.)服務端使用的SendMessage的1號解碼器:
1 public class SendMessageDecoderPositive implements MessageDecoder { 2 @Override 3 public MessageDecoderResult decodable(IoSession session, IoBuffer in) 4 { 5 if (in.remaining() < 2) 6 return MessageDecoderResult.NEED_DATA; 7 else { 8 char symbol = in.getChar(); 9 if (symbol == '+') { 10 return MessageDecoderResult.OK; 11 } else { 12 return MessageDecoderResult.NOT_OK; 13 } 14 } 15 } 16 @Override 17 public MessageDecoderResult decode(IoSession session, IoBuffer in, 18 ProtocolDecoderOutput out) throws Exception { 19 SendMessage sm = new SendMessage(); 20 sm.setSymbol(in.getChar()); 21 sm.setI(in.getInt()); 22 sm.setJ(in.getInt()); 23 out.write(sm); 24 return MessageDecoderResult.OK; 25 } 26 @Override 27 public void finishDecode(IoSession session, ProtocolDecoderOutput 28 out) 29 throws Exception { 30 // undo 31 } 32 }
因為客戶端發送的SendMessage 的前兩個字節(char)就是符號位,所以我們在decodable()
方法中對此條件進行了判斷,之后讀到兩個字節,並且這兩個字節表示的字符是+時,才認
為這個解碼器可用。
(5.)服務端使用的SendMessage的2號解碼器:
1 public class SendMessageDecoderNegative implements MessageDecoder { 2 @Override 3 public MessageDecoderResult decodable(IoSession session, IoBuffer in) 4 { 5 if (in.remaining() < 2) 6 return MessageDecoderResult.NEED_DATA; 7 else { 8 char symbol = in.getChar(); 9 if (symbol == '-') { 10 return MessageDecoderResult.OK; 11 } else { 12 return MessageDecoderResult.NOT_OK; 13 } 14 } 15 } 16 @Override 17 public MessageDecoderResult decode(IoSession session, IoBuffer in, 18 ProtocolDecoderOutput out) throws Exception { 19 SendMessage sm = new SendMessage(); 20 sm.setSymbol(in.getChar()); 21 sm.setI(-in.getInt()); 22 sm.setJ(-in.getInt()); 23 out.write(sm); 24 return MessageDecoderResult.OK; 25 } 26 @Override 27 public void finishDecode(IoSession session, ProtocolDecoderOutput 28 out) 29 throws Exception { 30 // undo 31 } 32 }
(6.)服務端使用的ResultMessage的編碼器:
1 public class ResultMessageEncoder implements 2 MessageEncoder<ResultMessage> { 3 @Override 4 public void encode(IoSession session, ResultMessage message, 5 ProtocolEncoderOutput out) throws Exception { 6 IoBuffer buffer = IoBuffer.allocate(4); 7 buffer.putInt(message.getResult()); 8 buffer.flip(); 9 out.write(buffer); 10 } 11 }
(7.)客戶端使用的ResultMessage的解碼器:
1 public class ResultMessageDecoder implements MessageDecoder { 2 @Override 3 public MessageDecoderResult decodable(IoSession session, IoBuffer in) 4 { 5 if (in.remaining() < 4) 6 return MessageDecoderResult.NEED_DATA; 7 else if (in.remaining() == 4) 8 return MessageDecoderResult.OK; 9 else 10 return MessageDecoderResult.NOT_OK; 11 } 12 @Override 13 public MessageDecoderResult decode(IoSession session, IoBuffer in, 14 ProtocolDecoderOutput out) throws Exception { 15 ResultMessage rm = new ResultMessage(); 16 rm.setResult(in.getInt()); 17 out.write(rm); 18 return MessageDecoderResult.OK; 19 } 20 @Override 21 public void finishDecode(IoSession session, ProtocolDecoderOutput 22 out) 23 throws Exception { 24 // undo 25 } 26 }
(8.)組裝這些編解碼器的工廠:
1 public class MathProtocolCodecFactory extends 2 DemuxingProtocolCodecFactory { 3 public MathProtocolCodecFactory(boolean server) { 4 if (server) { 5 super.addMessageEncoder(ResultMessage.class, 6 ResultMessageEncoder.class); 7 super.addMessageDecoder(SendMessageDecoderPositive.class); 8 super.addMessageDecoder(SendMessageDecoderNegative.class); 9 } else { 10 super 11 .addMessageEncoder(SendMessage.class, 12 SendMessageEncoder.class); 13 super.addMessageDecoder(ResultMessageDecoder.class); 14 } 15 } 16 }
這個工廠類我們使用了構造方法的一個布爾類型的參數,以便其可以在Server 端、Client
端同時使用。我們以Server 端為例,你可以看到調用兩次addMessageDecoder()方法添加
了1 號、2 號解碼器,其實DemuxingProtocolDecoder 內部在維護了一個MessageDecoder
數組,用於保存添加的所有的消息解碼器,每次decode()的時候就調用每個MessageDecoder
的decodable()方法逐個檢查,只要發現一個MessageDecoder 不是對應的解碼器,就從數
組中移除,直到找到合適的MessageDecoder,如果最后發現數組為空,就表示沒找到對應
的MessageDecoder,最后拋出異常。
(9.)Server端:
1 public class Server { 2 public static void main(String[] args) throws Exception { 3 IoAcceptor acceptor = new NioSocketAcceptor(); 4 LoggingFilter lf = new LoggingFilter(); 5 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 6 5); 7 acceptor.getFilterChain().addLast("logger", lf); 8 acceptor.getFilterChain().addLast("codec", 9 new ProtocolCodecFilter(new 10 MathProtocolCodecFactory(true))); 11 acceptor.setHandler(new ServerHandler()); 12 acceptor.bind(new InetSocketAddress(9123)); 13 } 14 }
(10.)Server端使用的IoHandler:
1 public class ServerHandler extends IoHandlerAdapter { 2 private final static Logger log = LoggerFactory 3 .getLogger(ServerHandler.class); 4 @Override 5 public void sessionIdle(IoSession session, IdleStatus status) 6 throws Exception { 7 session.close(true); 8 } 9 @Override 10 public void messageReceived(IoSession session, Object message) 11 throws Exception { 12 SendMessage sm = (SendMessage) message; 13 log.info("The message received is [ " + sm.getI() + " " 14 + sm.getSymbol() + " " + sm.getJ() + " ]"); 15 ResultMessage rm = new ResultMessage(); 16 rm.setResult(sm.getI() + sm.getJ()); 17 session.write(rm); 18 } 19 }
(11.)Client端:
1 public class Client { 2 public static void main(String[] args) throws Throwable { 3 IoConnector connector = new NioSocketConnector(); 4 connector.setConnectTimeoutMillis(30000); 5 connector.getFilterChain().addLast("logger", new 6 LoggingFilter()); 7 connector.getFilterChain().addLast("codec", 8 new ProtocolCodecFilter(new 9 MathProtocolCodecFactory(false))); 10 connector.setHandler(new ClientHandler()); 11 connector.connect(new InetSocketAddress("localhost", 9123)); 12 } 13 }
(12.)Client端的IoHandler:
1 public class ClientHandler extends IoHandlerAdapter { 2 private final static Logger LOGGER = LoggerFactory 3 .getLogger(ClientHandler.class); 4 @Override 5 public void sessionOpened(IoSession session) throws Exception { 6 SendMessage sm = new SendMessage(); 7 sm.setI(100); 8 sm.setJ(99); 9 sm.setSymbol('+'); 10 session.write(sm); 11 } 12 @Override 13 public void messageReceived(IoSession session, Object message) { 14 ResultMessage rs = (ResultMessage) message; 15 LOGGER.info(String.valueOf(rs.getResult())); 16 } 17 }
你嘗試改變(12.)中的紅色代碼中的正負號,會看到服務端使用了兩個不同的解碼器對其進
行處理。
_______________________________________________________________________________
7.線程模型配置:
Mina 中的很多執行環節都使用了多線程機制,用於提高性能。Mina 中默認在三個地方使用
了線程:
(1.) IoAcceptor:
這個地方用於接受客戶端的連接建立,每監聽一個端口(每調用一次bind()方法),都啟用
一個線程,這個數字我們不能改變。這個線程監聽某個端口是否有請求到來,一旦發現,則
創建一個IoSession 對象。因為這個動作很快,所以有一個線程就夠了。
(2.) IoConnector:
這個地方用於與服務端建立連接,每連接一個服務端(每調用一次connect()方法),就啟
用一個線程,我們不能改變。同樣的,這個線程監聽是否有連接被建立,一旦發現,則創建
一個IoSession 對象。因為這個動作很快,所以有一個線程就夠了。
(3.) IoProcessor:
這個地方用於執行真正的IO 操作,默認啟用的線程個數是CPU 的核數+1,譬如:單CPU 雙
核的電腦,默認的IoProcessor 線程會創建3 個。這也就是說一個IoAcceptor 或者
IoConnector 默認會關聯一個IoProcessor 池,這個池中有3 個IoProcessor。因為IO 操作
耗費資源,所以這里使用IoProcessor 池來完成數據的讀寫操作,有助於提高性能。這也就
是前面說的IoAccetor、IoConnector 使用一個Selector,而IoProcessor 使用自己單獨的
Selector 的原因。
那么為什么IoProcessor 池中的IoProcessor 數量只比CPU 的核數大1 呢?因為IO 讀寫操
作是耗費CPU 的操作,而每一核CPU 同時只能運行一個線程,因此IoProcessor 池中的
IoProcessor 的數量並不是越多越好。
這個IoProcessor 的數量可以調整,如下所示:
IoAcceptor acceptor=new NioSocketAcceptor(5);
IoConnector connector=new NioSocketConnector(5);
這樣就會將IoProcessor 池中的數量變為5 個,也就是說可以同時處理5 個讀寫操作。
還記得前面說過Mina 的解碼器要使用IoSession 保存狀態變量,而不是Decoder 本身,這
是因為Mina 不保證每次執行doDecode()方法的都是同一個IoProcessor 這句話嗎?其實這
個問題的根本原因是IoProcessor 是一個池,每次IoSession 進入空閑狀態時(無讀些數據
發生),IoProcessor 都會被回收到池中,以便其他的IoSession 使用,所以當IoSession
從空閑狀態再次進入繁忙狀態時,IoProcessor 會再次分配給其一個IoProcessor 實例,而
此時已經不能保證還是上一次繁忙狀態時的那個IoProcessor 了。
你還會發現IoAcceptor 、IoConnector 還有一個構造方法, 你可以指定一個
java.util.concurrent.Executor 類作為線程池對象,那么這個線程池對象是做什么用的
呢?其實就是用於創建(1.)、(2.)中的用於監聽是否有TCP 連接建立的那個線程,默認情況
下,使用Executors.newCachedThreadPool()方法創建Executor 實例,也就是一個無界的
線程池(具體內容請參看JAVA 的並發庫)。大家不要試圖改變這個Executor 的實例,也就
是使用內置的即可,否則可能會造成一些莫名其妙的問題,譬如:性能在某個訪問量級別時,
突然下降。因為無界線程池是有多少個Socket 建立,就分配多少個線程,如果你改為
Executors 的其他創建線程池的方法,創建了一個有界線程池,那么一些請求將無法得到及
時響應,從而出現一些問題。
下面我們完整的綜述一下Mina 的工作流程:
(1.) 當 IoService 實例創建的時候,同時一個關聯在IoService 上的IoProcessor 池、
線程池也被創建;
(2.) 當 IoService 建立套接字(IoAcceptor 的bind()或者是IoConnector 的connect()
方法被調用)時,IoService 從線程池中取出一個線程,監聽套接字端口;
(3.) 當 IoService 監聽到套接字上有連接請求時,建立IoSession 對象,從IoProcessor
池中取出一個IoProcessor 實例執行這個會話通道上的過濾器、IoHandler;
(4.) 當這條IoSession 通道進入空閑狀態或者關閉時,IoProcessor 被回收。
上面說的是Mina 默認的線程工作方式,那么我們這里要講的是如何配置IoProcessor 的多
線程工作方式。因為一個IoProcessor 負責執行一個會話上的所有過濾器、IoHandler,也
就是對於IO 讀寫操作來說,是單線程工作方式(就是按照順序逐個執行)。假如你想讓某個
事件方法(譬如:sessionIdle()、sessionOpened()等)在單獨的線程中運行(也就是非
IoProcessor 所在的線程),那么這里就需要用到一個ExecutorFilter 的過濾器。
你可以看到IoProcessor 的構造方法中有一個參數是java.util.concurrent.Executor,也
就是可以讓IoProcessor 調用的過濾器、IoHandler 中的某些事件方法在線程池中分配的線
程上獨立運行,而不是運行在IoProcessor 所在的線程。
例:
acceptor.getFilterChain().addLast("exceutor", new ExecutorFilter());
我們看到是用這個功能,簡單的一行代碼就可以了。那么ExecutorFilter 還有許多重載的
構造方法,這些重載的有參構造方法,參數主要用於指定如下信息:
(1.) 指定線程池的屬性信息,譬如:核心大小、最大大小、等待隊列的性質等。
你特別要關注的是ExecutorFilter 內部默認使用的是OrderedThreadPoolExecutor 作為線
程池的實現,從名字上可以看出是保證各個事件在多線程執行中的順序(譬如:各個事件方
法的執行是排他的,也就是不可能出現兩個事件方法被同時執行;messageReceived()總是
在sessionClosed() 方法之前執行), 這是因為多線程的執行是異步的, 如果沒有
OrderedThreadPoolExecutor 來保證IoHandler 中的方法的調用順序,可能會出現嚴重的問
題。但是如果你的代碼確實沒有依賴於IoHandler 中的事件方法的執行順序,那么你可以使
用UnorderedThreadPoolExecutor 作為線程池的實現。
因此,你也最好不要改變默認的Executor 實現,否則,事件的執行順序就會混亂,譬如:
messageReceived()、messageSent()方法被同時執行。
(2.) 哪些事件方法被關注,也就哪些事件方法用這個線程池執行。
線程池可以異步執行的事件類型是位於IoEventType 中的九個枚舉值中除了
SESSION_CREATED 之外的其余八個,這說明Session 建立的事件只能與IoProcessor 在同一
個線程上執行。
1 public enum IoEventType { 2 SESSION_CREATED, 3 SESSION_OPENED, 4 SESSION_CLOSED, 5 MESSAGE_RECEIVED, 6 MESSAGE_SENT, 7 SESSION_IDLE, 8 EXCEPTION_CAUGHT, 9 WRITE, 10 CLOSE, 11 } 12 默認情況下,沒有配置關注的事件類型,有如下六個事件方法會被自動使用線程池異步執行: 13 IoEventType.EXCEPTION_CAUGHT, 14 IoEventType.MESSAGE_RECEIVED, 15 IoEventType.MESSAGE_SENT, 16 IoEventType.SESSION_CLOSED, 17 IoEventType.SESSION_IDLE, 18 IoEventType.SESSION_OPENED
其實ExecutorFilter 的工作機制很簡單,就是在調用下一個過濾器的事件方法時,把其交
給Executor 的execute(Runnable runnable)方法來執行,其實你自己在IoHandler 或者某
個過濾器的事件方法中開啟一個線程,也可以完成同樣的功能,只不過這樣做,你就失去了
程序的可配置性,線程調用的代碼也會完全耦合在代碼中。但要注意的是絕對不能開啟線程
讓其執行sessionCreated()方法。
如果你真的打算使用這個ExecutorFilter,那么最好想清楚它該放在過濾器鏈的哪個位置,
針對哪些事件做異步處理機制。一般ExecutorFilter 都是要放在ProtocolCodecFilter 過
濾器的后面,也就是不要讓編解碼運行在獨立的線程上,而是要運行在IoProcessor 所在的
線程,因為編解碼處理的數據都是由IoProcessor 讀取和發送的,沒必要開啟新的線程,否
則性能反而會下降。一般使用ExecutorFilter 的典型場景是將業務邏輯(譬如:耗時的數
據庫操作)放在單獨的線程中運行,也就是說與IO 處理無關的操作可以考慮使用
ExecutorFilter 來異步執行。
ref:http://blog.sina.com.cn/s/blog_56fd58ab0100pmwp.html
