基於Mina開發網絡通信程序,在傳感器數據接入領域應用的很廣泛,今天我無意中發現一個問題,那就是我在前端session.write(msg)數據出去之后,卻沒有經過Filter的Encoder方法,同樣能夠寫入遠程服務器。因為我所發送的數據不需要很復雜的編碼,所以encoder方法也一直沒有去看,今天發現無法被自己寫的過濾器所編碼,針對這個問題,我打開以前的代碼以及以前的項目中的相關代碼,有些同事也是session.write(IoBuffer)之后,在encoder方法里面還加上了一句out.write(message);通過跟蹤Mina源碼發現,session寫出去的數據類型是IoBuffer格式的,就不經過自定義的過濾器了。所以下面的代碼壓根是多余的
- @Override
- public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
- out.write(message);//IoBuffer格式寫出去之后,跳過了encoder.
- }
下面我把自己跟蹤調試Mina的過程記錄下來.
一、場景
客戶端需要每隔Time時間向服務端發送心跳包,代碼如下:
session.write(IoBuffer.wrap("心跳包XXX".getBytes()));
二、現象
MyFilter中的Encoder方法encoder不執行
- public class MyFilter implements ProtocolCodecFactory {
- private ProtocolEncoder encoder = new MyEncoder();
- private ProtocolDecoder decoder = new MyDecoder();
- @Override
- public ProtocolEncoder getEncoder(IoSession session) throws Exception {
- return encoder;
- }
- @Override
- public ProtocolDecoder getDecoder(IoSession session) throws Exception {
- return decoder;
- }
- }
三、分析
進入session.write方法,實現IoSession.write方法的是AbstractIoSession。直接調用的是
- public WriteFuture write(Object message) {
- return write(message, null);
- }
而AbstractIoSession.write(Object message, SocketAddress address)
該方法的工作流程是:
- 創建WriteFeature對象,用於返回值(session.write本身就是返回writeFeature)
- 將session.write(message)中的Object類型的message封裝成writeRequest.
- 啟動write動作,這個主要是IoFilterChain來完成的。
具體的核心代碼如下:
- // Now, we can write the message. First, create a future
- WriteFuture writeFuture = new DefaultWriteFuture(this);
- WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
- // Then, get the chain and inject the WriteRequest into it
- IoFilterChain filterChain = getFilterChain();
- filterChain.fireFilterWrite(writeRequest);
繼續跟蹤到fireFilterWrite里面去,可知IoFilterChain的默認實現類DefaultIoFilterChain中的關鍵方法:
- public void fireFilterWrite(WriteRequest writeRequest) {
- Entry tail = this.tail;
- callPreviousFilterWrite(tail, session, writeRequest);
- }
在這里先要介紹一下DefaultIoFiterChain的數據格式,主要的屬性如下:
- private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>();
- /** The chain head */
- private final EntryImpl head;
- /** The chain tail */
- private final EntryImpl tail;
其中 head與tail都是DefaultIoFilterChain固有的屬性,name2entity是我們為FilterChain添加的過濾器。因而IoFilterChain是用一個鏈表來保存過濾器的(('tail', prev: 'myFilter:ProtocolCodecFilter', next: 'null')),其中表頭和表位都是固定的head和tail,他們對應的Filter也是專有的,HeadFilter和TailFilter.
關鍵方法是callPreviousFilterWrite(tail, session, writeRequest);
- try {
- IoFilter filter = entry.getFilter();
- NextFilter nextFilter = entry.getNextFilter();
- filter.filterWrite(nextFilter, session, writeRequest);
- } catch (Throwable e) {
- writeRequest.getFuture().setException(e);
- fireExceptionCaught(e);
- }
從上面兩個代碼片段中,可以看出,IoFilterChain首先從列表中找到tail,從tail開始查找filter,順序調用每個filter的filterWrite()方法。這里的‘順序調用’,指的是從tail->head調用,也就是逆向調用Filter。但是看到filter.filterWrite(nextFilter, session, writeRequest);這行代碼中的參數可以發現,nextFilter,表面的意思是下一個過濾器,有點誤解,感覺tail下一個過濾器不就是null嗎,其實不然,進入filterWriter可知。
- Entry nextEntry = EntryImpl.this.prevEntry;
- callPreviousFilterWrite(nextEntry, session, writeRequest);
對於除head和tail過濾器外,其他的過濾器是如何工作的呢?我們看看ProtocolCodecFilter中的fireFilter方法,做了這樣的處理:
- if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
- nextFilter.filterWrite(session, writeRequest);
- return;
- }
到這里,就明白了為什么session.write(IoBuffer.wrap())這樣寫出去,無法經過自己定義的過濾器了,原來在fireFilter中,對message做了判斷,如果已經是IoBuffer類型的,就直接return了。
最后執行的是HeadFilter的fireFilter方法,直接看內容:
- if (writeRequest.getMessage() instanceof IoBuffer) {
- IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
- // I/O processor implementation will call buffer.reset()
- // it after the write operation is finished, because
- // the buffer will be specified with messageSent event.
- buffer.mark();
- int remaining = buffer.remaining();
- if (remaining == 0) {
- // Zero-sized buffer means the internal message
- // delimiter.
- s.increaseScheduledWriteMessages();
- } else {
- s.increaseScheduledWriteBytes(remaining);
- }
- } else {
- s.increaseScheduledWriteMessages();
- }
- s.getWriteRequestQueue().offer(s, writeRequest);
- if (!s.isWriteSuspended()) {
- s.getProcessor().flush(s);
- }
WriteRequestQueue的默認實現就是java.util.concurrent.ConcurrentLinkedQueue,舍去傳入的session對象。