netty8---自定義編碼解碼器


package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

import com.cn.constant.ConstantValue;
import com.cn.model.Request;

/**
 * Decodes the inbound byte stream into {@link Request} objects.
 * <pre>
 * Packet layout
 * +--------+--------+--------+--------+--------+
 * | header | module |  cmd   | length |  data  |
 * +--------+--------+--------+--------+--------+
 * </pre>
 * header: 4 bytes, must equal {@code ConstantValue.FLAG}
 * module: 2 bytes (short)
 * cmd:    2 bytes (short)
 * length: 4 bytes, number of bytes in the data section
 * <p>
 * Extending FrameDecoder lets the framework accumulate bytes for us: returning
 * {@code null} from {@link #decode} means "wait for more data", returning an object
 * passes it on to the next handler.
 */
public class RequestDecoder extends FrameDecoder{

    /**
     * Minimum packet length: header + module + cmd + length.
     */
    public static int BASE_LENTH = 4 + 2 + 2 + 4;

    /**
     * Maximum number of buffered bytes tolerated before the buffer is discarded.
     */
    private static final int MAX_READABLE_BYTES = 2048;

    /**
     * Tries to decode one request from the accumulated bytes.
     *
     * @param arg0   handler context (unused)
     * @param arg1   channel the bytes were read from (unused)
     * @param buffer accumulated inbound bytes; reads advance its reader index
     * @return the decoded {@link Request}, or {@code null} when no complete packet is available yet
     */
    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

        // Not even a complete fixed-size header yet: wait for more data.
        if (buffer.readableBytes() < BASE_LENTH) {
            return null;
        }

        // Guard against byte-stream flooding: drop everything that has piled up.
        // We must return here, otherwise the header read below would run on an empty buffer.
        if (buffer.readableBytes() > MAX_READABLE_BYTES) {
            buffer.skipBytes(buffer.readableBytes());
            return null;
        }

        // Index at which the packet header starts.
        int beginReader;

        // Scan forward one byte at a time until the header marker is found.
        while (true) {
            beginReader = buffer.readerIndex();
            if (buffer.readInt() == ConstantValue.FLAG) {
                break;
            }

            // Not a header: rewind and skip exactly one byte.
            buffer.readerIndex(beginReader + 1);

            // Too few bytes left to hold a header; the skipped garbage stays discarded.
            if (buffer.readableBytes() < BASE_LENTH) {
                return null;
            }
        }

        short module = buffer.readShort();
        short cmd = buffer.readShort();
        int length = buffer.readInt();

        // A negative length can never be satisfied and would make the array allocation
        // below throw. Treat the marker as a false match: step past its first byte and
        // let the framework call us again to resynchronise.
        if (length < 0) {
            buffer.readerIndex(beginReader + 1);
            return null;
        }

        // Payload has not fully arrived: rewind to the start of the header (the 12 header
        // bytes were consumed above) and wait for the rest.
        if (buffer.readableBytes() < length) {
            buffer.readerIndex(beginReader);
            return null;
        }

        byte[] data = new byte[length];
        buffer.readBytes(data);

        Request request = new Request();
        request.setModule(module);
        request.setCmd(cmd);
        request.setData(data);

        // Returning a non-null object hands it to the next handler.
        return request;
    }

}
package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

import com.cn.constant.ConstantValue;
import com.cn.model.Request;

/**
 * Encodes {@link Request} objects into their binary wire form.
 * <pre>
 * Packet layout
 * +--------+--------+--------+--------+--------+
 * | header | module |  cmd   | length |  data  |
 * +--------+--------+--------+--------+--------+
 * </pre>
 * header: 4 bytes, {@code ConstantValue.FLAG}
 * module: 2 bytes (short)
 * cmd:    2 bytes (short)
 * length: 4 bytes, number of bytes in the data section
 */
public class RequestEncoder extends OneToOneEncoder{

    /**
     * Converts a {@link Request} into a ChannelBuffer holding the packet bytes.
     *
     * @param context handler context (unused)
     * @param channel channel the message is written to (unused)
     * @param rs      the outbound message
     * @return the encoded buffer, or {@code rs} itself when it is not a {@link Request}
     */
    @Override
    protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
        // Leave messages this encoder does not understand untouched instead of
        // failing with a ClassCastException.
        if (!(rs instanceof Request)) {
            return rs;
        }
        Request request = (Request) rs;

        // Read the payload once so the length field and the written bytes always agree.
        byte[] data = request.getData();
        int dataLength = data == null ? 0 : data.length;

        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
        // Header marker: identifies the start of a packet.
        buffer.writeInt(ConstantValue.FLAG);
        buffer.writeShort(request.getModule());
        buffer.writeShort(request.getCmd());
        buffer.writeInt(dataLength);
        if (data != null) {
            buffer.writeBytes(data);
        }
        return buffer;
    }

}
package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import com.cn.constant.ConstantValue;
import com.cn.model.Response;

/**
 * Decodes the inbound byte stream into {@link Response} objects.
 * <pre>
 * Packet layout
 * +--------+--------+--------+------------+--------+--------+
 * | header | module |  cmd   | state code | length |  data  |
 * +--------+--------+--------+------------+--------+--------+
 * </pre>
 * header:     4 bytes, must equal {@code ConstantValue.FLAG}
 * module:     2 bytes (short)
 * cmd:        2 bytes (short)
 * state code: 4 bytes (int)
 * length:     4 bytes, number of bytes in the data section
 */
public class ResponseDecoder extends FrameDecoder{

    /**
     * Minimum packet length: header + module + cmd + state code + length.
     */
    public static int BASE_LENTH = 4 + 2 + 2 + 4 + 4;

    /**
     * Tries to decode one response from the accumulated bytes.
     *
     * @param arg0   handler context (unused)
     * @param arg1   channel the bytes were read from (unused)
     * @param buffer accumulated inbound bytes; reads advance its reader index
     * @return the decoded {@link Response}, or {@code null} when no complete packet is available yet
     */
    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

        // Not even a complete fixed-size header yet: wait for more data.
        if (buffer.readableBytes() < BASE_LENTH) {
            return null;
        }

        // Index at which the packet header starts.
        int beginReader;

        // Scan forward one byte at a time until the header marker is found, never
        // reading past what is available.
        while (true) {
            beginReader = buffer.readerIndex();
            if (buffer.readInt() == ConstantValue.FLAG) {
                break;
            }

            // Not a header: rewind and skip exactly one byte.
            buffer.readerIndex(beginReader + 1);

            // Too few bytes left to hold a header; wait for more data.
            if (buffer.readableBytes() < BASE_LENTH) {
                return null;
            }
        }

        short module = buffer.readShort();
        short cmd = buffer.readShort();
        int stateCode = buffer.readInt();
        int length = buffer.readInt();

        // A negative length can never be satisfied and would make the array allocation
        // below throw. Treat the marker as a false match: step past its first byte and
        // let the framework call us again to resynchronise.
        if (length < 0) {
            buffer.readerIndex(beginReader + 1);
            return null;
        }

        // Payload has not fully arrived: rewind to the start of the header and wait.
        if (buffer.readableBytes() < length) {
            buffer.readerIndex(beginReader);
            return null;
        }

        byte[] data = new byte[length];
        buffer.readBytes(data);

        Response response = new Response();
        response.setModule(module);
        response.setCmd(cmd);
        response.setStateCode(stateCode);
        response.setData(data);

        // Returning a non-null object hands it to the next handler.
        return response;
    }

}
package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import com.cn.constant.ConstantValue;
import com.cn.model.Response;

/**
 * Encodes {@link Response} objects into their binary wire form.
 * <pre>
 * Packet layout
 * +--------+--------+--------+------------+--------+--------+
 * | header | module |  cmd   | state code | length |  data  |
 * +--------+--------+--------+------------+--------+--------+
 * </pre>
 * header:     4 bytes, {@code ConstantValue.FLAG}
 * module:     2 bytes (short)
 * cmd:        2 bytes (short)
 * state code: 4 bytes (int)
 * length:     4 bytes, number of bytes in the data section
 */
public class ResponseEncoder extends OneToOneEncoder{

    /**
     * Converts a {@link Response} into a ChannelBuffer holding the packet bytes.
     *
     * @param context handler context (unused)
     * @param channel channel the message is written to (unused)
     * @param rs      the outbound message
     * @return the encoded buffer, or {@code rs} itself when it is not a {@link Response}
     */
    @Override
    protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
        // Leave messages this encoder does not understand untouched instead of
        // failing with a ClassCastException.
        if (!(rs instanceof Response)) {
            return rs;
        }
        Response response = (Response) rs;

        // Read the payload once so the length field and the written bytes always agree.
        byte[] data = response.getData();
        int dataLength = data == null ? 0 : data.length;

        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
        // Header marker: identifies the start of a packet.
        buffer.writeInt(ConstantValue.FLAG);
        buffer.writeShort(response.getModule());
        buffer.writeShort(response.getCmd());
        buffer.writeInt(response.getStateCode());
        buffer.writeInt(dataLength);
        if (data != null) {
            buffer.writeBytes(data);
        }

        return buffer;
    }

}
package com.cn.constant;

/**
 * Holder for protocol constants.
 */
public interface ConstantValue {

    /** Packet header marker. */
    int FLAG = -32523523;

}
package com.cn.model;
/**
 * Message sent from the client to the server.
 */
public class Request {

    /** Module the request targets. */
    private short module;

    /** Command number. */
    private short cmd;

    /** Payload bytes; may be {@code null}. */
    private byte[] data;

    /** @return the module number */
    public short getModule() {
        return this.module;
    }

    /** @param module the module number */
    public void setModule(short module) {
        this.module = module;
    }

    /** @return the command number */
    public short getCmd() {
        return this.cmd;
    }

    /** @param cmd the command number */
    public void setCmd(short cmd) {
        this.cmd = cmd;
    }

    /** @return the payload array as stored (no copy), possibly {@code null} */
    public byte[] getData() {
        return this.data;
    }

    /** @param data the payload array; stored as-is (no copy) */
    public void setData(byte[] data) {
        this.data = data;
    }

    /** @return the payload length in bytes, or 0 when there is no payload */
    public int getDataLength(){
        return (data != null) ? data.length : 0;
    }
}
package com.cn.model;
/**
 * Message returned from the server to the client.
 */
public class Response {

    /** Module the response belongs to. */
    private short module;

    /** Command number. */
    private short cmd;

    /** Status code of the response. */
    private int stateCode;

    /** Payload bytes; may be {@code null}. */
    private byte[] data;

    /** @return the module number */
    public short getModule() {
        return this.module;
    }

    /** @param module the module number */
    public void setModule(short module) {
        this.module = module;
    }

    /** @return the command number */
    public short getCmd() {
        return this.cmd;
    }

    /** @param cmd the command number */
    public void setCmd(short cmd) {
        this.cmd = cmd;
    }

    /** @return the status code */
    public int getStateCode() {
        return this.stateCode;
    }

    /** @param stateCode the status code */
    public void setStateCode(int stateCode) {
        this.stateCode = stateCode;
    }

    /** @return the payload array as stored (no copy), possibly {@code null} */
    public byte[] getData() {
        return this.data;
    }

    /** @param data the payload array; stored as-is (no copy) */
    public void setData(byte[] data) {
        this.data = data;
    }

    /** @return the payload length in bytes, or 0 when there is no payload */
    public int getDataLength(){
        return (data != null) ? data.length : 0;
    }
}
package com.cn.model;

/**
 * Status code values.
 */
public interface StateCode {

    /** Success. */
    int SUCCESS = 0;

    /** Failure. */
    int FAIL = 1;

}
package com.cn.module.fuben.request;

import com.cn.serial.Serializer;

/**
 * Fight request payload of the fuben module.
 * <p>
 * Serialized as two ints in this order: fubenId, count.
 */
public class FightRequest extends Serializer{

    /** Id of the fuben (instance/dungeon). */
    private int fubenId;

    /** Count (number of times). */
    private int count;

    /** @return the fuben id */
    public int getFubenId() {
        return this.fubenId;
    }

    /** @param fubenId the fuben id */
    public void setFubenId(int fubenId) {
        this.fubenId = fubenId;
    }

    /** @return the count */
    public int getCount() {
        return this.count;
    }

    /** @param count the count */
    public void setCount(int count) {
        this.count = count;
    }

    /** Reads the fields in the same order {@link #write()} emits them. */
    @Override
    protected void read() {
        fubenId = readInt();
        count = readInt();
    }

    /** Writes fubenId followed by count. */
    @Override
    protected void write() {
        writeInt(fubenId).writeInt(count);
    }

}
package com.cn.module.fuben.response;

import com.cn.serial.Serializer;

/**
 * Fight response payload; serialized as a single int (gold).
 */
public class FightResponse extends Serializer{

    /** Gold obtained. */
    private int gold;

    /** @return the gold amount */
    public int getGold() {
        return this.gold;
    }

    /** @param gold the gold amount */
    public void setGold(int gold) {
        this.gold = gold;
    }

    /** Reads the gold amount. */
    @Override
    protected void read() {
        gold = readInt();
    }

    /** Writes the gold amount. */
    @Override
    protected void write() {
        writeInt(gold);
    }
}
package com.cn.serial;


import java.nio.ByteOrder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
 * Factory for ChannelBuffer instances.
 */
public class BufferFactory {

    /** Byte order constant; not referenced by the factory methods in this class. */
    public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN;

    /**
     * Creates a new dynamic buffer.
     *
     * @return the result of {@code ChannelBuffers.dynamicBuffer()}
     */
    public static ChannelBuffer getBuffer() {
        return ChannelBuffers.dynamicBuffer();
    }

    /**
     * Creates a buffer holding the given bytes.
     *
     * @param bytes the bytes to place in the buffer
     * @return the result of {@code ChannelBuffers.copiedBuffer(bytes)}
     */
    public static ChannelBuffer getBuffer(byte[] bytes) {
        return ChannelBuffers.copiedBuffer(bytes);
    }

}
package com.cn.serial;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBuffer;
/**
 * Base class for the custom binary serialization.
 * <p>
 * Subclasses implement {@link #read()} and {@link #write()} using the typed
 * {@code readXxx}/{@code writeXxx} helpers, which operate on {@link #readBuffer} and
 * {@link #writeBuffer}. Those buffers are held in instance fields, so one instance
 * carries the state of the operation currently in progress.
 * <p>
 * Wire conventions used by the helpers: strings, lists and maps are prefixed with a
 * signed 16-bit size; a nested {@code Serializer} is prefixed with one byte
 * (1 = present, anything else = absent).
 */
public abstract class Serializer {


    public static final Charset CHARSET = Charset.forName("UTF-8");

    /** Buffer the {@code writeXxx} helpers append to. */
    protected ChannelBuffer writeBuffer;

    /** Buffer the {@code readXxx} helpers consume from. */
    protected ChannelBuffer readBuffer;

    /**
     * Deserialization implementation: populate the fields from {@link #readBuffer}.
     */
    protected abstract void read();

    /**
     * Serialization implementation: emit the fields to {@link #writeBuffer}.
     */
    protected abstract void write();

    /**
     * Populates this object from a byte array.
     *
     * @param bytes the serialized form
     * @return this object
     */
    public Serializer readFromBytes(byte[] bytes) {
        readBuffer = BufferFactory.getBuffer(bytes);
        read();
        readBuffer.clear();
        return this;
    }

    /**
     * Populates this object from an existing buffer, consuming from its current position.
     *
     * @param readBuffer the buffer to read from
     */
    public void readFromBuffer(ChannelBuffer readBuffer) {
        this.readBuffer = readBuffer;
        read();
    }

    /**
     * Serializes this object into a freshly created buffer.
     *
     * @return the buffer that was written
     */
    public ChannelBuffer writeToLocalBuff(){
        writeBuffer = BufferFactory.getBuffer();
        write();
        return writeBuffer;
    }

    /**
     * Serializes this object by appending to the given buffer.
     *
     * @param buffer the buffer to append to
     * @return the same buffer
     */
    public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer){
        writeBuffer = buffer;
        write();
        return writeBuffer;
    }

    /**
     * Serializes this object and returns the bytes.
     *
     * @return the serialized form; an empty array when nothing was written
     */
    public byte[] getBytes() {
        writeToLocalBuff();
        byte[] bytes;
        if (writeBuffer.writerIndex() == 0) {
            bytes = new byte[0];
        } else {
            bytes = new byte[writeBuffer.writerIndex()];
            writeBuffer.readBytes(bytes);
        }
        writeBuffer.clear();
        return bytes;
    }


    public byte readByte() {
        return readBuffer.readByte();
    }

    public short readShort() {
        return readBuffer.readShort();
    }

    public int readInt() {
        return readBuffer.readInt();
    }

    public long readLong() {
        return readBuffer.readLong();
    }

    public float readFloat() {
        return readBuffer.readFloat();
    }

    public double readDouble() {
        return readBuffer.readDouble();
    }

    /**
     * Reads a string: a 16-bit byte count followed by that many bytes in {@link #CHARSET}.
     *
     * @return the decoded string, or {@code ""} when the stored size is zero or negative
     */
    public String readString() {
        int size = readBuffer.readShort();
        if (size <= 0) {
            return "";
        }

        byte[] bytes = new byte[size];
        readBuffer.readBytes(bytes);

        return new String(bytes, CHARSET);
    }

    /**
     * Reads a list: a 16-bit element count followed by the elements.
     *
     * @param clz element type, as accepted by {@link #read(Class)}
     * @return a new list; empty when the stored count is zero or negative
     */
    public <T> List<T> readList(Class<T> clz) {
        List<T> list = new ArrayList<>();
        int size = readBuffer.readShort();
        for (int i = 0; i < size; i++) {
            list.add(read(clz));
        }
        return list;
    }

    /**
     * Reads a map: a 16-bit entry count followed by alternating keys and values.
     *
     * @param keyClz   key type, as accepted by {@link #read(Class)}
     * @param valueClz value type, as accepted by {@link #read(Class)}
     * @return a new map; empty when the stored count is zero or negative
     */
    public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) {
        Map<K,V> map = new HashMap<>();
        int size = readBuffer.readShort();
        for (int i = 0; i < size; i++) {
            K key = read(keyClz);
            V value = read(valueClz);
            map.put(key, value);
        }
        return map;
    }

    /**
     * Reads one value of the given type from {@link #readBuffer}.
     * <p>
     * Supported: the primitive/boxed numeric types byte, short, int, long, float, double,
     * {@code String}, and subclasses of {@code Serializer} (which need an accessible
     * no-argument constructor).
     *
     * @param clz the type to read
     * @return the value; {@code null} only for a {@code Serializer} whose presence byte is not 1
     * @throws RuntimeException if the type is unsupported or a {@code Serializer} subclass
     *                          cannot be instantiated
     */
    @SuppressWarnings("unchecked")
    public <I> I read(Class<I> clz) {
        Object t = null;
        if ( clz == int.class || clz == Integer.class) {
            t = this.readInt();
        } else if (clz == byte.class || clz == Byte.class){
            t = this.readByte();
        } else if (clz == short.class || clz == Short.class){
            t = this.readShort();
        } else if (clz == long.class || clz == Long.class){
            t = this.readLong();
        } else if (clz == float.class || clz == Float.class){
            t = readFloat();
        } else if (clz == double.class || clz == Double.class){
            t = readDouble();
        } else if (clz == String.class ){
            t = readString();
        } else if (Serializer.class.isAssignableFrom(clz)){
            byte hasObject = this.readBuffer.readByte();
            if (hasObject == 1) {
                Serializer temp;
                try {
                    temp = (Serializer) clz.getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    // Fail loudly: silently returning null here would leave the nested
                    // object's bytes unread and corrupt every subsequent read.
                    throw new RuntimeException(
                            String.format("cannot instantiate serializable type:[%s]", clz), e);
                }
                temp.readFromBuffer(this.readBuffer);
                t = temp;
            }
        } else {
            throw new RuntimeException(String.format("不支持類型:[%s]", clz));
        }
        return (I) t;
    }


    public Serializer writeByte(Byte value) {
        writeBuffer.writeByte(value);
        return this;
    }

    public Serializer writeShort(Short value) {
        writeBuffer.writeShort(value);
        return this;
    }

    public Serializer writeInt(Integer value) {
        writeBuffer.writeInt(value);
        return this;
    }

    public Serializer writeLong(Long value) {
        writeBuffer.writeLong(value);
        return this;
    }

    public Serializer writeFloat(Float value) {
        writeBuffer.writeFloat(value);
        return this;
    }

    public Serializer writeDouble(Double value) {
        writeBuffer.writeDouble(value);
        return this;
    }

    /**
     * Writes a list: a 16-bit element count followed by each element via
     * {@link #writeObject(Object)}. A {@code null} or empty list is written as count 0.
     *
     * @param list the list to write
     * @return this object
     * @throws IllegalArgumentException if the list has more than {@code Short.MAX_VALUE} elements
     */
    public <T> Serializer writeList(List<T> list) {
        if (isEmpty(list)) {
            writeBuffer.writeShort((short) 0);
            return this;
        }
        writeBuffer.writeShort(checkedSize(list.size(), "list"));
        for (T item : list) {
            writeObject(item);
        }
        return this;
    }

    /**
     * Writes a map: a 16-bit entry count followed by each key and value via
     * {@link #writeObject(Object)}. A {@code null} or empty map is written as count 0.
     *
     * @param map the map to write
     * @return this object
     * @throws IllegalArgumentException if the map has more than {@code Short.MAX_VALUE} entries
     */
    public <K,V> Serializer writeMap(Map<K, V> map) {
        if (isEmpty(map)) {
            writeBuffer.writeShort((short) 0);
            return this;
        }
        writeBuffer.writeShort(checkedSize(map.size(), "map"));
        for (Entry<K, V> entry : map.entrySet()) {
            writeObject(entry.getKey());
            writeObject(entry.getValue());
        }
        return this;
    }

    /**
     * Writes a string: a 16-bit byte count followed by the bytes in {@link #CHARSET}.
     * A {@code null} or empty string is written as count 0.
     *
     * @param value the string to write
     * @return this object
     * @throws IllegalArgumentException if the encoded string exceeds {@code Short.MAX_VALUE} bytes
     */
    public Serializer writeString(String value) {
        if (value == null || value.isEmpty()) {
            writeShort((short) 0);
            return this;
        }

        byte data[] = value.getBytes(CHARSET);
        writeBuffer.writeShort(checkedSize(data.length, "string"));
        writeBuffer.writeBytes(data);
        return this;
    }

    /**
     * Writes one value, choosing the encoding from its runtime type.
     * <p>
     * Supported: Integer, Long, Short, Byte, Float, Double, String and {@code Serializer}
     * (written as presence byte 1 followed by the object's own fields). {@code null} is
     * written as a single 0 byte, which {@link #read(Class)} interprets as "absent" only
     * for {@code Serializer} types.
     *
     * @param object the value to write
     * @return this object
     * @throws RuntimeException if the runtime type is not supported
     */
    public Serializer writeObject(Object object) {

        if(object == null){
            writeByte((byte)0);
            return this;
        }

        if (object instanceof Integer) {
            writeInt((Integer) object);
        } else if (object instanceof Long) {
            writeLong((Long) object);
        } else if (object instanceof Short) {
            writeShort((Short) object);
        } else if (object instanceof Byte) {
            writeByte((Byte) object);
        } else if (object instanceof Float) {
            // read(Class) accepts float/double, so the write side must as well.
            writeFloat((Float) object);
        } else if (object instanceof Double) {
            writeDouble((Double) object);
        } else if (object instanceof String) {
            writeString((String) object);
        } else if (object instanceof Serializer) {
            writeByte((byte)1);
            ((Serializer) object).writeToTargetBuff(writeBuffer);
        } else {
            throw new RuntimeException("不可序列化的類型:" + object.getClass());
        }

        return this;
    }

    /**
     * Ensures a size fits the signed 16-bit prefix the readers expect; a larger value
     * would be truncated on write and make the stream unreadable.
     */
    private static int checkedSize(int size, String what) {
        if (size > Short.MAX_VALUE) {
            throw new IllegalArgumentException(
                    what + " size " + size + " exceeds the 16-bit limit " + Short.MAX_VALUE);
        }
        return size;
    }

    private <T> boolean isEmpty(Collection<T> c) {
        return c == null || c.size() == 0;
    }
    public <K,V> boolean isEmpty(Map<K,V> c) {
        return c == null || c.size() == 0;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM