Mina框架斷包、粘包問題解決方式


Mina框架斷包、粘包問題解決方式

Apache Mina Server 是一個網絡通信應用框架,也就是說,它主要是對基於TCP/IP、UDP/IP協議棧的通信框架(當然。也能夠提供JAVA 對象的序列化服務、虛擬機管道通信服務等),Mina 能夠幫助我們高速開發高性能、高擴展性的網絡通信應用,Mina 提供了事件驅動、異步(Mina 的異步IO 默認使用的是JAVA NIO 作為底層支持)操作的編程模型。

在mina中,一般的應用場景用TextLine的Decode和Encode就夠用了(TextLine的默認切割符盡管是\r\n,但事實上分隔符是能夠自己指定的,如:newTextLineDecoder(charset, decodingDelimiter);)

但默認解碼器每次讀取緩沖的數據是有限制的,即ReadBufferSize的大小。默認是2048個字節。當數據包比較大時將被分成多次讀取。造成斷包。

盡管能夠通過acceptor.getSessionConfig().setReadBufferSize(newsize)這樣的方式來添加默認容量,但畢竟不是王道(太大了浪費空間。肯定會減少數據的處理效率)。

所以。當我們接收的數據的大小不是非常固定,且easy偏大的時候,默認的TextLine就不適合了。

這時我們在解析之前就須要推斷數據包是否完整,這樣處理起來就會非常麻煩。那么Mina 中幸好提供了CumulativeProtocolDecoder

類,從名字上能夠看出累積性的協議解碼器,也就是說僅僅要有數據發送過來。這個類就會去讀取數據。然后累積到內部的IoBuffer 緩沖區,可是詳細的拆包(把累積到緩沖區的數據解碼為JAVA 對象)交由子類的doDecode()方法完畢,實際上CumulativeProtocolDecoder就是在decode()重復的調用暴漏給子類實現的doDecode()方法。

詳細運行步驟例如以下所看到的:

A. 你的doDecode()方法返回true 時,CumulativeProtocolDecoder 的decode()方法會首先推斷你是否在doDecode()方法中從內部的IoBuffer 緩沖區讀取了數據。假設沒有,則會拋出非法的狀態異常,也就是你的doDecode()方法返回true 就表示你已經消費了本次數據(相當於聊天室中一個完整的消息已經讀取完成),進一步說,也就是此時你必須已經消費過內部的IoBuffer 緩沖區的數據(哪怕是消費了一個字節的數據)。

假設驗證通過,那么CumulativeProtocolDecoder會檢查緩沖區內是否還有數據未讀取。假設有就繼續調用doDecode()方法。沒有就停止對doDecode()方法的調用。直到有新的數據被緩沖。

B. 當你的doDecode()方法返回false 時。CumulativeProtocolDecoder 會停止對doDecode()方法的調用。但此時假設本次數據還有未讀取完的,就將含有剩余數據的IoBuffer 緩沖區保存到IoSession 中,以便下一次數據到來時能夠從IoSession 中提取合並。

假設發現本次數據全都讀取完成,則清空IoBuffer 緩沖區。

簡而言之,當你覺得讀取到的數據已經夠解碼了。那么就返回true,否則就返回false。這個CumulativeProtocolDecoder事實上最重要的工作就是幫你完畢了數據的累積,由於這個工作是非常煩瑣的。

一、    實現解碼器

CumulativeProtocolDecoder是一個抽象類,必須繼承並實現其doDecode方法。用戶自己定義協議的拆分就應該寫在doDecode方法中,以下的MyDecoder類是一個其子類的實現:

public class MyDecoder extends CumulativeProtocolDecoder {

    public static Logger log = Logger.getLogger(MyDecoder.class);

    /**

     * 包解碼器組件

     */

    private PacketComponent packetComponent;

     /**

     * 這種方法的返回值是重點:

     * 1、當內容剛好時,返回false,告知父類接收下一批內容

     * 2、內容不夠時須要下一批發過來的內容,此時返回false,這樣父類 CumulativeProtocolDecoder

     *   會將內容放進IoSession中,等下次來數據后就自己主動拼裝再交給本類的doDecode

     * 3、當內容多時,返回true,由於須要再將本批數據進行讀取。父類會將剩余的數據再次推送本

     * 類的doDecode

     */ 

    public boolean doDecode(IoSession session,IoBuffer in, 

            ProtocolDecoderOutput out) throws Exception { 

    log.info("in.remaining : "+in.remaining());

        if(in.remaining() > 0){//有數據時。讀取前8字節推斷消息長度 

            byte [] sizeBytes = new byte[8]; 

            in.mark();//標記當前位置。以便reset

//由於我的前數據包的長度是保存在第4-8字節中,

            in.get(sizeBytes,0,8);//讀取4字節 

                        //DataTypeChangeHelper是自己寫的一個byte[]int的一個工具類 

            int size = (int) DataTypeUtil.bytesToInt(sizeBytes,4);

            log.info("size : "+size);

            in.reset();

            if(size > in.remaining()){//假設消息內容不夠,則重置。相當於不讀取size 

                return false;//父類接收新數據,以拼湊成完整數據 

            } else

                byte[] bytes = new byte[size];  

                in.get(bytes, 0, size);

              //把字節轉換為Java對象的工具類

                PackageData pack = packetComponent.getDataFromBuffer(IoBuffer.wrap(bytes));

                out.write(pack);

                if(in.remaining() > 0){//假設讀取內容后還粘了包,就讓父類再重讀  一次。進行下一次解析 

                    return true

                } 

            } 

        } 

        return false;//處理成功,讓父類進行接收下個包 

    }

    getter();

    Setter();

}

二、    實現編解碼工廠和解碼器

我們還須要一個編解碼工廠,用來為編解碼過濾器提供編碼器和解碼器,解碼器此處我們用不到。可是也必須提供,所以能夠提供一個空的實現。

/**

 *

 * 編解碼工廠

 *

 */

public class MyCodecFcatory implements ProtocolCodecFactory {

    private ProtocolEncoder encoder = null;

    private ProtocolDecoder decoder = null;

 

    public MyCodecFcatory(ProtocolEncoder encoder, ProtocolDecoderdecoder) {

       this.encoder = encoder;

       this.decoder = decoder;

    }

 

    @Override

    public ProtocolEncoder getEncoder(IoSession session) throws Exception {

       return this.encoder;

    }

 

    @Override

    public ProtocolDecoder getDecoder(IoSession session) throws Exception {

       return this.decoder;

    }

}

/**

 *

 * 編碼器:不做不論什么操作,數據已是約定好的格式。按原格式編碼

 *

 */

public class MyEncoder extends ProtocolEncoderAdapter {

 

    @Override

    public void encode(IoSession session, Object message,

           ProtocolEncoderOutput out) throws Exception {

       // TODO Do nothing

    }

}

三、    配置編解碼過濾器

以下就能夠配置編解碼過濾器了:

<!-- 累加數據包解碼器:解斷丟包、粘包問題 -->

    <bean id="codec" class="org.apache.mina.filter.codec.ProtocolCodecFilter">

       <constructor-arg>

           <bean class="com.mina.codec.MyCodecFcatory">

              <constructor-arg index="0">

                  <bean class="com.mina.codec.MyEncoder"></bean>

              </constructor-arg>

              <constructor-arg index="1">

                  <bean class="com.mina.codec.MyDecoder">

                     <property name="packetComponent">

                         <bean class="com. mina.component.RootComponent">

                           

                         </bean>

                      </property>

                  </bean>

              </constructor-arg>

                 

           </bean>

       </constructor-arg>

    </bean>

<bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder"> 

      <property name="filters"> 

        <map>

          <entry key="codec" value-ref="codec"/>

          <entry key="logger" value-ref="loggerFilter"/>

          <entry key="executors" value-ref="executors"/>

        </map> 

      </property> 

    </bean> 

須要注意的是:在doDecode中通過out.write(pack) 把數據輸出后,官方的說明文檔中說接下來會繼續運行后面的過濾器,然后是IoHandle。假設你是僅僅用了一個編解碼過濾器的話,這可能全然沒問題,可是假設使用了兩個編解碼過濾器(可能非常少有人會這樣做,但本人因為前期使用了另外一個自己定義的編解碼過濾器。后來想加上這個可累加的解碼器。為了圖省事就在原過濾器的前面新添加了一個編解碼過濾器,后來數據流就不走我原來的編解碼過濾器了,out.write()之后直接到了IoHandle里面,搞了我好久,無奈最后把兩個編解碼過濾器合二為一啦。當中原因我還沒時間去搞個清楚。為防止大家和我犯同一個錯誤,特此提醒!)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM