
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; import com.cn.constant.ConstantValue; import com.cn.model.Request; /** * 請求解碼器 * <pre> * 數據包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包頭 | 模塊號 | 命令號 | 長度 | 數據 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包頭4字節 * 模塊號2字節short * 命令號2字節short * 長度4字節(描述數據部分字節長度) */ public class RequestDecoder extends FrameDecoder{// FrameDecoder 這個decoder可以協助我們解決粘包分包問題 /** * 數據包基本長度 */ public static int BASE_LENTH = 4 + 2 + 2 + 4; //ChannelBuffer里面有一個讀指針和寫指針。讀指針和寫指針初始值是0,寫多少數據寫指針就移動多少 //調用readShort方法,readInt方法就會移動讀指針, 0 =< readerIndex =< writerIndex @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { //可讀長度readableBytes必須大於基本長度才處理 if(buffer.readableBytes() >= BASE_LENTH){ //防止socket字節流攻擊 if(buffer.readableBytes() > 2048){ buffer.skipBytes(buffer.readableBytes()); } //記錄包頭開始的index int beginReader; while(true){//循環讀取,直到包頭讀取完畢 beginReader = buffer.readerIndex();//獲取讀指針 buffer.markReaderIndex(); if(buffer.readInt() == ConstantValue.FLAG){ break; } //未讀到包頭,略過一個字節 buffer.resetReaderIndex(); buffer.readByte(); //長度又變得不滿足 if(buffer.readableBytes() < BASE_LENTH){ return null; } } //包頭讀取完畢,讀取模塊號 short module = buffer.readShort(); //讀取命令號 short cmd = buffer.readShort(); //讀取長度 int length = buffer.readInt(); //readableBytes現在可讀的長度小於數據的長度。判斷請求數據包數據部分是否到齊 if(buffer.readableBytes() < length){ //還原讀指針,已經讀取了12個字節,但是沒用,所以要還原buffer的讀指針, buffer.readerIndex(beginReader); return null;//等待后面的數據包來 } //比length要長,就讀取data數據 byte[] data = new byte[length]; buffer.readBytes(data);//數據讀取完畢 //封裝request對象繼續向下傳遞 Request request = new Request(); request.setModule(module); request.setCmd(cmd); request.setData(data); //繼續往下傳遞 ,調用sendUpStreamEvent方法向下傳遞 return request; } //長度短了,數據包不完整,需要等待后面的包來 return null; //FrameDecoder: return null就是等待后面的包,return一個解碼的對象就是向下傳遞。 } }
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import com.cn.constant.ConstantValue; import com.cn.model.Request; /** * 請求編碼器 * <pre> * 數據包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包頭 | 模塊號 | 命令號 | 長度 | 數據 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包頭4字節 * 模塊號2字節short * 命令號2字節short * 長度4字節(描述數據部分字節長度) */ public class RequestEncoder extends OneToOneEncoder{ //把一個request對象轉換成了一個ChannelBuffer二進制數據 @Override protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception { Request request = (Request)(rs); ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); //包頭,確定數據包的開始 buffer.writeInt(ConstantValue.FLAG); //module buffer.writeShort(request.getModule()); //cmd buffer.writeShort(request.getCmd()); //長度 buffer.writeInt(request.getDataLength()); //data if(request.getData() != null){ buffer.writeBytes(request.getData()); } return buffer;//返回一個ChannelBuffer繼續向下傳遞。 } }
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; import com.cn.constant.ConstantValue; import com.cn.model.Response; /** * response解碼器 * <pre> * 數據包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包頭 | 模塊號 | 命令號 | 狀態碼 | 長度 | 數據 | * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包頭4字節 * 模塊號2字節short * 命令號2字節short * 長度4字節(描述數據部分字節長度) */ public class ResponseDecoder extends FrameDecoder{ /** * 數據包基本長度 */ public static int BASE_LENTH = 4 + 2 + 2 + 4; @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { //可讀長度必須大於基本長度 if(buffer.readableBytes() >= BASE_LENTH){ //記錄包頭開始的index int beginReader = buffer.readerIndex(); while(true){ if(buffer.readInt() == ConstantValue.FLAG){ break; } } //模塊號 short module = buffer.readShort(); //命令號 short cmd = buffer.readShort(); //狀態碼 int stateCode = buffer.readInt(); //長度 int length = buffer.readInt(); if(buffer.readableBytes() < length){ //還原讀指針 buffer.readerIndex(beginReader); return null; } byte[] data = new byte[length]; buffer.readBytes(data); //封裝Response對象 Response response = new Response(); response.setModule(module); response.setCmd(cmd); response.setStateCode(stateCode); response.setData(data); //繼續往下傳遞 return response; } //數據包不完整,需要等待后面的包來 return null; } }
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import com.cn.constant.ConstantValue; import com.cn.model.Response; /** * 請求編碼器 * <pre> * 數據包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包頭 | 模塊號 | 命令號 | 狀態碼 | 長度 | 數據 | * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包頭4字節 * 模塊號2字節short * 命令號2字節short * 長度4字節(描述數據部分字節長度) */ public class ResponseEncoder extends OneToOneEncoder{ @Override protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception { Response response = (Response)(rs); ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); //包頭 buffer.writeInt(ConstantValue.FLAG); //module buffer.writeShort(response.getModule()); //cmd buffer.writeShort(response.getCmd()); //狀態碼 buffer.writeInt(response.getStateCode()); //長度 buffer.writeInt(response.getDataLength()); //data if(response.getData() != null){ buffer.writeBytes(response.getData()); } return buffer; } }
package com.cn.constant; public interface ConstantValue { /** * 包頭 */ public static final int FLAG = -32523523; }
package com.cn.model; /** * 客戶端請求服務端的對象 */ public class Request { /** * 請求模塊 */ private short module; /** * 命令號 */ private short cmd; /** * 數據部分 */ private byte[] data; public short getModule() { return module; } public void setModule(short module) { this.module = module; } public short getCmd() { return cmd; } public void setCmd(short cmd) { this.cmd = cmd; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } public int getDataLength(){ if(data == null){ return 0; } return data.length; } }
package com.cn.model; /** * 服務端返回給客戶端的對象 */ public class Response { /** * 請求模塊 */ private short module; /** * 命令號 */ private short cmd; /** * 狀態碼 */ private int stateCode; /** * 數據部分 */ private byte[] data; public short getModule() { return module; } public void setModule(short module) { this.module = module; } public short getCmd() { return cmd; } public void setCmd(short cmd) { this.cmd = cmd; } public int getStateCode() { return stateCode; } public void setStateCode(int stateCode) { this.stateCode = stateCode; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } public int getDataLength(){ if(data == null){ return 0; } return data.length; } }
package com.cn.model; public interface StateCode { /** * 成功 */ public static int SUCCESS = 0; /** * 失敗 */ public static int FAIL = 1; }
package com.cn.module.fuben.request; import com.cn.serial.Serializer; //FightRequest是模塊名 public class FightRequest extends Serializer{ /** * 副本id */ private int fubenId; /** * 次數 */ private int count; public int getFubenId() { return fubenId; } public void setFubenId(int fubenId) { this.fubenId = fubenId; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override protected void read() { this.fubenId = readInt(); this.count = readInt(); } @Override protected void write() { writeInt(fubenId); writeInt(count); } }
package com.cn.module.fuben.response; import com.cn.serial.Serializer; public class FightResponse extends Serializer{ /** * 獲取金幣 */ private int gold; public int getGold() { return gold; } public void setGold(int gold) { this.gold = gold; } @Override protected void read() { this.gold = readInt(); } @Override protected void write() { writeInt(gold); } }
package com.cn.serial; import java.nio.ByteOrder; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; /** * buff工廠 */ public class BufferFactory { public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN; /** * 獲取一個buffer */ public static ChannelBuffer getBuffer() { ChannelBuffer dynamicBuffer = ChannelBuffers.dynamicBuffer(); return dynamicBuffer; } /** * 將數據寫入buffer */ public static ChannelBuffer getBuffer(byte[] bytes) { ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer(bytes); return copiedBuffer; } }
package com.cn.serial; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.jboss.netty.buffer.ChannelBuffer; /** * 自定義序列化接口 */ public abstract class Serializer { public static final Charset CHARSET = Charset.forName("UTF-8"); protected ChannelBuffer writeBuffer; protected ChannelBuffer readBuffer; /** * 反序列化具體實現 */ protected abstract void read(); /** * 序列化具體實現 */ protected abstract void write(); /** * 從byte數組獲取數據 * @param bytes 讀取的數組 */ public Serializer readFromBytes(byte[] bytes) { readBuffer = BufferFactory.getBuffer(bytes); read(); readBuffer.clear(); return this; } /** * 從buff獲取數據 * @param readBuffer */ public void readFromBuffer(ChannelBuffer readBuffer) { this.readBuffer = readBuffer; read(); } /** * 寫入本地buff * @return */ public ChannelBuffer writeToLocalBuff(){ writeBuffer = BufferFactory.getBuffer(); write(); return writeBuffer; } /** * 寫入目標buff * @param buffer * @return */ public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer){ writeBuffer = buffer; write(); return writeBuffer; } /** * 返回buffer數組 * * @return */ public byte[] getBytes() { writeToLocalBuff(); byte[] bytes = null; if (writeBuffer.writerIndex() == 0) { bytes = new byte[0]; } else { bytes = new byte[writeBuffer.writerIndex()]; writeBuffer.readBytes(bytes); } writeBuffer.clear(); return bytes; } public byte readByte() { return readBuffer.readByte(); } public short readShort() { return readBuffer.readShort(); } public int readInt() { return readBuffer.readInt(); } public long readLong() { return readBuffer.readLong(); } public float readFloat() { return readBuffer.readFloat(); } public double readDouble() { return readBuffer.readDouble(); } public String readString() { int size = readBuffer.readShort(); if (size <= 0) { return ""; } byte[] bytes = new byte[size]; readBuffer.readBytes(bytes); return new String(bytes, CHARSET); } public <T> List<T> readList(Class<T> clz) { List<T> list = new ArrayList<>(); int size = readBuffer.readShort(); for (int i = 0; i < size; i++) { list.add(read(clz)); } return list; } public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) { Map<K,V> map = new HashMap<>(); int size = readBuffer.readShort(); for (int i = 0; i < size; i++) { K key = read(keyClz); V value = read(valueClz); map.put(key, value); } return map; } @SuppressWarnings("unchecked") public <I> I read(Class<I> clz) { Object t = null; if ( clz == int.class || clz == Integer.class) { t = this.readInt(); } else if (clz == byte.class || clz == Byte.class){ t = this.readByte(); } else if (clz == short.class || clz == Short.class){ t = this.readShort(); } else if (clz == long.class || clz == Long.class){ t = this.readLong(); } else if (clz == float.class || clz == Float.class){ t = readFloat(); } else if (clz == double.class || clz == Double.class){ t = readDouble(); } else if (clz == String.class ){ t = readString(); } else if (Serializer.class.isAssignableFrom(clz)){ try { byte hasObject = this.readBuffer.readByte(); if(hasObject == 1){ Serializer temp = (Serializer)clz.newInstance(); temp.readFromBuffer(this.readBuffer); t = temp; }else{ t = null; } } catch (Exception e) { e.printStackTrace(); } } else { throw new RuntimeException(String.format("不支持類型:[%s]", clz)); } return (I) t; } public Serializer writeByte(Byte value) { writeBuffer.writeByte(value); return this; } public Serializer writeShort(Short value) { writeBuffer.writeShort(value); return this; } public Serializer writeInt(Integer value) { writeBuffer.writeInt(value); return this; } public Serializer writeLong(Long value) { writeBuffer.writeLong(value); return this; } public Serializer writeFloat(Float value) { writeBuffer.writeFloat(value); return this; } public Serializer writeDouble(Double value) { writeBuffer.writeDouble(value); return this; } public <T> Serializer writeList(List<T> list) { if (isEmpty(list)) { writeBuffer.writeShort((short) 0); return this; } writeBuffer.writeShort((short) list.size()); for (T item : list) { writeObject(item); } return this; } public <K,V> Serializer writeMap(Map<K, V> map) { if (isEmpty(map)) { writeBuffer.writeShort((short) 0); return this; } writeBuffer.writeShort((short) map.size()); for (Entry<K, V> entry : map.entrySet()) { writeObject(entry.getKey()); writeObject(entry.getValue()); } return this; } public Serializer writeString(String value) { if (value == null || value.isEmpty()) { writeShort((short) 0); return this; } byte data[] = value.getBytes(CHARSET); short len = (short) data.length; writeBuffer.writeShort(len); writeBuffer.writeBytes(data); return this; } public Serializer writeObject(Object object) { if(object == null){ writeByte((byte)0); }else{ if (object instanceof Integer) { writeInt((int) object); return this; } if (object instanceof Long) { writeLong((long) object); return this; } if (object instanceof Short) { writeShort((short) object); return this; } if (object instanceof Byte) { writeByte((byte) object); return this; } if (object instanceof String) { String value = (String) object; writeString(value); return this; } if (object instanceof Serializer) { writeByte((byte)1); Serializer value = (Serializer) object; value.writeToTargetBuff(writeBuffer); return this; } throw new RuntimeException("不可序列化的類型:" + object.getClass()); } return this; } private <T> boolean isEmpty(Collection<T> c) { return c == null || c.size() == 0; } public <K,V> boolean isEmpty(Map<K,V> c) { return c == null || c.size() == 0; } }
