基於Netty的私有協議棧的開發
書是人類進步的階梯,每讀一本書都使自己得以提升,以前看書都是看了就看了,當時感覺受益匪淺,時間一長就又還回到書本了!所以說,好記性不如爛筆頭,以后每次看完一本書都寫一些讀后感,對於技術書則把對讓自己醍醐灌頂的篇章記錄下來,以便以后翻閱查看,也是記錄自己學習的過程- _ -。
OK!言歸正傳,最近由於公司需要做一個網關項目,需要用到基於TCP/IP私有協議接收數據,看完了《Netty權威指南》這本書,感覺作者寫的很好,有些地方讓我獲益良多,雖然書上有些例子跑不通(可能是因為環境問題吧),但是不妨礙理解作者想表達的意思,以前看Hadoop的時候總是對它的底層實現挺感興趣的,但最后不了了之了,看完這本書后,讓我明白了很多,感覺有必要拿出來分享一下書中第十四章講解的關於Netty的私有協議的設計和開發。
一、私有協議介紹
首先需要了解一些什么是私有協議,私有協議本質上是廠商內部發展和采用的標准,或者兩系統之間約定的數據交互格式,如書中提到的例子中提到的協議:
| 名稱 |
字段 |
類型 |
長度 |
描述 |
|
Header |
crcCode |
Int |
32 |
Netty消息校驗碼 |
| Length |
Int |
32 |
整個消息長度 |
|
| sessionID |
Long |
64 |
會話ID |
|
| Type |
Byte |
8 |
0:業務請求消息 1:業務響應消息 2:業務one way消息 3握手請求消息 4握手應答消息 5:心跳請求消息 6:心跳應答消息 |
|
| Priority |
Byte |
8 |
消息優先級:0~255 |
|
| Attachment |
Map<String,Object> |
變長 |
可選字段,由於推展消息頭 |
|
| Body |
|
Object |
變長 |
對於請求消息,它是方法的參數 對於響應消息,它是返回值 |
二、Netty協議棧功能設計
Netty協議棧用於內部各模塊之間的通信,它基於TCP/IP協議棧,是一個類HTTP協議的應用層協議棧,相比於傳統的標准協議棧,它更加輕巧、靈活和實用。
1.協議棧功能描述
Netty協議棧承載了業務內部各模塊之間的消息交互和服務調用,它的主要功能如下:
(1) 基於Netty的NIO通信框架,提供高性能的異步通信能力;
(2) 提供消息的編解碼框架,可以實現POJO的序列化和反序列化;
(3) 提供基於IP地址的白名單接入認證機制;
(4) 鏈路的有效性校驗機制;
(5) 鏈路的斷連重連機制;
2.通信模型
Netty協議棧通信模型如圖所示:
(1)Netty協議棧客戶端發送握手請求消息,攜帶節點ID等有效身份認證;
(2)Netty協議棧服務端對握手請求消息進行合法性校驗,包括節點ID有效性校驗、節點重復登錄校驗和IP地址合法性校驗,校驗通過后,返回登錄成功的握手應答消息;
(3)鏈路建立成功之后,客戶端發送業務消息;
(4)鏈路成功之后,服務端發送心跳消息;
(5)鏈路建立成功之后,客戶端發送心跳消息;
(6)鏈路建立成功之后,服務端發送業務消息;
(7)服務端退出時,服務端關閉連接,客戶端感知對方關閉連接后,被動關閉客戶端連接
備注:需要指出的是,Netty協議通信雙方鏈路建立成功之后,雙方可以進行全雙工通信,無論客戶端還是服務器端,都可以主動發送請求消息給對方,通信方式可以是TWO WAY或者ONE WAY。雙方之間的心跳采用Ping-Pong機制,當鏈路處於空閑狀態時,客戶端主動發送Ping消息給服務端,服務端接收到Ping消息后發送應答消息Pong給客戶端,如果客戶端連續發送N條Ping消息都沒有接收到服務端返回的Pong消息,說明鏈路已經掛死或者對方出入異常狀態,客戶端主動關閉連接,間隔周期T后發起重連操作,知道重連成功。
三、消息定義
Netty協議棧消息定義包含消息頭和消息體兩部分
Netty消息定義表
| 名稱 |
類型 |
長度 |
描述 |
| Header |
Header |
變長 |
消息頭定義 |
| Body |
Object |
變長 |
對於請求消息,它只是方法的參數 對於響應消息,它是返回值 |
Netty協議消息頭定義(Header)
| 名稱 |
類型 |
長度 |
描述 |
| crcCode |
Int |
32 |
Netty消息校驗碼 |
| Length |
Int |
32 |
整個消息長度 |
| sessionID |
Long |
64 |
會話ID |
| Type |
Byte |
8 |
0:業務請求消息 1:業務響應消息 2:業務one way消息 3握手請求消息 4握手應答消息 5:心跳請求消息 6:心跳應答消息 |
| Priority |
Byte |
8 |
消息優先級:0~255 |
| Attachment |
Map<String,Object> |
變長 |
可選字段,由於推展消息頭 |
四、私有協議的編解碼規范
4.1 協議的編碼
Netty協議NettyMessage的編碼規范如下:
(1)crcCode:java.nio.ByteBuffer.putInt(int value),如果采用其他緩沖區實現,必須與其等價;
(2)Length:java.nio.ByteBuffer.putInt(int value),如果采用其他緩沖區實現,必須與其等價;
(3)sessionID:java.nio.ByteBuffer.putLong(int value),如果采用其他緩沖區實現,必須與其等價;
(4)Type:java.nio.ByteBuffer.put(byte value),如果采用其他緩沖區實現,必須與其等價;
(5)Priority:java.nio.ByteBuffer.put(byte value),如果采用其他緩沖區實現,必須與其等價;
(6)attachMent:它的編碼規則為----如果attachment長度為0,標示沒有可選附件,則將長度編碼設為0,java.nio.ByteBuffer.putInt(0);如果大於0,說明有附件需要編碼,具體的編碼規則入校----
- 首先對附件的個數進行編碼,Java.nio.ByteBuffer.putInt(attachment.size());
然后對Key進行編碼,先編碼長度,再將它轉換成byte數組之后編碼內容。
(7)Body的編碼:通過JBoss Marshalling將其序列化為byte數組,然后調用java.nio.ByteBuffer.put(byte[] src)將其寫入ByteBuffer緩沖區中。
由於整個消息的長度必須等全部字段都編碼完成之后才能確認,所以最后需要更新消息頭中的length字段,將其重新寫入ByteBuffer中。
4.2 協議的解碼
相對於Netty的編碼,仍舊以java.nio.ByteBuffer為例,給出Netty協議的解碼規范。
(1)crcCode:通過java.nio.ByteBuffer.getInt()獲取校驗碼字段,其他緩沖區需要與其等價;
(2)Length:通過java.nio.ByteBuffer.getInt()獲取校驗碼字段,其他緩沖區需要與其等價;
(3)sessionID:通過java.nio.ByteBuffer.getLong()獲取校驗碼字段,其他緩沖區需要與其等價;
(4)Type:通過java.nio.byteBuffer.get()獲取消息類型,其他緩沖區需要與其等價;
(5)priority:通過java.nio.byteBuffer.get()獲取消息優先級,其他緩沖區需要與其等價;
(6)Attachment:它的劍麻規則為--首先創建一個新的attachment對象,調用java.nio.ByteBuffer.getInt()獲取附件的長度,如果為0,說明附件為空,解碼結束,繼續解碼消息頭;如果非空,則根據長度通過for循環進行解碼。
(7)Body:通過Jboss的marshaller對其進行解碼。
五、鏈路的建立
Netty協議棧支持服務端和客服端,對於使用Netty協議棧的應用程序而言,不需要刻意區分到底是客戶端還是服務器端,在分布式組網環境中,一個節點可能既是客戶端也是服務器端,這個依據具體的用戶場景而定。
Netty協議棧對客戶端的說明如下:如果A節點需要調用B節點的服務,但是A和B之間還沒有建立物理鏈路,則有調用方主動發起連接,此時,調用方為客戶端,被調用方為服務端。
考慮到安全,鏈路建立需要通過基於Ip地址或者號段的黑白名單安全認證機制,作為樣例,本協議使用基於IP地址的安全認知,如果有多個Ip,通過逗號進行分割。在實際的商用項目中,安全認證機制會更加嚴格,例如通過密鑰對用戶名和密碼進行安全認證。
客戶端與服務端鏈路建立成功之后,由客戶端發送握手請求消息,握手請求消息的定義如下
(1) 消息頭的type字段值為3;
(2) 可選附件數為0;
(3) 消息頭為空
(4) 握手消息的長度為22個字節
服務端接收到客戶端的握手請求消息之后,如果IP校驗通過,返回握手成功應答消息給客戶端,應用層鏈路建立成功。握手應答消息定義如下:
(1)消息頭的type字段值為4
(2)可選附件個數為0;
(3)消息體為byte類型的結果,0:認證成功;-1認證失敗;
鏈路建立成功之后,客戶端和服務端就可以互相發送業務消息了。
六、鏈路的關閉
由於采用長連接通信,在正常的業務運行期間,雙方通過心跳和業務消息維持鏈路,任何一方都不需要主動關閉連接。
但是,在以下情況下,客戶端和服務端需要關閉連接:
(1)當對方宕機或者重啟時,會主動關閉鏈路,另一方讀取到操作系統的通知信號
得知對方REST鏈路,需要關閉連接,釋放自身的句柄等資源。由於采用TCP全雙工通信,通信雙方都需要關閉連接,釋放資源;
(2)消息讀寫過程中,發生了I/O異常,需要主動關閉連接;
(3)心跳消息讀寫過程發生了I/O異常,需要主動關閉連接;
(4)心跳超時,需要主動關閉連接;
(5)發生編碼異常等不可恢復錯誤時,需要主動關閉連接。
七、可靠性設計
Netty協議棧可能會運行在非常惡劣的網絡環境中,網絡超時、閃斷、對方進程僵死或者處理緩慢等情況都有可能發生。為了保證在這些極端異常場景下Netty協議棧仍能夠正常工作或者自動恢復,需要對他的可靠性進行統一規划和設計。
7.1心跳機制
在凌晨等業務低谷時段,如果發生網絡閃斷、連接被Hang住等問題時,由於沒有業務消息,應用程序很難發現。到了白天業務高峰期時,會發生大量的網絡通信失敗,嚴重的會導致一段時間進程內無法處理業務消息。為了解決這個問題,在網絡空閑時采用心跳機制來檢測鏈路的互通性,一旦發現網絡故障,立即關閉鏈路,主動重連。
具體的設計思路如下:
(1)當網絡處於空閑狀態持續時間達到T(連續周期T沒有讀寫消息)時,客戶端主動發送Ping心跳消息給服務端;
(2)如果在下一個周期T到來時客戶端沒有收到對方發送的Pong心跳應答消息或者讀取到服務端發送的其他業務消息,則心跳失敗計數器加1;
(3)每當客戶端接收到服務的業務消息或者Pong應答消息,將心跳失敗計數器清零;當練習N次沒有接收到服務端的Pong消息或者業務消息,則關閉鏈路,間隔INTERVAL時間后發起重連操作;
(4)服務端網絡空閑狀態持續時間達到T后,服務端將心跳失敗計數器加1;只要接收到客戶端發送的Ping消息或者其他業務消息,計數器清零;
(5)服務端連續N次沒有接收到客戶端的ping消息或者其他業務消息,則關閉鏈路,釋放資源,等到客戶端重連。
通過Ping-Pong雙向心跳機制,可以保證無論通信哪一方出現網絡故障,都能被及時的檢查出來,為了防止由於對方短時間內繁忙沒有及時返回應答造成的誤判,只有連續N次心跳檢查都失敗才認定鏈路已經損害,需要關閉鏈路並重建鏈路。
當讀或者寫心跳消息發生I/O異常的時候,說明已經中斷,此時需要立即關閉連接,如果是客戶端,需要重新發起連接。如果是服務端,需要清空緩存的半包信息,等到客戶端重連。
7.2重連機制
如果鏈路中斷,等到INTEVAL時間后,由客戶端發起重連操作,如果重連失敗,間隔周期INTERVAL后再次發起重連,直到重連成功。
為了保持服務端能夠有充足的時間釋放句柄資源,在首次斷連時客戶端需要等待INTERVAL時間之后再發起重連,而不是失敗后立即重連。
為了保證句柄資源能夠及時釋放,無論什么場景下重連失敗,客戶端必須保證自身的資源被及時釋放,包括但不現居SocketChannel、Socket等。
重連失敗后,需要打印異常堆棧信息,方便后續的問題定位。
7.3重復登錄保護
當客戶端握手成功之后,在鏈路處於正常狀態下,不允許客戶端重復登錄,以防止客戶端在異常狀態下反復重連導致句柄資源被耗盡。
服務端接收到客戶端的握手請求消息之后,首先對IP地址進行合法性校驗,如果校驗成功,在緩存的地址表中查看客戶端是否已經登錄,如果登錄,則拒絕重復登錄,返回錯誤碼-1,同時關閉TCP鏈路,並在服務端的日志中打印握手失敗的原因。
客戶端接收到握手失敗的應答消息之后,關閉客戶端的TCP連接,等待INTERVAL時間之后,再次發起TCP連接,知道認證成功。
為了防止由服務端和客戶端對鏈路狀態理解不一致導致的客戶端無法握手成功問題,當服務端連續N次心跳超時之后需要主動關閉鏈路,清空改客戶端的地址緩存信息,以保證后續改客戶端可以重連成功,防止被重復登錄保護機制拒絕掉。
7.4消息緩存重發
無論客戶端還是服務端,當發生鏈路中斷之后,在鏈路恢復之前,緩存的消息隊列中待發送的消息不能丟失,等鏈路恢復之后,重新發送這些消息,保證鏈路中斷期間消息不丟失。
考慮到內存溢出的風險,建議消息緩存隊列設置上限,當達到上限之后,應該拒絕繼續想該隊列添加新的消息。
八、代碼設計
8.1 數據結構定義
首先對數據結構進行定義,netty的消息定義如下
| package cn.yesway.demo.privateprotocol.model; public class NettyMessage { private Header header;//消息頭 private Object body;//消息體 public Header getHeader() { return header; } public void setHeader(Header header) { this.header = header; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } @Override public String toString() { return "NettyMessage [header="+header+"]"; } } |
消息頭(Header)的定義如下:
| package cn.yesway.demo.privateprotocol.model; import java.util.HashMap; import java.util.Map; public class Header { private int crcCode=0xabef0101; private int length;//消息長度 private long sessionID;//回話ID private byte type;//消息類型 private byte priority;//消息優先級 private Map<String,Object> attachment=new HashMap<String, Object>();//附件 public int getCrcCode() { return crcCode; } public void setCrcCode(int crcCode) { this.crcCode = crcCode; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public long getSessionID() { return sessionID; } public void setSessionID(long sessionID) { this.sessionID = sessionID; } public byte getType() { return type; } public void setType(byte type) { this.type = type; } public byte getPriority() { return priority; } public void setPriority(byte priority) { this.priority = priority; } public Map<String, Object> getAttachment() { return attachment; }
public void setAttachment(Map<String, Object> attachment) { this.attachment = attachment; } @Override public String toString() { return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionID=" + sessionID + ", type=" + type + ", priority=" + priority + ", attachment=" + attachment + "]"; } } |
由於心跳消息、握手請求和握手應答消息都可以統一由NettyMessage承載,所以不需要為這幾個內控制消息做單獨的數據結構定義。
8.2 消息編解碼
分別定義NettyMessageDecoder和NettyMessageEncoder用於NettyMessage消息的編解碼,他們的具體實現如下:
Netty消息編碼類:NettyMessageEncoder
| package cn.yesway.demo.privateprotocol.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.IOException; import java.util.Map; import cn.yesway.demo.privateprotocol.model.NettyMessage; public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage>{ private MarshallingEncoder marshallingEncoder; public NettyMessageEncoder() throws IOException{ marshallingEncoder = new MarshallingEncoder(); } @Override protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception { if(msg==null||msg.getHeader()==null){ throw new Exception("The encode message is null"); } out.writeInt(msg.getHeader().getCrcCode()); out.writeInt(msg.getHeader().getLength()); out.writeLong(msg.getHeader().getSessionID()); out.writeByte(msg.getHeader().getType()); out.writeByte(msg.getHeader().getPriority()); out.writeInt((msg.getHeader().getAttachment().size())); String key = null; byte[] keyArray = null; Object value = null; for(Map.Entry<String, Object> param:msg.getHeader().getAttachment().entrySet()){ key = param.getKey(); keyArray = key.getBytes("UTF-8"); out.writeInt(keyArray.length); out.writeBytes(keyArray); value = param.getValue(); marshallingEncoder.encode(value, out); } key = null; keyArray = null; value = null; if (msg.getBody() != null) { marshallingEncoder.encode(msg.getBody(), out); } else{ out.writeInt(0); } out.setInt(4, out.readableBytes() - 8); } } |
其中消息體的編碼采用Jboss的Marshalling來編碼的,這里不再說明,具體的實現請參見Github源碼,地址:https://github.com/wz12406/netty-demo里面有書中的源碼,也有我比葫蘆畫瓢寫的代碼,不過抄一遍代碼也讓自己對netty有了進一步的認識。
Netty消息編碼工具類:MarshallingEncoder
| package cn.yesway.demo.privateprotocol.codec;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import java.io.IOException; import org.jboss.marshalling.Marshaller; @Sharable public class MarshallingEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; Marshaller marshaller; public MarshallingEncoder() throws IOException { marshaller = MarshallingCodecFactory.buildMarshalling(); } protected void encode(Object msg, ByteBuf out) throws Exception { try { int lengthPos = out.writerIndex(); out.writeBytes(LENGTH_PLACEHOLDER); ChannelBufferByteOutput output = new ChannelBufferByteOutput(out); marshaller.start(output); marshaller.writeObject(msg); marshaller.finish(); out.setInt(lengthPos, out.writerIndex() - lengthPos - 4); } finally { marshaller.close(); } } } |
Netty消息解碼工具類:MarshallingDecoder
| package cn.yesway.demo.privateprotocol.codec; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.io.StreamCorruptedException; import org.jboss.marshalling.ByteInput; import org.jboss.marshalling.Unmarshaller; public class MarshallingDecoder { private final Unmarshaller unmarshaller; public MarshallingDecoder() throws IOException { unmarshaller = MarshallingCodecFactory.buildUnMarshalling(); } protected Object decode(ByteBuf in) throws Exception { int objectSize = in.readInt(); ByteBuf buf = in.slice(in.readerIndex(), objectSize); ByteInput input = new ChannelBufferByteInput(buf); try { unmarshaller.start(input); Object obj = unmarshaller.readObject(); unmarshaller.finish(); in.readerIndex(in.readerIndex() + objectSize); return obj; } finally { unmarshaller.close(); } } } |
Netty消息解碼類:NettyMessageDecoder:
| package cn.yesway.demo.privateprotocol.codec; import java.io.IOException; import java.util.HashMap; import java.util.Map; import cn.yesway.demo.privateprotocol.model.Header; import cn.yesway.demo.privateprotocol.model.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder { MarshallingDecoder marshallingDecoder ; public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException { super(maxFrameLength, lengthFieldOffset, lengthFieldLength); this.marshallingDecoder = new MarshallingDecoder(); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if(frame == null)return null; NettyMessage message = new NettyMessage(); Header header = new Header(); header.setCrcCode(frame.readInt()); header.setLength(frame.readInt()); header.setSessionID(frame.readLong()); header.setType(frame.readByte()); header.setPriority(frame.readByte()); int size = frame.readInt(); if (size > 0) { Map<String, Object> attch = new HashMap<String, Object>(size); int keySize = 0; byte[] keyArray = null; String key = null; for (int i = 0; i < size; i++) { keySize = frame.readInt(); keyArray = new byte[keySize]; frame.readBytes(keyArray); key = new String(keyArray, "UTF-8"); attch.put(key, marshallingDecoder.decode(frame)); } keyArray = null; key = null; header.setAttachment(attch); } if (frame.readableBytes() > 4) { message.setBody(marshallingDecoder.decode(frame)); } message.setHeader(header); return message; } } |
在這里我們用到了Netty的LengthFieldBasedFrameDecoder解碼器,它支持自動的Tcp粘包和半包處理,只需要給出標識消息長度的字段偏移量和消息長度自身所占的字節數,Netty就能自動實現對半包的處理。對於業務解碼器來說,調用父類LengthFieldBasedFrameDecoder的解碼方法后,返回的就是整包消息或者為空,如果為空說明是個半包,之間返回繼續由I/O線程讀取后續的碼流。
8.3 握手和安全認證
握手的發起是在客戶端和服務器端TCP鏈路建立成功通道激活時,握手消息的接入和安全認證在服務端的處理。下面看下具體實現。
首先開發一個握手認證的客戶端ChannelHandle,用於在通道激活時發起握手請求,具體代碼實現如下。
| package cn.yesway.demo.privateprotocol.client; import cn.yesway.demo.privateprotocol.MessageType; import cn.yesway.demo.privateprotocol.model.Header; import cn.yesway.demo.privateprotocol.model.NettyMessage; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class LoginAuthReqHandler extends ChannelHandlerAdapter{
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buildLoginReq()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; System.out.println("--------------------------------"); //如果是握手應答消息,需要判斷是否認證成功 if(message.getHeader()!=null&&message.getHeader().getType()==MessageType.LOGIN_RESP.value()){ byte loginResult = (byte) message.getBody(); if(loginResult!=(byte)0){ ctx.close(); }else{ System.out.println("Login is OK:"+message); ctx.fireChannelRead(message); } }else{ ctx.fireChannelRead(message); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { System.out.println("--------------------------"); ctx.fireExceptionCaught(cause); } private NettyMessage buildLoginReq() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.LOGIN_REQ.value()); message.setHeader(header); return message; } } |
接着看服務端的握手接入和安全認證代碼。
| package cn.yesway.demo.privateprotocol.server;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
import cn.yesway.demo.privateprotocol.MessageType; import cn.yesway.demo.privateprotocol.model.Header; import cn.yesway.demo.privateprotocol.model.NettyMessage; public class LoginAuthRespHandler extends ChannelHandlerAdapter{ private Map<String,Boolean> nodeCheck = new ConcurrentHashMap<String,Boolean>(); private String[] whiteList = {"127.0.0.1","10.1.2.95"}; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; if(message.getHeader()!=null&&message.getHeader().getType()==MessageType.LOGIN_REQ.value()){ String nodeIndex = ctx.channel().remoteAddress().toString(); NettyMessage loginResp = null; //重復登錄,拒絕 if(nodeCheck.containsKey(nodeIndex)){ loginResp = buildResponse((byte)-1); }else{ InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = address.getAddress().getHostAddress(); boolean isOK = false; for(String WIP:whiteList){ if(WIP.equals(ip)){ isOK = true; } } loginResp = isOK?buildResponse((byte)0):buildResponse((byte)-1); if(isOK)nodeCheck.put(ip, true); System.out.println("The login response is :"+loginResp +" body["+loginResp.getBody()+"]"); ctx.writeAndFlush(loginResp); } }else{ ctx.fireChannelRead(msg); } } private NettyMessage buildResponse(byte b) { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.LOGIN_RESP.value()); message.setHeader(header); message.setBody(b); return message; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { nodeCheck.remove(ctx.channel().remoteAddress().toString()); ctx.close(); ctx.fireExceptionCaught(cause); } } |
8.4 心跳檢測機制
握手成功之后,由客戶端主動發送心跳消息,服務端接收到心跳消息之后,返回心跳應答消息。由於心跳消息的目的是為了檢測鏈路的可用性,因此不需要攜帶消息體。
客戶端發送心跳請求的代碼如下:
| package cn.yesway.demo.book.protocol.netty.client;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit;
import cn.yesway.demo.book.protocol.netty.MessageType; import cn.yesway.demo.book.protocol.netty.struct.Header; import cn.yesway.demo.book.protocol.netty.struct.NettyMessage;
/** * @author Lilinfeng * @date 2014年3月15日 * @version 1.0 */ public class HeartBeatReqHandler extends ChannelHandlerAdapter {
private volatile ScheduledFuture<?> heartBeat;
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 握手成功,主動發送心跳消息 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP .value()) { heartBeat = ctx.executor().scheduleAtFixedRate( new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS); } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP .value()) { System.out .println("Client receive server heart beat message : ---> " + message); } else ctx.fireChannelRead(msg); }
private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; }
@Override public void run() { NettyMessage heatBeat = buildHeatBeat(); System.out .println("Client send heart beat messsage to server : ---> " + heatBeat); ctx.writeAndFlush(heatBeat); }
private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_REQ.value()); message.setHeader(header); return message; } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } } |
服務端的心跳應答Handler代碼如下:
|
package cn.yesway.demo.book.protocol.netty.server;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import cn.yesway.demo.book.protocol.netty.MessageType; import cn.yesway.demo.book.protocol.netty.struct.Header; import cn.yesway.demo.book.protocol.netty.struct.NettyMessage;
public class HeartBeatRespHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 返回心跳應答消息 if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ .value()) { System.out.println("Receive client heart beat message : ---> " + message); NettyMessage heartBeat = buildHeatBeat(); System.out .println("Send heart beat response message to client : ---> " + heartBeat); ctx.writeAndFlush(heartBeat); } else ctx.fireChannelRead(msg); }
private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_RESP.value()); message.setHeader(header); return message; }
} |
服務端的心跳Handler非常簡單,接收到心跳請求消息后,構造心跳應答消息返回,並打印接收和發送的心跳消息。
心跳超時的實現非常簡單,直接利用Netty的ReadTimeoutHandler機制,當一定周期內(默認值50s)沒有讀取到對方任何消息時,需要主動關閉鏈路。如果是客戶端,重新發起連接:如果是服務端,釋放資源,清除客戶端登錄緩存信息,等到服務端重連。
8.5 斷連重連
當客戶端感知斷連事件之后,釋放資源,重新發起連接,具體實現代碼如下:
首先監聽網絡斷連事件,如果channel關閉,則執行后續的重連任務,通過Bootstarp重新發起連接,客戶端掛在closeFuture上監聽鏈路關閉信號,一旦關閉,則創建重連定時器,5s之后重新發起連接,直到重連成功。
服務器端感知到斷連事件之后,需要情況緩存的登錄認證注冊消息,以保證后續客戶端能夠正常連接。
8.6 客戶端代碼
客戶端主要用於初始化系統資源,根據配置信息發起連接,代碼如下:
| package cn.yesway.demo.privateprotocol.client;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler;
import java.net.InetSocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;
import cn.yesway.demo.privateprotocol.NettyConstant; import cn.yesway.demo.privateprotocol.codec.NettyMessageDecoder; import cn.yesway.demo.privateprotocol.codec.NettyMessageEncoder; /** * @author wangzhen * @version 1.0 * @createDate:2015年12月16日 下午4:14:47 * */ public class NettyClient { private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); EventLoopGroup group = new NioEventLoopGroup();
public static void main(String[] args) { new NettyClient().connect(NettyConstant.PORT,NettyConstant.REMOTEIP); }
private void connect(int port, String host) {
try { Bootstrap b =new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<Channel>() {
@Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new NettyMessageDecoder(1024*1024, 4, 4)); ch.pipeline().addLast("MessageEncoder",new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50)); ch.pipeline().addLast("LoginAuthHandler",new LoginAuthReqHandler()); ch.pipeline().addLast("HeartBeatHandler",new HeartBeatReqHandler());
} }); //發起異步連接操作 ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP,NettyConstant.LOCAL_PORT)).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ executorService.execute(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(5); connect(NettyConstant.PORT, NettyConstant.REMOTEIP); } catch (Exception e) { e.printStackTrace(); } } }); }
} } |
利用Netty的ChannelPipeline和ChannelHandler機制,可以非常方便的實現功能的解耦和業務產品的定制。例如本例中的心跳定時器、握手請求和后端的業務處理可以通過不同的Handler來實現,類似於Aop。通過Handler chain的機制可以方便的實現切面攔截和定制,相比於Aop它的性能更高。
8.7 服務器端代碼
相對於客戶端,服務器端的代碼更簡單一些,主要的工作就是握手的接入認證等,不用關心斷連重連等事件。
服務端的代碼如下:
| package cn.yesway.demo.privateprotocol.server;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import cn.yesway.demo.privateprotocol.NettyConstant; import cn.yesway.demo.privateprotocol.codec.NettyMessageDecoder; import cn.yesway.demo.privateprotocol.codec.NettyMessageEncoder;
/** * @author wangzhen * @version 1.0 * @createDate:2015年12月16日 下午4:37:42 * */ public class NettyServer {
public static void main(String[] args) throws Exception{ new NettyServer().bind(); }
private void bind() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<Channel>() {
@Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new NettyMessageDecoder(1024*1024,4,4)); ch.pipeline().addLast("MessageEncoder",new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler()); }
}); //綁定端口,同步等待成功 b.bind(NettyConstant.REMOTEIP,NettyConstant.PORT).sync(); System.out.println("netty server start ok:"+(NettyConstant.REMOTEIP+":"+NettyConstant.PORT)); } } |
代碼托管地址:https://github.com/wz12406/netty-demo
