私有協議介紹
通信協議從廣義上區分,可以分為公有協議和私有協議。由於私有協議的靈活性,它往往會在某個公司或者組織內部使用,按需定制,也因為如此,升級起來會非常方便,靈活性好。
Netty私有協議棧功能設計
Netty協議棧用於內部各模塊之間的通信,它基於TCP/IP協議棧,是一個類似HTTP協議的應用層協議棧,相比於傳統的標準協議棧,它更加輕巧、靈活和實用。
1.功能描述
1)基於Netty的NIO通信框架,提供高性能的異步通信能力。
2)提供消息的編解碼框架,可以實現POJO的序列化和反序列化。
3)提供基於IP地址的白名單接入認證機制。
4)鏈路的有效性校驗機制
5)鏈路的斷連重連機制。
2.通信模型
3.消息定義
Netty消息定義表(NettyMessage)
| 名稱 | 類型 | 長度 | 描述 |
| --- | --- | --- | --- |
| Header | Header | 變長 | 消息頭 |
| Body | Object | 變長 | 對於請求消息,它是方法的參數;對於響應消息,它是返回值 |
Netty協議消息頭定義(Header)
| 名稱 | 類型 | 長度 | 描述 |
| --- | --- | --- | --- |
| crcCode | 整型 int | 32 | Netty消息的校驗碼,它由三部分組成:1)0xABEF:固定值,表明該消息是Netty私有協議,2個字節;2)主版本號:1~255,1個字節;3)次版本號:1~255,1個字節。crcCode=0xABEF+主版本號+次版本號 |
| length | 整型 int | 32 | 消息長度。本文的編碼器寫入的是長度字段之後的字節數(整包字節數減8),以配合LengthFieldBasedFrameDecoder的解碼 |
| sessionID | 長整型 long | 64 | 集群節點全局唯一,由會話ID生成器生成 |
| type | Byte | 8 | 0:業務請求消息;1:業務響應消息;2:業務ONE WAY消息;3:握手請求消息;4:握手應答消息;5:心跳請求消息;6:心跳應答消息 |
| priority | Byte | 8 | 消息優先級 |
| attachment | `Map<String,Object>` | 變長 | 擴展消息 |
代碼實現
package com.cw.netty.high.protocol.bean;

import java.util.HashMap;
import java.util.Map;

/**
 * Header of a message in the private Netty protocol.
 *
 * <p>A plain mutable bean: every field has a getter and a setter and no
 * validation is performed here.
 *
 * @author chenwei
 * @create 2018-07-02 11:20
 **/
public class Header {

    /** Protocol check code; defaults to {@code 0xabef0101}. */
    private int crcCode = 0xabef0101;

    /** Message length. */
    private int length;

    /** Session id. */
    private long sessionID;

    /** Message type code. */
    private byte type;

    /** Message priority. */
    private byte priority;

    /** Extension attachments; starts out as an empty, mutable map. */
    private Map<String, Object> attachment = new HashMap<String, Object>();

    /** @return the protocol check code */
    public int getCrcCode() {
        return crcCode;
    }

    /** @param crcCode the protocol check code to store */
    public void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    /** @return the message length */
    public int getLength() {
        return length;
    }

    /** @param length the message length to store */
    public void setLength(int length) {
        this.length = length;
    }

    /** @return the session id */
    public long getSessionID() {
        return sessionID;
    }

    /** @param sessionID the session id to store */
    public void setSessionID(long sessionID) {
        this.sessionID = sessionID;
    }

    /** @return the message type code */
    public byte getType() {
        return type;
    }

    /** @param type the message type code to store */
    public void setType(byte type) {
        this.type = type;
    }

    /** @return the message priority */
    public byte getPriority() {
        return priority;
    }

    /** @param priority the message priority to store */
    public void setPriority(byte priority) {
        this.priority = priority;
    }

    /** @return the attachment map held by this header (not a copy) */
    public Map<String, Object> getAttachment() {
        return attachment;
    }

    /** @param attachment the attachment map to store (kept by reference) */
    public void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    /** Renders every field in declaration order for logging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Header{");
        text.append("crcCode=").append(crcCode);
        text.append(", length=").append(length);
        text.append(", sessionID=").append(sessionID);
        text.append(", type=").append(type);
        text.append(", priority=").append(priority);
        text.append(", attachment=").append(attachment);
        text.append('}');
        return text.toString();
    }
}
package com.cw.netty.high.protocol; import com.cw.netty.high.protocol.bean.Header; /** * netty消息類定義 * * @author chenwei * @create 2018-07-02 11:19 **/ public class NettyMessage { private Header header;//消息頭 private Object body;//消息體 public Header getHeader() { return header; } public void setHeader(Header header) { this.header = header; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } @Override public String toString() { return "NettyMessage{" + "header=" + header + ", body=" + body + '}'; } }
package com.cw.netty.high.protocol; /** * @author chenwei * @create 2018-07-02 17:05 **/ public class MessageType { // public static final byte LOGIN_R2=0;//業務請求消息 // public static final byte LOGIN_1=1;//業務響應消息 // public static final byte LOGIN_2=2;//業務ONE WAY 消息 public static final byte LOGIN_REQ=3;//握手請求消息 public static final byte LOGIN_RESP=4;//握手應答消息 public static final byte HEARTBEAT_REQ=5;//心跳請求消息 public static final byte HEARTBEAT_RESP=6;//心跳應答消息 }
package com.cw.netty.high.protocol;

import com.cw.netty.high.protocol.bean.MarshallingCodecFactory;
import com.cw.netty.high.protocol.bean.NettyMarshallingEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Encodes a {@link NettyMessage} into a single {@link ByteBuf}.
 *
 * <p>Fields are written in a fixed order that the decoder must read back in
 * the same order: crcCode (int), length (int), sessionID (long), type (byte),
 * priority (byte), attachment count (int), then for each attachment the key
 * length (int), the UTF-8 key bytes and the marshalled value, and finally the
 * marshalled body, or a single int 0 when there is no body.
 *
 * @author chenwei
 * @create 2018-07-02 11:32
 **/
public class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {

    /** Buffer index of the length field (it follows the 4-byte crcCode). */
    private static final int LENGTH_FIELD_INDEX = 4;

    /** Number of bytes up to and including the length field (crcCode + length). */
    private static final int LENGTH_FIELD_END = 8;

    private final NettyMarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() {
        marshallingEncoder = MarshallingCodecFactory.buildMarshallingEncoder();
    }

    /**
     * Serialises {@code nettyMessage} and adds the resulting buffer to {@code out}.
     *
     * @param channelHandlerContext context passed through to the marshalling encoder
     * @param nettyMessage          message to encode; it and its header must not be null
     * @param out                   receives exactly one {@link ByteBuf} on success
     * @throws IllegalArgumentException if the message or its header is null
     * @throws Exception                if marshalling an attachment value or the body fails
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, NettyMessage nettyMessage,
                          List<Object> out) throws Exception {
        if (nettyMessage == null || nettyMessage.getHeader() == null) {
            throw new IllegalArgumentException("The encode message is null");
        }

        ByteBuf buffer = Unpooled.buffer();
        boolean encoded = false;
        try {
            System.out.println("開始編碼:" + nettyMessage);

            buffer.writeInt(nettyMessage.getHeader().getCrcCode());
            // Placeholder; overwritten below once the total size is known.
            buffer.writeInt(nettyMessage.getHeader().getLength());
            buffer.writeLong(nettyMessage.getHeader().getSessionID());
            buffer.writeByte(nettyMessage.getHeader().getType());
            buffer.writeByte(nettyMessage.getHeader().getPriority());

            // A header whose attachment map was set to null is encoded as "no attachments".
            Map<String, Object> attachment = nettyMessage.getHeader().getAttachment();
            if (attachment == null) {
                attachment = Collections.emptyMap();
            }
            buffer.writeInt(attachment.size());
            for (Map.Entry<String, Object> entry : attachment.entrySet()) {
                byte[] keyBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
                buffer.writeInt(keyBytes.length);
                buffer.writeBytes(keyBytes);
                marshallingEncoder.encode(channelHandlerContext, entry.getValue(), buffer);
            }

            if (nettyMessage.getBody() != null) {
                marshallingEncoder.encode(channelHandlerContext, nettyMessage.getBody(), buffer);
            } else {
                // No body: write a zero so the decoder sees exactly 4 trailing bytes.
                buffer.writeInt(0);
            }

            // The length field holds the number of bytes that FOLLOW it. The decoder is a
            // LengthFieldBasedFrameDecoder with offset 4, field length 4 and no adjustment,
            // so it adds the 8 leading bytes itself; writing the full size here would make
            // it wait for 8 bytes that never arrive. (The original author notes that the
            // book "Netty: The Definitive Guide" omits this subtraction.)
            buffer.setInt(LENGTH_FIELD_INDEX, buffer.readableBytes() - LENGTH_FIELD_END);

            // The buffer must be handed to the pipeline, otherwise nothing is sent.
            // (Also noted by the original author as missing from the book's listing.)
            out.add(buffer);
            encoded = true;
        } finally {
            if (!encoded) {
                // Encoding failed part-way: nobody else owns the buffer, so release it here.
                buffer.release();
            }
        }
    }
}
package com.cw.netty.high.protocol;

import com.cw.netty.high.protocol.bean.Header;
import com.cw.netty.high.protocol.bean.MarshallingCodecFactory;
import com.cw.netty.high.protocol.bean.NettyMarshallingDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Decodes bytes from the wire into {@link NettyMessage} objects.
 *
 * <p>Extends {@link LengthFieldBasedFrameDecoder} so that TCP packet
 * coalescing and fragmentation are handled by the parent: given the offset and
 * size of the length field, the parent's {@code decode} returns either one
 * complete frame or {@code null} when more bytes are needed.
 *
 * <p>NOTE(review): attachment values and the body are unmarshalled with JBoss
 * Marshalling. Deserialising data from untrusted peers is dangerous; confirm
 * this decoder is only reachable from trusted nodes.
 *
 * @author chenwei
 * @create 2018-07-02 16:31
 **/
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * Bytes left in the frame when the message has no body: the encoder writes
     * a single int 0 in place of the body.
     */
    private static final int EMPTY_BODY_BYTES = 4;

    NettyMarshallingDecoder marshallingDecoder;

    /**
     * @param maxFrameLength    maximum accepted frame length, passed to the parent
     * @param lengthFieldOffset offset of the length field, passed to the parent
     * @param lengthFieldLength size of the length field in bytes, passed to the parent
     */
    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = MarshallingCodecFactory.buildMarshallingDecoder();
    }

    /**
     * Extracts one complete frame via the parent decoder and parses it.
     *
     * @param ctx channel context
     * @param in1 inbound buffer that accumulates received bytes
     * @return the decoded {@link NettyMessage}, or {@code null} if a full frame
     *         is not yet available
     * @throws CorruptedFrameException if an attachment key length is negative
     *                                 or larger than the remaining frame
     * @throws Exception               if the parent decoder or unmarshalling fails
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in1) throws Exception {
        // All reads below go against the frame returned by the parent, not against in1:
        // the parent has already advanced in1's reader index past this frame. (The
        // original author notes that the book "Netty: The Definitive Guide" keeps
        // reading in1 here, which fails with an index-out-of-bounds error.)
        ByteBuf frame = (ByteBuf) super.decode(ctx, in1);
        if (frame == null) {
            return null;
        }

        try {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setCrcCode(frame.readInt());
            header.setLength(frame.readInt());
            header.setSessionID(frame.readLong());
            header.setType(frame.readByte());
            header.setPriority(frame.readByte());

            int size = frame.readInt();
            if (size > 0) {
                Map<String, Object> attch = new HashMap<String, Object>();
                for (int i = 0; i < size; i++) {
                    int keySize = frame.readInt();
                    // The key length comes from the peer: never allocate more than the
                    // frame can actually contain.
                    if (keySize < 0 || keySize > frame.readableBytes()) {
                        throw new CorruptedFrameException("Invalid attachment key length: " + keySize);
                    }
                    byte[] keyArray = new byte[keySize];
                    frame.readBytes(keyArray);
                    String key = new String(keyArray, StandardCharsets.UTF_8);
                    attch.put(key, marshallingDecoder.decode(ctx, frame));
                }
                header.setAttachment(attch);
            }

            // More than the 4-byte "no body" marker remaining means a body is present.
            if (frame.readableBytes() > EMPTY_BODY_BYTES) {
                message.setBody(marshallingDecoder.decode(ctx, frame));
            }
            message.setHeader(header);
            return message;
        } finally {
            // The frame handed out by the parent is owned by this method; the decoded
            // message does not reference it, so it must be released to avoid a leak.
            frame.release();
        }
    }
}
package com.cw.netty.high.protocol; import com.cw.netty.high.protocol.bean.Header; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * @author chenwei * @create 2018-07-02 17:02 **/ public class LoginAuthReqHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //建立連接后,發送認證消息 NettyMessage message=buildLoginReq(); System.out.println("client 發送 認證消息:message="+message); ctx.writeAndFlush(message); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message=(NettyMessage)msg; //若是握手應答消息,判斷是否認證成功 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP) { byte loginResult= (byte)message.getBody(); if (loginResult != 0) { //握手失敗,關閉連接 ctx.close(); }else { System.out.println("login is ok :"+message); ctx.fireChannelRead(msg); } }else{ ctx.fireChannelRead(msg); } } private NettyMessage buildLoginReq(){ NettyMessage message=new NettyMessage(); Header header=new Header(); header.setType((byte) MessageType.LOGIN_REQ); message.setHeader(header); return message; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
package com.cw.netty.high.protocol;

import com.cw.netty.high.protocol.bean.Header;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Server-side handshake handler: answers login requests after checking the
 * peer against an IP whitelist and rejecting repeated logins.
 *
 * <p>The login response body is a byte: {@code 0} for success, {@code -1} for
 * rejection.
 *
 * @author chenwei
 * @create 2018-07-02 17:19
 **/
public class LoginAuthRespHandler extends ChannelHandlerAdapter {

    /** Result byte sent when the login is accepted. */
    private static final byte LOGIN_OK = 0;

    /** Result byte sent when the login is rejected. */
    private static final byte LOGIN_REJECTED = -1;

    /** Peers that have already logged in, keyed by {@code remoteAddress().toString()}. */
    private final Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();

    /** IP addresses that are allowed to log in. */
    private final String[] whiteList = {"127.0.0.1", "10.155.33.113"};

    /**
     * Validates login requests and replies with the result; any other message
     * is passed on to the next handler.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        boolean isLoginRequest = message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_REQ;
        if (!isLoginRequest) {
            ctx.fireChannelRead(msg);
            return;
        }

        String nodeIndex = ctx.channel().remoteAddress().toString();
        NettyMessage loginResp;
        if (nodeCheck.containsKey(nodeIndex)) {
            // Repeated login from the same peer: reject.
            loginResp = buildResponse(LOGIN_REJECTED);
            System.out.println("重復登陸,拒絕 :ip=" + nodeIndex);
        } else {
            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
            String ip = address.getAddress().getHostAddress();
            // Must start from "not allowed"; only a whitelist hit grants access.
            boolean isOk = isWhiteListed(ip);
            if (isOk) {
                // Record under the same key that the duplicate check and the cleanup use.
                nodeCheck.put(nodeIndex, true);
                System.out.println("通過白名單檢測 ip=" + ip);
            }
            loginResp = buildResponse(isOk ? LOGIN_OK : LOGIN_REJECTED);
        }
        ctx.writeAndFlush(loginResp);
    }

    /** Returns whether {@code ip} is one of the whitelisted addresses. */
    private boolean isWhiteListed(String ip) {
        for (String wip : whiteList) {
            if (wip.equals(ip)) {
                return true;
            }
        }
        return false;
    }

    /** Builds a {@link MessageType#LOGIN_RESP} message carrying {@code result} as body. */
    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP);
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    /** Removes the login record of the peer behind {@code ctx}, if any. */
    private void forgetNode(ChannelHandlerContext ctx) {
        SocketAddress remote = ctx.channel().remoteAddress();
        if (remote != null) {
            nodeCheck.remove(remote.toString());
        }
    }

    /**
     * Forgets the peer when the connection closes normally, so that a client
     * reconnecting from the same address is not rejected as a repeated login.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        forgetNode(ctx);
        ctx.fireChannelInactive();
    }

    /** Forgets the peer, closes the channel and forwards the exception. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        forgetNode(ctx);
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}
package com.cw.netty.high.protocol;

import com.cw.netty.high.protocol.bean.Header;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Client-side heartbeat handler: once a login response arrives it sends a
 * heartbeat request every 5 seconds, and it logs heartbeat responses.
 *
 * @author chenwei
 * @create 2018-07-02 17:36
 **/
public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    /** Handle of the periodic heartbeat task; {@code null} while none is scheduled. */
    private volatile ScheduledFuture<?> heartBeat;

    /**
     * Starts the heartbeat on a login response, logs heartbeat responses and
     * forwards every other message.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP) {
            // Cancel any task from an earlier login response so it is not leaked.
            cancelHeartBeat();
            heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP) {
            System.out.println("client receive server heart beat message :-->" + message);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /** Periodic task that writes one heartbeat request to the channel. */
    private static final class HeartBeatTask implements Runnable {

        private final ChannelHandlerContext ctx;

        HeartBeatTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage nettyMessage = buildHeatBeat();
            System.out.println("client send heart beat message to server :--->" + nettyMessage);
            ctx.writeAndFlush(nettyMessage);
        }
    }

    /** Builds a heartbeat request; a header alone is sufficient. */
    private static NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_REQ);
        message.setHeader(header);
        return message;
    }

    /** Cancels the periodic heartbeat task if one is scheduled. */
    private void cancelHeartBeat() {
        ScheduledFuture<?> scheduled = heartBeat;
        if (scheduled != null) {
            scheduled.cancel(true);
            heartBeat = null;
        }
    }

    /** Stops the heartbeat when the connection closes, then forwards the event. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        cancelHeartBeat();
        ctx.fireChannelInactive();
    }

    /** Stops the heartbeat and forwards the exception. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cancelHeartBeat();
        ctx.fireExceptionCaught(cause);
    }
}
package com.cw.netty.high.protocol; import com.cw.netty.high.protocol.bean.Header; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * 服務器端心跳handler * @author chenwei * @create 2018-07-02 18:00 **/ public class HeartBeatRespHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message=(NettyMessage)msg; //收到心跳消息后,構造心跳應答消息返回 if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ) { System.out.println("receive client heart beat message :-->"); NettyMessage heartBeat=buildHeatBeat(); System.out.println("send heart beat response message to client:-->"+heartBeat); ctx.writeAndFlush(heartBeat); }else { ctx.fireChannelRead(msg); } } private NettyMessage buildHeatBeat(){ NettyMessage message=new NettyMessage(); Header header=new Header(); header.setType(MessageType.HEARTBEAT_RESP); message.setHeader(header); return message; } }
package com.cw.netty.high.protocol.bean; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; /** * @author chenwei * @create 2018-07-02 16:12 **/ public class NettyMarshallingDecoder extends MarshallingDecoder { public NettyMarshallingDecoder(UnmarshallerProvider provider) { super(provider); } public NettyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) { super(provider, maxObjectSize); } public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception{ return super.decode(ctx,in); } }
package com.cw.netty.high.protocol.bean;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingEncoder;

/**
 * {@link MarshallingEncoder} subclass whose only purpose is to expose
 * {@code encode} as a public method so other classes can call it directly.
 *
 * @author chenwei
 * @create 2018-07-02 16:15
 **/
public class NettyMarshallingEncoder extends MarshallingEncoder {

    /** @param provider passed unchanged to the parent constructor */
    public NettyMarshallingEncoder(MarshallerProvider provider) {
        super(provider);
    }

    /**
     * Delegates to the parent's {@code encode}; only the visibility is widened.
     *
     * @param ctx channel context
     * @param msg object to marshal
     * @param out buffer the marshalled bytes are written to
     * @throws Exception if the parent's {@code encode} throws
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        super.encode(ctx, msg, out);
    }
}
package com.cw.netty.high.protocol.bean; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * @author chenwei * @create 2018-07-02 14:15 **/ public class MarshallingCodecFactory { public static NettyMarshallingDecoder buildMarshallingDecoder(){ final MarshallerFactory factory= Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration=new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration); NettyMarshallingDecoder decoder = new NettyMarshallingDecoder(provider, 1024); return decoder; } public static NettyMarshallingEncoder buildMarshallingEncoder(){ final MarshallerFactory factory= Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration=new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration); NettyMarshallingEncoder encoder = new NettyMarshallingEncoder(provider); return encoder; } }
Netty客戶端:
package com.cw.netty.high.protocol;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Protocol client: connects to the server, blocks until the channel closes and
 * then schedules a reconnect 5 seconds later.
 *
 * @author chenwei
 * @create 2018-07-02 18:15
 **/
public class NettyClient {

    /** Single-threaded executor that runs the reconnect attempts. */
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

    /**
     * Connects to {@code host:port} from the local address 127.0.0.1:8889 and
     * blocks until the channel is closed. Whether the connection ends normally
     * or with an error, a reconnect is scheduled afterwards.
     *
     * @param port server port
     * @param host server host
     * @throws Exception if connecting or waiting for the channel to close fails
     */
    public void connect(final int port, final String host) throws Exception {
        EventLoopGroup group = null;
        try {
            Bootstrap boot = new Bootstrap();
            group = new NioEventLoopGroup();
            boot.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                            channel.pipeline().addLast("messageEncoder", new NettyMessageEncoder());
                            // Close the link when nothing is received within the timeout.
                            channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                            channel.pipeline().addLast("loginAuthHandler", new LoginAuthReqHandler());
                            channel.pipeline().addLast("heartbeatHandler", new HeartBeatReqHandler());
                        }
                    });
            ChannelFuture future = boot.connect(
                    new InetSocketAddress(host, port), new InetSocketAddress("127.0.0.1", 8889)).sync();
            System.out.println("client is start……");
            future.channel().closeFuture().sync();
        } finally {
            // Each attempt creates its own event loop group; release its threads before
            // retrying, otherwise every reconnect leaks a whole group.
            if (group != null) {
                group.shutdownGracefully();
            }
            // Resources are released; schedule the next connection attempt.
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("重連……");
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        // Interrupted while waiting: restore the flag and give up the retry
                        // instead of swallowing the interrupt and connecting anyway.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        connect(port, host);
                    } catch (Exception e) {
                        // connect() has already scheduled the next retry in its finally block.
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient().connect(8000, "127.0.0.1");
    }
}
服務端:
package com.cw.netty.high.protocol; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; /** * @author chenwei * @create 2018-07-02 18:36 **/ public class NettyServer { public void bind() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyMessageDecoder(1024*1024,4,4)); socketChannel.pipeline().addLast(new NettyMessageEncoder()); socketChannel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50)); socketChannel.pipeline().addLast(new LoginAuthRespHandler()); socketChannel.pipeline().addLast("HeartBeathandler",new HeartBeatRespHandler()); } }); bootstrap.bind("127.0.0.1",8000).sync(); System.out.println("netty server start ok:"+("127.0.0.1"+8000)); } public static void main(String[] args) throws Exception { new NettyServer().bind(); } }