Netty學習筆記


一些類與方法說明

1)ByteBuf
ByteBuf的API說明:

  • Creation of a buffer
    It is recommended to create a new buffer using the helper methods in Unpooled rather than calling an individual implementation's constructor.
    建議用Unpooled類的幫助方法來創建一個ByteBuf,而不是用new ByteBuf()創建。具體如下:
 import static io.netty.buffer.Unpooled.*;
 ByteBuf heapBuffer    = buffer(128);
 ByteBuf directBuffer  = directBuffer(256);
 ByteBuf wrappedBuffer = wrappedBuffer(new byte[128], new byte[256]);
 ByteBuf copiedBuffe r = copiedBuffer(ByteBuffer.allocate(128));
  • Random Access Indexing
    除了順序讀寫之外,ByteBuf還支持隨機讀寫,這些方法如下:


    必須注意的是,set操作與write操作不支持動態擴展緩沖區,並且不會修改讀寫索引。
  • Sequential Access Indexing
  • Readable bytes (the actual content)
  • Writable bytes
  • Discardable bytes
    緩沖區的分配和釋放是個耗時的操作,我們需要盡量重用它們;但同時頻繁的調用將會導致性能下降,我們在調用前要確認是否確實需要這樣做。
  • Clearing the buffer indexes
  • Search operations
  • Mark and reset
    mark操作會將當前的位置指針備份到mark變量中,當調用reset操作之后,重新將指針的當前位置恢復為備份在mark中的值。ByteBuffer只有2個相關方法,而ByteBuf有4個相關方法,因為ByteBuf有兩個位置指針。
  • Derived buffers
    duplicate()操作,兩個對象的內容是共享的,讀寫索引是獨立的;copy()操作,兩個對象的內容和讀寫索引都是獨立的;slice()操作,兩個對象的內容是共享的,讀寫索引是獨立的。
  • Conversion to existing JDK types
    將ByteBuf轉換為ByteBuffer操作的方法有兩個,nioBuffer()和nioBuffer(int index, int length)。

ByteBuf的典型用法示例:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    byte[] req = new byte[buf.readableBytes()];
    buf.readBytes(req);
    String body = new String(req, "UTF-8");
    System.out.println(body);
}

ByteBuffer完全可以滿足NIO編程的需要,但是由於NIO的復雜性,ByteBuffer也有其局限性,它的主要缺點如下:

  • ByteBuffer長度固定,一旦分配完成,它的容量不能動態擴展和收縮;
  • ByteBuffer只有一個標識位置的指針,讀寫的時候需要手工調用clear()、flip()和rewind()等;
  • ByteBuffer的API功能有限,一些高級和實用的特性它不支持,需要使用者自己實現。

ByteBuffer的典型用法示例:

ByteBuffer buffer = ByteBuffer.allocate(100);
String value = "0123456789";
buffer.put(value.getBytes());
buffer.flip();
byte[] valueArray = new byte[buffer.remaining()];
buffer.get(valueArray);
String decodeValue = new String(valueArray);

2)connect()
當服務端要進行重復登錄檢查時,需要綁定客戶端端口。並且,從產品管理角度看,一般情況下不允許客戶端系統隨便使用隨機端口。

ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(MyConstant.LOCAL_IP, MyConstant.LOCAL_PORT)).sync();
future.channel().closeFuture().sync();

解決粘包/拆包問題

這個問題其實是流式協議的特點。常用的解決方案有:
1)消息定長; 2)在包尾增加回車換行符進行分割; 3)將特殊的分隔符作為消息的結束符; 4)將消息分為消息頭和消息體; 5)更復雜的應用層協議。

Netty提供了多種解碼器用於處理該問題,消息接收端只需要將相應的handler添加到ChannelPipeline中即可。如FixedLengthFrameDecoder用於方案1,LineBasedFrameDecoder用於方案2,DelimiterBasedFrameDecoder用於方案3,LengthFieldBasedFrameDecoder用於方案4。

方案1: FixedLengthFrameDecoder + StringDecoder組合。部分源碼如下:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) throws Exception {
		ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
		ch.pipeline().addLast(new StringDecoder());
		ch.pipeline().addLast(new MyHandler());
	}
}

FixedLengthFrameDecoder的API說明如下:
A decoder that splits the received ByteBufs by the fixed number of bytes.

方案2: LineBasedFrameDecoder + StringDecoder組合就是按行切換的文本解碼器。不過需要注意的是,發送數據是要加上separator,而解碼之后是沒有separator的。部分源碼如下:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch)
		throws Exception {
		ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
		ch.pipeline().addLast(new StringDecoder());
		ch.pipeline().addLast(new MyHandler());
	}
}

LineBasedFrameDecoder的API說明如下:
A decoder that splits the received ByteBufs on line endings. Both "\n" and "\r\n" are handled. For a more general delimiter-based decoder, see DelimiterBasedFrameDecoder.
StringDecoder的API說明如下:
Decodes a received ByteBuf into a String. Please note that this decoder must be used with a proper ByteToMessageDecoder such as DelimiterBasedFrameDecoder or LineBasedFrameDecoder if you are using a stream-based transport such as TCP/IP.

方案3: DelimiterBasedFrameDecoder + StringDecoder組合。部分源碼如下:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) throws Exception {
		ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
		ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
		ch.pipeline().addLast(new StringDecoder());
		ch.pipeline().addLast(new MyHandler());
	}
}

DelimiterBasedFrameDecoder的API說明如下:
A decoder that splits the received ByteBufs by one or more delimiters. It is particularly useful for decoding the frames which ends with a delimiter such as NUL or newline characters.
DelimiterBasedFrameDecoder allows you to specify more than one delimiter. If more than one delimiter is found in the buffer, it chooses the delimiter which produces the shortest frame.

方案4:LengthFieldBasedFrameDecoder。部分源碼如下:

//消息接收端
public class MyMessageDecoder extends LengthFieldBasedFrameDecoder {

	public MyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
		super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
		//...
	}
	
	@Override
	protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
		ByteBuf frame = (ByteBuf) super.decode(ctx, in); //解碼
		if (frame == null) {
			return null;
		}
		//...
	}
}
//消息發送端
public final class MyMessageEncoder extends MessageToByteEncoder<MyMessage> {

    public MyMessageEncoder() {
        //...
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf sendBuf) throws Exception {
        if (msg == null || msg.getHeader() == null)
            //...

        sendBuf.writeInt((msg.getHeader().getLength()));
        sendBuf.writeByte((msg.getHeader().getType()));
        sendBuf.writeByte((msg.getHeader().getPriority()));
        //...

        if (msg.getBody() != null) {
            //...
        } else {
            //...        
        }
        //...
    }
}

LengthFieldBasedFrameDecoder解碼器支持自動的TCP粘包和半包處理,只需要給出標識消息長度的字段偏移量和消息長度自身所占的字節數。
LengthFieldBasedFrameDecoder的API說明如下:
A decoder that splits the received ByteBufs dynamically by the value of the length field in the message. It is particularly useful when you decode a binary message which has an integer header field that represents the length of the message body or the whole message.
LengthFieldBasedFrameDecoder has many configuration parameters so that it can decode any message with a length field, which is often seen in proprietary client-server protocols.

並且API說明給出了使用該解碼器的多種情況:

  • 2 bytes length field at offset 0, do not strip header
  • 2 bytes length field at offset 0, strip header
  • 2 bytes length field at offset 0, do not strip header, the length field represents the length of the whole message
  • 3 bytes length field at the end of 5 bytes header, do not strip header
  • 3 bytes length field at the beginning of 5 bytes header, do not strip header
  • 2 bytes length field at offset 1 in the middle of 4 bytes header, strip the first header field and the length field
  • 2 bytes length field at offset 1 in the middle of 4 bytes header, strip the first header field and the length field, the length field represents the length of the whole message

編解碼

編解碼框架優劣標准: 是否支持跨語言; 編碼后的碼流大小; 編解碼的性能; API使用是否方便。
無論是序列化后的碼流大小,還是序列化性能,JDK默認的序列化機制表現得都很差。適當的時候用Netty的ObjectEncoder + ObjectDecoder、Google Protobuf編解碼、JBoss Marshalling代替。

方案1: ObjectEncoder + ObjectDecoder
接收時,encoder后可以直接將msg強制轉換為相應類的對象; 發送時,可以重寫相應類的toString()方法,並用writeAndFlush()發送相應對象。特別地,這時不用考慮粘包和拆包問題。
server部分源碼:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) {
		ch.pipeline().addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
		ch.pipeline().addLast(new ObjectEncoder());
		ch.pipeline().addLast(new MyServerHandler());
	}
}

client部分源碼:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) throws Exception {
		ch.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
		ch.pipeline().addLast(new ObjectEncoder());
		ch.pipeline().addLast(new MyClientHandler());
	}
}

ObjectDecoder的API說明:
A decoder which deserializes the received ByteBufs into Java objects.
Please note that the serialized form this decoder expects is not compatible with the standard ObjectOutputStream. Please use ObjectEncoder or ObjectEncoderOutputStream to ensure the interoperability with this decoder.

ObjectEncoder的API說明:
An encoder which serializes a Java object into a ByteBuf.
Please note that the serialized form this encoder produces is not compatible with the standard ObjectInputStream. Please use ObjectDecoder or ObjectDecoderInputStream to ensure the interoperability with this encoder.

方案2: Google Protobuf編解碼
摘自overview
Protocol buffers are a flexible, efficient, automated mechanism for serializing structured data – think XML, but smaller, faster, and simpler.

摘自tutorials
With protocol buffers, you write a .proto description of the data structure you wish to store. From that, the protocol buffer compiler creates a class that implements automatic encoding and parsing of the protocol buffer data with an efficient binary format. The generated class provides getters and setters for the fields that make up a protocol buffer and takes care of the details of reading and writing the protocol buffer as a unit. Importantly, the protocol buffer format supports the idea of extending the format over time in such a way that the code can still read data encoded with the old format.

server部分源碼:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) {
		ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
		ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
		ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
		ch.pipeline().addLast(new ProtobufEncoder());
		ch.pipeline().addLast(new MyServerHandler());
	}
}

client部分源碼:

new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) {
		ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
		ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
		ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
		ch.pipeline().addLast(new ProtobufEncoder());
		ch.pipeline().addLast(new MyClientHandler());
	}
}

源碼中使用了ProtobufVarint32LengthFieldPrepender和ProtobufVarint32FrameDecoder編解碼器,實際上是上文提到的解決粘包/拆包問題方案4的運用。關於Protobuf部分,請詳細查看tutorials,業務邏輯中不需要再關注編解碼了。

Netty from the ground up

1)兩種發送數據方式的不同
Once a ChannelHandler is added to a ChannelPipeline it also gets what's called a ChannelHandlerContext. Typically it is safe to get a reference to this object and keep it around. This is not true when a datagram protocol is used such as UDP. This object can later be used to obtain the underlying channel but is typically kept because you use it to write/send messages. This means there are two ways of sending messages in Netty. You can write directly to the channel or write to the ChannelHandlerContext object. The main difference between the two is that writing to the channel directly causes the message to start from the tail of the ChannelPipeline where as writing to the context object causes the message to start from the next handler in the ChannelPipeline.

2)簡單的handler只需要繼承SimpleChannelInboundHandler
To create a handler like this, your application only needs to extend the base class called SimpleChannelInboundHandler , where T is the type of message your handler can process. It is in this handler where your application obtains a reference to the ChannelHandlerContext by overriding one of the methods from the base class, all of them accept the ChannelHandlerContext as a parameter which you can then store as a field in the class.

特別注意SimpleChannelInboundHandler的API說明:
Be aware that depending of the constructor parameters it will release all handled messages by pass them to ReferenceCountUtil.release(Object). In this case you may need to use ReferenceCountUtil.retain(Object) if you pass the object to the next handler in the ChannelPipeline.
這和ChannelHandlerAdapter是不同的。

3)當業務邏輯耗時的時候可以使用EventExecutorGroup
As said before you MUST NOT block the IO Thread at all. This means doing blocking operations within your ChannelHandler is problematic. Lucky enough there is a solution for this. Netty allows to specify an EventExecutorGroup when adding ChannelHandlers to the ChannelPipeline. This EventExecutorGroup will then be used to obtain an EventExecutor and this EventExecutor will execute all the methods of the ChannelHandler. The EventExecutor here will use a different Thread then the one that is used for the IO and thus free up the
EventLoop.

4)關於ChannelPipeline與ChannelInitializer
Both ChannelInboundHandler and ChannelOutboundHandler can be mixed into the same ChannelPipeline.

The role of the ChannelInitializer is to add ChannelHandler implementations to what's called the ChannelPipeline.
An ChannelInitializer is also itself a ChannelHandler which automatically removes itself from the ChannelPipeline after it has added the other handlers.

5)不需要關注多線程同步,同步已經被Netty的機制保證了
When a channel is registered, Netty binds that channel to a single EventLoop (and so to a single thread) for the lifetime of that Channel. This is why your application doesn't need to synchronize on Netty IO operations because all IO for a given Channel will always be performed by the same thread.

6)Netty中所有的IO操作默認都是異步的,但是這些提交的異步操作的執行是有先后關系的
So basically a ChannelFuture is a placeholder for a result of an operation that is executed in the future. When exactly it is executed depends on many facts and is not easy to say. The only thing you can be sure of is that it will be executed and all operations that return a ChannelFuture and belong to the same Channel will be executed in the correct order, which is the same order as you executed the methods.

關於UDP協議

UDP使用的是面向無連接的、不可靠的數據報投遞服務。當使用UDP協議傳輸信息時,用戶應用程序必須負責解決數據報的丟失、重復、排序、差錯確認等問題。
由於UDP具有資源消耗小、處理速度快的優點,所以通常視頻、音頻等可靠性要求不高的數據傳輸一般會使用UDP,即使有一定的丟包率,也不會對功能造成嚴重的影響。

文件傳輸

Netty的文件傳輸無論在功能還是在可靠性方面,相比於傳統的I/O類庫或者其他一些第三方文件傳輸類庫,都有獨特的優勢。
server部分源碼:

public class MyHandler extends SimpleChannelInboundHandler<String> {

    private static final String CR = System.getProperty("line.separator");

    public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        File file = new File(msg);
        if (file.exists()) {
            if (!file.isFile()) {
                ctx.writeAndFlush("Not a file : " + file + CR);
                return;
            }
            ctx.write(file + " " + file.length() + CR);
            RandomAccessFile randomAccessFile = new RandomAccessFile(msg, "r");
            FileRegion region = new DefaultFileRegion(randomAccessFile.getChannel(), 0,randomAccessFile.length());
            ctx.write(region);
            ctx.writeAndFlush(CR);
            randomAccessFile.close();
        } else {
            ctx.writeAndFlush("File not found: " + file + CR);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

DefaultFileRegion的API說明:
Default FileRegion implementation which transfer data from a FileChannel or File. Be aware that the FileChannel will be automatically closed once AbstractReferenceCounted.refCnt() returns 0.
DefaultFileRegion類實現了FileRegion接口,繼承了AbstractReferenceCounted類。創建DefaultFileRegion實例時可以傳入一個FileChannel實現或一個File實例,而且在這里使用FileChannel不需要手動close。

FileRegion的API說明:
If your operating system (or JDK / JRE) does not support zero-copy file transfer, sending a file with FileRegion might fail or yield worse performance. For example, sending a large file doesn't work well in Windows.
FileRegion的使用需要操作系統(或JDK)支持zero-copy,這是因為Netty中是通過在FileRegion中包裝了NIO的FileChannel.transferTo()方法實現的零拷貝。操作系統,設備驅動,文件系統,網絡協議棧都會影響zero-copy。
wikipedia有這樣兩段話:
1、Java input streams can support zero-copy through the java.nio.channels.FileChannel's transferTo() method if the underlying operating system also supports zero copy.
2、Zero-copy versions of operating system elements, such as device drivers, file systems, and network protocol stacks, greatly increase the performance of certain application programs and more efficiently utilize system resources.

關於zero-copy,請參考wikipediadeveloperWorks 中國,在這兒就不再更多地描述了。

需要注意的是,在進行大文件傳輸的時候,一次將文件的全部內容映射到內存中,很有可能導致內存溢出。為了解決大文件傳輸過程中的內存溢出,Netty提供了ChunkedWriteHandler來解決大文件或者碼流傳輸過程中可能發生的內存溢出問題。
ChunkedWriteHandler的API說明:
A ChannelHandler that adds support for writing a large data stream asynchronously neither spending a lot of memory nor getting OutOfMemoryError. Large data streaming such as file transfer requires complicated state management in a ChannelHandler implementation. ChunkedWriteHandler manages such complicated states so that you can send a large data stream without difficulties.

心跳機制

參考Netty系列之Netty可靠性分析關於“鏈路的有效性檢測”的說明

要解決鏈路的可靠性問題,必須周期性的對鏈路進行有效性檢測。目前最流行和通用的做法就是心跳檢測。
不同的協議,心跳檢測機制也存在差異,歸納起來主要分為兩類:

  1. Ping-Pong型心跳:由通信一方定時發送Ping消息,對方接收到Ping消息之后,立即返回Pong應答消息給對方,屬於請求-響應型心跳;
  2. Ping-Ping型心跳:不區分心跳請求和應答,由通信雙方按照約定定時向對方發送心跳Ping消息,它屬於雙向心跳。
    Netty提供的空閑檢測機制分為三種:
  3. 讀空閑,鏈路持續時間t沒有讀取到任何消息;
  4. 寫空閑,鏈路持續時間t沒有發送任何消息;
  5. 讀寫空閑,鏈路持續時間t沒有接收或者發送任何消息。

需要補充的是,盡管有SO_KEEPALIVE(周期性測試連接是否仍存活)選項,但是應用層的心跳還是必不可少的。《Linux多線程服務器端編程》解釋

心跳除了說明應用程序還活着(進程還在,網絡通暢),更重要的是表明應用程序還能正常工作。而 TCP keepalive 有操作系統負責探查,即便進程死鎖,或阻塞,操作系統也會如常收發 TCP keepalive 消息。對方無法得知這一異常。

這兒提供兩種實現心跳檢測的方案:
1)使用io.netty.handler.timeout包下的相關類與接口
ReadTimeoutHandler實現了readTimedOut()方法

/**
 * Is called when a read timeout was detected.
 */
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
    if (!closed) {
        ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
        ctx.close();
        closed = true;
    }
}

使用方法

WriteTimeoutHandler實現了writeTimedOut()方法

/**
 * Is called when a write timeout was detected
 */
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
    if (!closed) {
        ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
        ctx.close();
        closed = true;
    }
}

使用方法

IdleStateHandler實現了channelIdle()方法

/**
 * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
 * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
 */
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireUserEventTriggered(evt);
}

使用方法

2)利用Netty提供的自定義EventExecutor接口實現
這是《Netty權威指南》中給出的一種實現方式,client實現Ping,server實現Pong。
client部分源碼:

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) { // 握手成功, 主動發送心跳消息
            heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) { // 心跳信號
            System.out.println("Client receive server heart beat message : ---> " + message);
        } else //一般消息, 透傳給下一個handler
            ctx.fireChannelRead(msg);
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heatBeat = buildHeatBeat();
            System.out.println("Client send heart beat messsage to server : ---> " + heatBeat);
            ctx.writeAndFlush(heatBeat); //Ping
        }

        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

ChannelHandlerContext.executor()返回EventExecutor接口,該接口繼承了EventExecutorGroup接口,EventExecutor的API說明:
The EventExecutor is a special EventExecutorGroup which comes with some handy methods to see if a Thread is executed in a event loop. Besides this, it also extends the EventExecutorGroup to allow for a generic way to access methods.
EventExecutor除了scheduleAtFixedRate()方法,還有scheduleWithFixedDelay()方法可供使用,它們均返回ScheduledFuture,ScheduledFuture的API說明:
The result of an scheduled asynchronous operation.

server部分源碼:

public class HeartBeatRespHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 返回心跳應答消息
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
            System.out.println("Receive client heart beat message : ---> " + message);
            NettyMessage heartBeat = buildHeatBeat();
            System.out.println("Send heart beat response message to client : ---> " + heartBeat);
            ctx.writeAndFlush(heartBeat); //Pong
        } else
            ctx.fireChannelRead(msg);
    }

    private NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}

重連機制

如果鏈路中斷,等待INTERVAL時間后,由客戶端發起重連操作,如果重連失敗,間隔周期INTERVAL后再次發起重連,直到重連成功。
為了保證服務端有充足的時間釋放句柄資源,在首次斷連時客戶端需要等待INTERVAL時間之后再發起重連,而不是失敗后就立即重連。
為了保證句柄資源能夠及時釋放,無論在什么場景下的重連失敗,客戶端都必須保證自身的資源被及時釋放。

client部分源碼:

public class MyClient {

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    //...

    public void connect(int port, String host) throws Exception {

        // 配置客戶端NIO線程組

        try {
            //...
            ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(MyConstant.LOCAL_IP, MyConstant.LOCAL_PORT)).sync();
            future.channel().closeFuture().sync();
        } finally {
            // 所有資源釋放完成之后,清空資源,再次發起重連操作
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(MyConstant.REMOTE_PORT, MyConstant.REMOTE_IP); // 發起重連操作
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        new MyClient().connect(MyConstant.REMOTE_PORT, MyConstant.REMOTE_IP);
    }

}

參考:《Netty in Action》、《Netty權威指南》。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM