自定義兼容多種Protobuf協議的編解碼器


《從零開始搭建游戲服務器》自定義兼容多種Protobuf協議的編解碼器

直接在protobuf序列化數據的前面,加上一個自定義的協議頭,協議頭里包含序列數據的長度和對應的數據類型,在數據解包的時候根據包頭來進行反序列化。

1.協議頭定義

關於這一塊,我打算先采取比較簡單的辦法,結構如下: 
 
協議號是自定義的一個int類型的枚舉(當然,假如協議吧比較少的話,可以用一個short來代替int以縮小數據包),這個協議號與協議類型是一一對應的,而協議頭通常使用數據總長度來填入,具體過程如下:

  • 當客戶端向服務器發送數據時,會根據協議類型加上協議號,然后使用protobuf序列化之后再發送給服務器;
  • 當服務器發送數據給客戶端時,根據協議號,確定protobuf協議類型以反序列化數據,並調用相應回調方法。

2.自定義的編碼器和解碼器

編碼器: 
參考netty自帶的編碼器ProtobufEncoder可以發現,被綁定到ChannelPipeline上用於序列化協議數據的編碼器,必須繼承MessageToByteEncoder<MessageLite>這個基類,並通過重寫protected void encode(ChannelHandlerContext ctx, MessageLite msg, ByteBuf out)這個方法來實現自定義協議格式的目的:

package com.tw.login.tools;
import com.google.protobuf.MessageLite;
import com.tw.login.proto.CsEnum.EnmCmdID;
import com.tw.login.proto.CsLogin;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * 自定義編碼器
 * @author linsh
 *
 */
public class PackEncoder extends MessageToByteEncoder<MessageLite> {
    /**
     * 傳入協議數據,產生攜帶包頭之后的數據
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {
        // TODO Auto-generated method stub
        byte[] body = msg.toByteArray();
        byte[] header = encodeHeader(msg, (short)body.length);
        out.writeBytes(header);
        out.writeBytes(body);
        return;
    }
    /**
     * 獲得一個協議頭
     * @param msg
     * @param bodyLength
     * @return
     */
    private byte[] encodeHeader(MessageLite msg,short bodyLength){
        short _typeId = 0;
        if(msg instanceof CsLogin.CSLoginReq){
            _typeId = EnmCmdID.CS_LOGIN_REQ_VALUE;
        }else if(msg instanceof CsLogin.CSLoginRes){
            _typeId = EnmCmdID.CS_LOGIN_RES_VALUE;
        }
        //存放兩個short數據
        byte[] header = new byte[4];
        //前兩位放數據長度
        header[0] = (byte) (bodyLength & 0xff);
        header[1] = (byte) ((bodyLength >> 8) & 0xff);
        //后兩個字段存協議id
        header[2] = (byte) (_typeId & 0xff);
        header[3] = (byte) ((_typeId >> 8) & 0xff);
        return header;
    }
}

 解碼器: 
參考netty自帶的編碼器ProtobufDecoder可以發現,被綁定到ChannelPipeline上用於序列化協議數據的解碼器,必須繼承ByteToMessageDecoder這個基類,並通過重寫protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)這個方法來實現解析自定義協議格式的目的:

package com.tw.login.tools;
import java.util.List;
import com.google.protobuf.MessageLite;
import com.tw.login.proto.CsEnum.EnmCmdID;
import com.tw.login.proto.CsLogin.CSLoginReq;
import com.tw.login.proto.CsLogin.CSLoginRes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public class PackDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 獲取包頭中的body長度
        byte low = in.readByte();
        byte high = in.readByte();
        short s0 = (short) (low & 0xff);
        short s1 = (short) (high & 0xff);
        s1 <<= 8;
        short length = (short) (s0 | s1);
        // 獲取包頭中的protobuf類型
        byte low_type = in.readByte();
        byte high_type = in.readByte();
        short s0_type = (short) (low_type & 0xff);
        short s1_type = (short) (high_type & 0xff);
        s1_type <<= 8;
        short dataTypeId = (short) (s0_type | s1_type);
        // 如果可讀長度小於body長度,恢復讀指針,退出。
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        //開始讀取核心protobuf數據
        ByteBuf bodyByteBuf = in.readBytes(length);
        byte[] array;
        //反序列化數據的起始點
        int offset;
        //可讀的數據字節長度
        int readableLen= bodyByteBuf.readableBytes();
        //分為包含數組數據和不包含數組數據兩種形式
        if (bodyByteBuf.hasArray()) {
            array = bodyByteBuf.array();
            offset = bodyByteBuf.arrayOffset()   bodyByteBuf.readerIndex();
        } else {
            array = new byte[readableLen];
            bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen);
            offset = 0;
        }
        //反序列化
        MessageLite result = decodeBody(dataTypeId, array, offset, readableLen);
        out.add(result);
    }
    /**
     * 根據協議號用響應的protobuf類型來解析協議數據
     * @param _typeId
     * @param array
     * @param offset
     * @param length
     * @return
     * @throws Exception
     */
    public MessageLite decodeBody(int _typeId,byte[] array,int offset,int length) throws Exception{
        if(_typeId == EnmCmdID.CS_LOGIN_REQ_VALUE){
            return CSLoginReq.getDefaultInstance().getParserForType().parseFrom(array,offset,length);
        }
        else if(_typeId == EnmCmdID.CS_LOGIN_RES_VALUE){
            return CSLoginRes.getDefaultInstance().getParserForType().parseFrom(array,offset,length);
        }
        return null;
    }
}

 

3.修改Socket管道綁定的編解碼器:

在創建Socket管道的時候,將編解碼器替換為自定義的編解碼器,而具體數據發送和接受過程無需做任何修改:

ChannelPipeline pipeline = ch.pipeline();
// 協議數據的編解碼器
pipeline.addLast("frameDecoder",new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder",new PackDecoder());
pipeline.addLast("frameEncoder",new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new PackEncoder());
pipeline.addLast("handler",new SocketServerHandler());

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM