通信協議從廣義上區分,可以分為公有協議和私有協議。由於私有協議的靈活性,它往往會在某個公司或者組織內部使用,按需定制,也因為如此,升級起來會非常方便,靈活性好。絕大多數的私有協議傳輸層都基於TCP/IP,所以利用Netty的NIO TCP協議棧可以非常方便地進行私有協議的定制和開發。
私有協議介紹
私有協議本質上是廠商內部發展和采用的標准,除非授權,其他廠商一般無權使用該協議。私有協議也稱非標准協議,就是未經國際或國家標准化組織采納或批准,由某個企業自己制訂,協議實現細節不願公開,只在企業自己生產的設備之間使用的協議。私有協議具有封閉性、壟斷性、排他性等特點。如果網上大量存在私有(非標准)協議,現行網絡或用戶一旦使用了它,后進入的廠家設備就必須跟着使用這種非標准協議,才能夠互連互通,否則根本不可能進入現行網絡。這樣,使用非標准協議的廠家就實現了壟斷市場的願望。
在傳統的Java應用中,通常使用以下4種方式進行跨節點通信。
(1)通過RMI進行遠程服務調用;
(2)通過Java的Socket+Java序列化的方式進行跨節點調用;
(3)利用一些開源的RPC框架進行遠程服務調用,例如Facebook的Thrift,Apache的Avro等;
(4)利用標准的公有協議進行跨節點服務調用,例如HTTP+XML、RESTful+JSON或者WebService。
跨節點的遠程服務調用,除了鏈路層的物理連接外,還需要對請求和響應消息進行編解碼。在請求和應答消息本身以外,也需要攜帶一些其他控制和管理類指令,例如鏈路建立的握手請求和響應消息、鏈路檢測的心跳消息等。當這些功能組合到一起之后,就會形成私有協議。
事實上,私有協議並沒有標准的定義,只要是能夠用於跨進程、跨主機數據交換的非標准協議,都可以稱為私有協議。通常情況下,正規的私有協議都有具體的協議規范文檔,類似於《XXXX協議VXX規范》,但是在實際的項目中,內部使用的私有協議往往是口頭約定的規范,由於並不需要對外呈現或者被外部調用,所以一般不會單獨寫相關的內部私有協議規范文檔。
Netty協議棧功能設計
使用Netty提供的異步TCP協議棧開發一個私有協議棧,該協議棧被命名為Netty協議棧。Netty協議棧用於內部各模塊之間的通信,它基於TCP/IP協議棧,是一個類HTTP協議的應用層協議棧,相比於傳統的標准協議棧,它更加輕巧、靈活和實用。
網絡拓撲圖
在分布式組網環境下,每個Netty節點(Netty進程)之間建立長連接,使用Netty協議進行通信。Netty節點並沒有服務端和客戶端的區分,誰首先發起連接,誰就作為客戶端,另一方自然就成為服務端。一個Netty節點既可以作為客戶端連接另外的Netty節點,也可以作為Netty服務端被其他Netty節點連接,這完全取決於使用者的業務場景和具體的業務組網。

協議棧功能描述
Netty協議棧承載了業務內部各模塊之間的消息交互和服務調用,它的主要功能如下。
(1)基於Netty的NIO通信框架,提供高性能的異步通信能力;
(2)提供消息的編解碼框架,可以實現POJO的序列化和反序列化;
(3)提供基於IP地址的白名單接入認證機制;
(4)鏈路的有效性校驗機制;
(5)鏈路的斷連重連機制。
通信模型

(1)Netty協議棧客戶端發送握手請求消息,攜帶節點ID等有效身份認證信息;
(2)Netty協議棧服務端對握手請求消息進行合法性校驗,包括節點ID有效性校驗、節點重復登錄校驗和IP地址合法性校驗,校驗通過后,返回登錄成功的握手應答消息;
(3)鏈路建立成功之后,客戶端發送業務消息;
(4)鏈路成功之后,服務端發送心跳消息;
(5)鏈路建立成功之后,客戶端發送心跳消息;
(6)鏈路建立成功之后,服務端發送業務消息;
(7)服務端退出時,服務端關閉連接,客戶端感知對方關閉連接后,被動關閉客戶端連接。
備注:需要指出的是,Netty協議通信雙方鏈路建立成功之后,雙方可以進行全雙工通信,無論客戶端還是服務端,都可以主動發送請求消息給對方,通信方式可以是TWO WAY或者ONE WAY。雙方之間的心跳采用Ping-Pong機制,當鏈路處於空閑狀態時,客戶端主動發送Ping消息給服務端,服務端接收到Ping消息后發送應答消息Pong給客戶端,如果客戶端連續發送N條Ping消息都沒有接收到服務端返回的Pong消息,說明鏈路已經掛死或者對方處於異常狀態,客戶端主動關閉連接,間隔周期T后發起重連操作,直到重連成功。
消息定義
Netty協議棧消息定義包含兩部分:
- 消息頭;
- 消息體。



Netty協議支持的字段類型

Netty協議的編解碼規范
1.Netty協議的編碼
Netty協議NettyMessage的編碼規范如下:
(1)crcCode:java.nio.ByteBuffer.putInt(int value),如果采用其他緩沖區實現,必須與其等價;
(2)length:java.nio.ByteBuffer.putInt(int value),如果采用其他緩沖區實現,必須與其等價;
(3)sessionID:java.nio.ByteBuffer.putLong(long value),如果采用其他緩沖區實現,必須與其等價;
(4)type: java.nio.ByteBuffer.put(byte b),如果采用其他緩沖區實現,必須與其等價;
(5)priority:java.nio.ByteBuffer.put(byte b),如果采用其他緩沖區實現,必須與其等價;
(6)attachment:它的編碼規則為——如果attachment長度為0,表示沒有可選附件,則將長度編碼設為0,java.nio.ByteBuffer.putInt(0);如果大於0,說明有附件需要編碼,具體的編碼規則如下:首先對附件的個數進行編碼,java.nio.ByteBuffer.putInt(attachment.size());然后對Key進行編碼,再將它轉換成byte數組之后編碼內容.
(7)body的編碼:通過JBoss Marshalling將其序列化為byte數組,然后調用java.nio.ByteBuffer.put(byte [] src)將其寫入ByteBuffer緩沖區中。
由於整個消息的長度必須等全部字段都編碼完成之后才能確認,所以最后需要更新消息頭中的length字段,將其重新寫入ByteBuffer中。
2.Netty協議的解碼
相對於NettyMessage的編碼,仍舊以java.nio.ByteBuffer為例,給出Netty協議的解碼規范。
(1)crcCode:通過java.nio.ByteBuffer.getInt()獲取校驗碼字段,其他緩沖區需要與其等價;
(2)length:通過java.nio.ByteBuffer.getInt()獲取Netty消息的長度,其他緩沖區需要與其等價;
(3)sessionID:通過java.nio.ByteBuffer.getLong()獲取會話ID,其他緩沖區需要與其等價;
(4)type:通過java.nio.ByteBuffer.get()獲取消息類型,其他緩沖區需要與其等價;
(5)priority:通過java.nio.ByteBuffer.get()獲取消息優先級,其他緩沖區需要與其等價;
(6)attachment:它的解碼規則為——首先創建一個新的attachment對象,調用java.nio.ByteBuffer.getInt()獲取附件的長度,如果為0,說明附件為空,解碼結束,繼續解消息體;如果非空,則根據長度通過for循環進行解碼。
(7)body:通過JBoss的marshaller對其進行解碼。
鏈路的建立
Netty協議棧支持服務端和客戶端,對於使用Netty協議棧的應用程序而言,不需要刻意區分到底是客戶端還是服務端,在分布式組網環境中,一個節點可能既是服務端也是客戶端,這個依據具體的用戶場景而定。
Netty協議棧對客戶端的說明如下:如果A節點需要調用B節點的服務,但是A和B之間還沒有建立物理鏈路,則由調用方主動發起連接,此時,調用方為客戶端,被調用方為服務端。
考慮到安全,鏈路建立需要通過基於IP地址或者號段的黑白名單安全認證機制,作為樣例,本協議使用基於IP地址的安全認證,如果有多個IP,通過逗號進行分割。在實際商用項目中,安全認證機制會更加嚴格,例如通過密鑰對用戶名和密碼進行安全認證。
客戶端與服務端鏈路建立成功之后,由客戶端發送握手請求消息,握手請求消息的定義如下。
(1)消息頭的type字段值為3;
(2)可選附件為個數為0;
(3)消息體為空;
(4)握手消息的長度為22個字節。
服務端接收到客戶端的握手請求消息之后,如果IP校驗通過,返回握手成功應答消息給客戶端,應用層鏈路建立成功。握手應答消息定義如下。
(1)消息頭的type字段值為4;
(2)可選附件個數為0;
(3)消息體為byte類型的結果,0:認證成功;-1:認證失敗。
鏈路建立成功之后,客戶端和服務端就可以互相發送業務消息了。
鏈路的關閉
由於采用長連接通信,在正常的業務運行期間,雙方通過心跳和業務消息維持鏈路,任何一方都不需要主動關閉連接。
但是,在以下情況下,客戶端和服務端需要關閉連接。
(1)當對方宕機或者重啟時,會主動關閉鏈路,另一方讀取到操作系統的通知信號,得知對方REST鏈路,需要關閉連接,釋放自身的句柄等資源。由於采用TCP全雙工通信,通信雙方都需要關閉連接,釋放資源;
(2)消息讀寫過程中,發生了I/O異常,需要主動關閉連接;
(3)心跳消息讀寫過程中發生了I/O異常,需要主動關閉連接;
(4)心跳超時,需要主動關閉連接;
(5)發生編碼異常等不可恢復錯誤時,需要主動關閉連接。
可靠性設計
Netty協議棧可能會運行在非常惡劣的網絡環境中,網絡超時、閃斷、對方進程僵死或者處理緩慢等情況都有可能發生。為了保證在這些極端異常場景下Netty協議棧仍能夠正常工作或者自動恢復,需要對它的可靠性進行統一規划和設計。
1.心跳機制
在凌晨等業務低谷期時段,如果發生網絡閃斷、連接被Hang住等網絡問題時,由於沒有業務消息,應用進程很難發現。到了白天業務高峰期時,會發生大量的網絡通信失敗,嚴重的會導致一段時間進程內無法處理業務消息。為了解決這個問題,在網絡空閑時采用心跳機制來檢測鏈路的互通性,一旦發現網絡故障,立即關閉鏈路,主動重連。
具體的設計思路如下。
(1)當網絡處於空閑狀態持續時間達到T(連續周期T沒有讀寫消息)時,客戶端主動發送Ping心跳消息給服務端;
(2)如果在下一個周期T到來時客戶端沒有收到對方發送的Pong心跳應答消息或者讀取到服務端發送的其他業務消息,則心跳失敗計數器加1;
(3)每當客戶端接收到服務的業務消息或者Pong應答消息,將心跳失敗計數器清零;當連續N次沒有接收到服務端的Pong消息或者業務消息,則關閉鏈路,間隔INTERVAL時間后發起重連操作;
(4)服務端網絡空閑狀態持續時間達到T后,服務端將心跳失敗計數器加1;只要接收到客戶端發送的Ping消息或者其他業務消息,計數器清零;
(5)服務端連續N次沒有接收到客戶端的Ping消息或者其他業務消息,則關閉鏈路,釋放資源,等待客戶端重連。
通過Ping-Pong雙向心跳機制,可以保證無論通信哪一方出現網絡故障,都能被及時地檢測出來。為了防止由於對方短時間內繁忙沒有及時返回應答造成的誤判,只有連續N次心跳檢測都失敗才認定鏈路已經損害,需要關閉鏈路並重建鏈路。
當讀或者寫心跳消息發生I/O異常的時候,說明鏈路已經中斷,此時需要立即關閉鏈路,如果是客戶端,需要重新發起連接。如果是服務端,需要清空緩存的半包信息,等待客戶端重連。
2.重連機制
如果鏈路中斷,等待INTERVAL時間后,由客戶端發起重連操作,如果重連失敗,間隔周期INTERVAL后再次發起重連,直到重連成功。
為了保證服務端能夠有充足的時間釋放句柄資源,在首次斷連時客戶端需要等待INTERVAL時間之后再發起重連,而不是失敗后就立即重連。
為了保證句柄資源能夠及時釋放,無論什么場景下的重連失敗,客戶端都必須保證自身的資源被及時釋放,包括但不限於SocketChannel、Socket等。
重連失敗后,需要打印異常堆棧信息,方便后續的問題定位。
3.重復登錄保護
當客戶端握手成功之后,在鏈路處於正常狀態下,不允許客戶端重復登錄,以防止客戶端在異常狀態下反復重連導致句柄資源被耗盡。
服務端接收到客戶端的握手請求消息之后,首先對IP地址進行合法性檢驗,如果校驗成功,在緩存的地址表中查看客戶端是否已經登錄,如果已經登錄,則拒絕重復登錄,返回錯誤碼-1,同時關閉TCP鏈路,並在服務端的日志中打印握手失敗的原因。
客戶端接收到握手失敗的應答消息之后,關閉客戶端的TCP連接,等待INTERVAL時間之后,再次發起TCP連接,直到認證成功。
為了防止由服務端和客戶端對鏈路狀態理解不一致導致的客戶端無法握手成功的問題,當服務端連續N次心跳超時之后需要主動關閉鏈路,清空該客戶端的地址緩存信息,以保證后續該客戶端可以重連成功,防止被重復登錄保護機制拒絕掉。
4.消息緩存重發
無論客戶端還是服務端,當發生鏈路中斷之后,在鏈路恢復之前,緩存在消息隊列中待發送的消息不能丟失,等鏈路恢復之后,重新發送這些消息,保證鏈路中斷期間消息不丟失。
考慮到內存溢出的風險,建議消息緩存隊列設置上限,當達到上限之后,應該拒絕繼續向該隊列添加新的消息。
安全性設計
為了保證整個集群環境的安全,內部長連接采用基於IP地址的安全認證機制,服務端對握手請求消息的IP地址進行合法性校驗:如果在白名單之內,則校驗通過;否則,拒絕對方連接。
如果將Netty協議棧放到公網中使用,需要采用更加嚴格的安全認證機制,例如基於密鑰和AES加密的用戶名+密碼認證機制,也可以采用SSL/TSL安全傳輸。
作為示例程序,Netty協議棧采用最簡單的基於IP地址的白名單安全認證機制。
可擴展性設計
Netty協議需要具備一定的擴展能力,業務可以在消息頭中自定義業務域字段,例如消息流水號、業務自定義消息頭等。通過Netty消息頭中的可選附件attachment字段,業務可以方便地進行自定義擴展。
Netty協議棧架構需要具備一定的擴展能力,例如統一的消息攔截、接口日志、安全、加解密等可以被方便地添加和刪除,不需要修改之前的邏輯代碼,類似Servlet的FilterChain和AOP,但考慮到性能因素,不推薦通過AOP來實現功能的擴展。
Netty協議棧開發
數據結構定義
import lombok.Data; @Data public final class NettyMessage { private Header header; //消息頭 private Object body;//消息體 @Override public String toString() { return "NettyMessage [header=" + header + "]"; } } import java.util.HashMap; import java.util.Map;
@Data public final class Header { private int crcCode = 0xabef0101; private int length;// 消息長度 private long sessionID;// 會話ID private byte type;// 消息類型 private byte priority;// 消息優先級 private Map attachment = new HashMap(); // 附件 @Override public String toString() { return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionID=" + sessionID + ", type=" + type + ", priority=" + priority + ", attachment=" + attachment + "]"; } }
消息編解碼
import io.netty.buffer.ByteBuf; import org.jboss.marshalling.*; import java.io.IOException; public class MarshallingDecoder { private final Unmarshaller unmarshaller; public MarshallingDecoder() throws IOException { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); unmarshaller = marshallerFactory.createUnmarshaller(configuration); } protected Object decode(ByteBuf in) throws Exception { int objectSize = in.readInt(); ByteBuf buf = in.slice(in.readerIndex(), objectSize); ByteInput input = new ChannelBufferByteInput(buf); try { unmarshaller.start(input); Object obj = unmarshaller.readObject(); unmarshaller.finish(); in.readerIndex(in.readerIndex() + objectSize); return obj; } finally { unmarshaller.close(); } } } import io.netty.buffer.ByteBuf; import org.jboss.marshalling.Marshaller; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; import java.io.IOException; public class MarshallingEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; Marshaller marshaller; public MarshallingEncoder() throws IOException { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); marshaller = marshallerFactory.createMarshaller(configuration); } protected void encode(Object msg, ByteBuf out) throws Exception { try { int lengthPos = out.writerIndex(); out.writeBytes(LENGTH_PLACEHOLDER); ChannelBufferByteOutput output = new ChannelBufferByteOutput(out); marshaller.start(output); marshaller.writeObject(msg); marshaller.finish(); out.setInt(lengthPos, out.writerIndex() - lengthPos - 4); } finally { marshaller.close(); } } } import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.io.IOException; import java.util.HashMap; import java.util.Map; //Netty的LengthFieldBasedFrameDecoder解碼器,它支持自動的TCP粘包和半包處理, //只需要給出標識消息長度的字段偏移量和消息長度自身所占的字節數,Netty就能自動實現對半包的處理。 public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder { MarshallingDecoder marshallingDecoder; public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,int lengthFieldLength) throws IOException { super(maxFrameLength, lengthFieldOffset, lengthFieldLength,-8,0); marshallingDecoder = new MarshallingDecoder(); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in)throws Exception { //對於業務解碼器來說,調用父類LengthFieldBasedFrameDecoder的解碼方法后,返回的就是整包消息或者為空, //如果為空說明是個半包消息,直接返回繼續由I/O線程讀取后續的碼流 ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) { return null; } int pre = in.readerIndex(); in.readerIndex(0); NettyMessage message = new NettyMessage(); Header header = new Header(); header.setCrcCode(in.readInt()); header.setLength(in.readInt()); header.setSessionID(in.readLong()); header.setType(in.readByte()); header.setPriority(in.readByte()); int size = in.readInt(); if (size > 0) { Map<String,Object> attch = new HashMap<String,Object>(size); int keySize = 0; byte[] keyArray = null; String key = null; for (int i = 0; i < size; i++) { keySize = in.readInt(); keyArray = new byte[keySize]; in.readBytes(keyArray); key = new String(keyArray, "UTF-8"); attch.put(key, marshallingDecoder.decode(in)); } keyArray = null; key = null; header.setAttachment(attch); } if (in.readableBytes() > 4) { message.setBody(marshallingDecoder.decode(in)); } in.readerIndex(pre); message.setHeader(header); return message; } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.io.IOException; import java.util.List; import java.util.Map; public final class NettyMessageEncoder extends MessageToMessageEncoder { MarshallingEncoder marshallingEncoder; public NettyMessageEncoder() throws IOException { this.marshallingEncoder = new MarshallingEncoder(); } @Override protected void encode(ChannelHandlerContext ctx, Object o, List out) throws Exception { NettyMessage msg = (NettyMessage) o; if (msg == null || msg.getHeader() == null) { throw new Exception("The encode message is null"); } ByteBuf sendBuf = Unpooled.buffer(); sendBuf.writeInt((msg.getHeader().getCrcCode())); sendBuf.writeInt((msg.getHeader().getLength())); sendBuf.writeLong((msg.getHeader().getSessionID())); sendBuf.writeByte((msg.getHeader().getType())); sendBuf.writeByte((msg.getHeader().getPriority())); sendBuf.writeInt((msg.getHeader().getAttachment().size())); String key = null; byte[] keyArray = null; Object value = null; for (Map.Entry param : msg.getHeader().getAttachment().entrySet()) { key = (String) param.getKey(); keyArray = key.getBytes("UTF-8"); sendBuf.writeInt(keyArray.length); sendBuf.writeBytes(keyArray); value = param.getValue(); marshallingEncoder.encode(value, sendBuf); } key = null; keyArray = null; value = null; if (msg.getBody() != null) { marshallingEncoder.encode(msg.getBody(), sendBuf); } else { sendBuf.writeInt(0); } sendBuf.setInt(4, sendBuf.readableBytes()); out.add(sendBuf); } }
握手和安全認證
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class LoginAuthReqHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //當客戶端跟服務端TCP三次握手成功之后,由客戶端構造握手請求消息發送給服務端 ctx.writeAndFlush(buildLoginReq()); } // 握手請求發送之后,按照協議規范,服務端需要返回握手應答消息。 @Override public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception { NettyMessage message = (NettyMessage) msg; // 如果是握手應答消息,需要判斷是否認證成功 //對握手應答消息進行處理,首先判斷消息是否是握手應答消息, if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) { byte loginResult = (Byte) message.getBody(); if (loginResult != (byte) 0) { // 如果是握手應答消息,則對應答結果進行判斷,如果非0,說明認證失敗,關閉鏈路,重新發起連接。 // 握手失敗,關閉連接 ctx.close(); } else { System.out.println("Login is ok : " + message); ctx.fireChannelRead(msg); } } else { // 如果不是,直接透傳給后面的ChannelHandler進行處理; ctx.fireChannelRead(msg); } } private NettyMessage buildLoginReq() { // 由於采用IP白名單認證機制,因此,不需要攜帶消息體,消息體為空,消息類型為3:握手請求消息。 NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.LOGIN_REQ.value()); message.setHeader(header); return message; } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class LoginAuthRespHandler extends ChannelHandlerAdapter { private Map nodeCheck = new ConcurrentHashMap(); private String[] whitekList = {"127.0.0.1","10.100.1.122"}; @Override public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception { NettyMessage message = (NettyMessage) msg; // 如果是握手請求消息,處理,其他消息透傳 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) { String nodeIndex = ctx.channel().remoteAddress().toString(); NettyMessage loginResp = null; // 重復登錄,拒絕 // 重復登錄保護 if (nodeCheck.containsKey(nodeIndex)) { loginResp = buildResponse((byte) -1); } else { //IP認證白名單列表 InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = address.getAddress().getHostAddress(); boolean isOK = false; for (String WIP : whitekList) { if (WIP.equals(ip)) { isOK = true; break; } } //通過buildResponse構造握手應答消息返回給客戶端 loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1); if (isOK) { nodeCheck.put(nodeIndex, true); } } System.out.println("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]"); ctx.writeAndFlush(loginResp); } else { ctx.fireChannelRead(msg); } } private NettyMessage buildResponse(byte result) { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.LOGIN_RESP.value()); message.setHeader(header); message.setBody(result); return message; } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { //當發生異常關閉鏈路的時候,需要將客戶端的信息從登錄注冊表中去注冊,以保證后續客戶端可以重連成功。 nodeCheck.remove(ctx.channel().remoteAddress().toString());//刪除緩存 ctx.close(); ctx.fireExceptionCaught(cause); } }
心跳檢測機制
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class HeartBeatReqHandler extends ChannelHandlerAdapter { private volatile ScheduledFuture heartBeat; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 握手成功,主動發送心跳消息 //HeartBeatReqHandler接收到之后對消息進行判斷 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) { //當握手成功之后,握手請求Handler會繼續將握手成功消息向下透傳 //如果是握手成功消息,則啟動無限循環定時器用於定期發送心跳消息。 //由於NioEventLoop是一個schedule,因此它支持定時器的執行。 // 心跳定時器的單位是毫秒,默認為5000,即每5秒發送一條心跳消息。 heartBeat = ctx.executor().scheduleAtFixedRate( new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS); } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) { //接收服務端發送的心跳應答消息,並打印客戶端接收和發送的心跳消息。 System.out.println("Client receive server heart beat message : ---> "+ message); } else { //當握手成功之后,握手請求Handler會繼續將握手成功消息向下透傳 ctx.fireChannelRead(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { NettyMessage heatBeat = buildHeatBeat(); System.out.println("Client send heart beat messsage to server : ---> "+ heatBeat); ctx.writeAndFlush(heatBeat); } private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_REQ.value()); message.setHeader(header); return message; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } } import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class HeartBeatRespHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception { NettyMessage message = (NettyMessage) msg; // 返回心跳應答消息 if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) { System.out.println("Receive client heart beat message : ---> "+ message); NettyMessage heartBeat = buildHeatBeat(); System.out.println("Send heart beat response message to client : ---> "+ heartBeat); ctx.writeAndFlush(heartBeat); } else { ctx.fireChannelRead(msg); } } private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_RESP.value()); message.setHeader(header); return message; } }
基礎類
public enum MessageType { LOGIN_REQ((byte)3), LOGIN_RESP((byte)4), HEARTBEAT_REQ((byte)5), HEARTBEAT_RESP((byte)6), ; public byte value; MessageType(byte v){ this.value = v; } public byte value(){ return value; } } import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteInput; import java.io.IOException; public class ChannelBufferByteInput implements ByteInput { private final ByteBuf buffer; ChannelBufferByteInput(ByteBuf buffer) { this.buffer = buffer; } @Override public void close() throws IOException { } @Override public int available() throws IOException { return buffer.readableBytes(); } @Override public int read() throws IOException { if (buffer.isReadable()) { return buffer.readByte() & 0xff; } return -1; } @Override public int read(byte[] array) throws IOException { return read(array, 0, array.length); } @Override public int read(byte[] dst, int dstIndex, int length) throws IOException { int available = available(); if (available == 0) { return -1; } length = Math.min(available, length); buffer.readBytes(dst, dstIndex, length); return length; } @Override public long skip(long bytes) throws IOException { int readable = buffer.readableBytes(); if (readable < bytes) { bytes = readable; } buffer.readerIndex((int) (buffer.readerIndex() + bytes)); return bytes; } } import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteOutput; import java.io.IOException; public class ChannelBufferByteOutput implements ByteOutput { private final ByteBuf buffer; ChannelBufferByteOutput(ByteBuf buffer) { this.buffer = buffer; } @Override public void close() throws IOException { // Nothing to do } @Override public void flush() throws IOException { // nothing to do } @Override public void write(int b) throws IOException { buffer.writeByte(b); } @Override public void write(byte[] bytes) throws IOException { buffer.writeBytes(bytes); } @Override public void write(byte[] bytes, int srcIndex, int length) throws IOException { buffer.writeBytes(bytes, srcIndex, length); } /** * Return the {@link ByteBuf} which contains the written content * */ ByteBuf getBuffer() { return buffer; } }import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteOutput; import java.io.IOException; public class ChannelBufferByteOutput implements ByteOutput { private final ByteBuf buffer; ChannelBufferByteOutput(ByteBuf buffer) { this.buffer = buffer; } @Override public void close() throws IOException { // Nothing to do } @Override public void flush() throws IOException { // nothing to do } @Override public void write(int b) throws IOException { buffer.writeByte(b); } @Override public void write(byte[] bytes) throws IOException { buffer.writeBytes(bytes); } @Override public void write(byte[] bytes, int srcIndex, int length) throws IOException { buffer.writeBytes(bytes, srcIndex, length); } /** * Return the {@link ByteBuf} which contains the written content * */ ByteBuf getBuffer() { return buffer; } }
客戶端
import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteOutput; import java.io.IOException; public class ChannelBufferByteOutput implements ByteOutput { private final ByteBuf buffer; ChannelBufferByteOutput(ByteBuf buffer) { this.buffer = buffer; } @Override public void close() throws IOException { // Nothing to do } @Override public void flush() throws IOException { // nothing to do } @Override public void write(int b) throws IOException { buffer.writeByte(b); } @Override public void write(byte[] bytes) throws IOException { buffer.writeBytes(bytes); } @Override public void write(byte[] bytes, int srcIndex, int length) throws IOException { buffer.writeBytes(bytes, srcIndex, length); } /** * Return the {@link ByteBuf} which contains the written content * */ ByteBuf getBuffer() { return buffer; } } public class NettyConstant { public static String LOCALIP = "127.0.0.1"; public static String REMOTEIP = "127.0.0.1"; public static Integer LOCAL_PORT = 8085; public static Integer PORT = 9099; }
服務端
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.io.IOException; public class NettyServer { public void bind() throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(Channel ch)throws IOException{ ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast(new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler()); } }); // 綁定端口,同步等待成功 b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync(); System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT)); } public static void main(String[] args) throws Exception { new NettyServer().bind(); } }
測試結果
服務端
12:30:32.998 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x4d893ed4, /127.0.0.1:9099] RECEIVED: [id: 0x343516a3, /127.0.0.1:8085 => /127.0.0.1:9099] 12:30:33.205 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple The login response is : NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=4, priority=0, attachment={}]] body [0] Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=26, sessionID=0, type=5, priority=0, attachment={}]] Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]] Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=26, sessionID=0, type=5, priority=0, attachment={}]] Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]] Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=26, sessionID=0, type=5, priority=0, attachment={}]] Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]]
客戶端
12:30:33.152 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple Login is ok : NettyMessage [header=Header [crcCode=-1410399999, length=101, sessionID=0, type=4, priority=0, attachment={}]] Client send heart beat messsage to server : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=5, priority=0, attachment={}]] Client receive server heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=26, sessionID=0, type=6, priority=0, attachment={}]] Client send heart beat messsage to server : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=5, priority=0, attachment={}]] Client receive server heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=26, sessionID=0, type=6, priority=0, attachment={}]] Client send heart beat messsage to server : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=5, priority=0, attachment={}]] Client receive server heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=26, sessionID=0, type=6, priority=0, attachment={}]] Client send heart beat messsage to server : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=5, priority=0, attachment={}]]
