java網絡編程-netty(心跳檢測、重連機制)


一、什么是Netty?

Netty 是一個利用 Java 的高級網絡的能力,隱藏其背后的復雜性而提供一個易於使用的 API 的客戶端/服務器框架。
Netty 是一個廣泛使用的 Java 網絡編程框架(Netty 在 2011 年獲得了Duke's Choice Award,見 https://www.java.net/dukeschoice/2011)。它活躍和成長於用戶社區,像大型公司 Facebook 和 Instagram 以及流行 開源項目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其強大的對於網絡抽象的核心代碼。

二、Netty和Tomcat有什么區別?

Netty和Tomcat最大的區別就在於通信協議,Tomcat是基於Http協議的,他的實質是一個基於http協議的web容器,但是Netty不一樣,他能通過編程自定義各種協議,因為netty能夠通過codec自己來編碼/解碼字節流,完成類似redis訪問的功能,這就是netty和tomcat最大的不同。

有人說netty的性能就一定比tomcat性能高,其實不然,tomcat從6.x開始就支持了nio模式,並且后續還有APR模式——一種通過jni調用apache網絡庫的模式,相比於舊的bio模式,並發性能得到了很大提高,特別是APR模式,而netty是否比tomcat性能更高,則要取決於netty程序作者的技術實力了。

三、為什么Netty受歡迎?

如第一部分所述,netty是一款收到大公司青睞的框架,在我看來,netty能夠受到青睞的原因有三:

  1. 並發高
  2. 傳輸快
  3. 封裝好

四、Netty為什么並發高

Netty是一款基於NIO(Nonblocking I/O,非阻塞IO)開發的網絡通信框架,對比於BIO(Blocking I/O,阻塞IO),他的並發性能得到了很大提高,兩張圖讓你了解BIO和NIO的區別:

 

 

 

 

從這兩圖可以看出,NIO的單線程能處理連接的數量比BIO要高出很多,而為什么單線程能處理更多的連接呢?原因就是圖二中出現的 Selector
當一個連接建立之后,他有兩個步驟要做,第一步是接收完客戶端發過來的全部數據,第二步是服務端處理完請求業務之后返回response給客戶端。NIO和BIO的區別主要是在第一步。
在BIO中,等待客戶端發數據這個過程是阻塞的,這樣就造成了一個線程只能處理一個請求的情況,而機器能支持的最大線程數是有限的,這就是為什么BIO不能支持高並發的原因。
而NIO中,當一個Socket建立好之后,Thread並不會阻塞去接受這個Socket,而是將這個請求交給Selector,Selector會不斷的去遍歷所有的Socket,一旦有一個Socket建立完成,他會通知Thread,然后Thread處理完數據再返回給客戶端—— 這個過程是不阻塞的,這樣就能讓一個Thread處理更多的請求了。
下面兩張圖是基於BIO的處理流程和netty的處理流程,輔助你理解兩種方式的差別:

 

 

 

 除了BIO和NIO之外,還有一些其他的IO模型,下面這張圖就表示了五種IO模型的處理流程:

 

 

  • BIO,同步阻塞IO,阻塞整個步驟,如果連接少,他的延遲是最低的,因為一個線程只處理一個連接,適用於少連接且延遲低的場景,比如說數據庫連接。
  • NIO,同步非阻塞IO,阻塞業務處理但不阻塞數據接收,適用於高並發且處理簡單的場景,比如聊天軟件。
  • 多路復用IO,他的兩個步驟處理是分開的,也就是說,一個連接可能他的數據接收是線程a完成的,數據處理是線程b完成的,他比BIO能處理更多請求。
  • 信號驅動IO,這種IO模型主要用在嵌入式開發,不參與討論。
  • 異步IO,他的數據請求和數據處理都是異步的,數據請求一次返回一次,適用於長連接的業務場景。

五、Netty為什么傳輸快

Netty的傳輸快其實也是依賴了NIO的一個特性—— 零拷貝。我們知道,Java的內存有堆內存、棧內存和字符串常量池等等,其中堆內存是占用內存空間最大的一塊,也是Java對象存放的地方,一般我們的數據如果需要從IO讀取到堆內存,中間需要經過Socket緩沖區,也就是說一個數據會被拷貝兩次才能到達他的的終點,如果數據量大,就會造成不必要的資源浪費。
Netty針對這種情況,使用了NIO中的另一大特性——零拷貝,當他需要接收數據的時候,他會在堆內存之外開辟一塊內存,數據就直接從IO讀到了那塊內存中去,在netty里面通過ByteBuf可以直接對這些數據進行直接操作,從而加快了傳輸速度。

 

 

 

 六、為什么說Netty封裝好?

要說Netty為什么封裝好,這種用文字是說不清的,直接上代碼:

  • 阻塞I/O
public class PlainOioServer {

    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);     //1
        try {
            for (;;) {
                final Socket clientSocket = socket.accept();    //2
                System.out.println("Accepted connection from " + clientSocket);

                new Thread(new Runnable() {                        //3
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            //4
                            out.flush();
                            clientSocket.close();                //5

                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();                                        //6
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 非阻塞IO
public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);                                            //1
        Selector selector = Selector.open();                        //2
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //3
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;) {
            try {
                selector.select();                                    //4
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();    //5
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {                //6
                        ServerSocketChannel server =
                                (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());    //7
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {                //8
                        SocketChannel client =
                                (SocketChannel)key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer)key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {        //9
                                break;
                            }
                        }
                        client.close();                    //10
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // 在關閉時忽略
                    }
                }
            }
        }
    }
}
  • Netty
public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();        //1

            b.group(group)                                    //2
             .channel(OioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {//3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();  //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();        //7
        }
    }
}

從代碼量上來看,Netty就已經秒殺傳統Socket編程了,但是這一部分博大精深,僅僅貼幾個代碼豈能說明問題,在這里給大家介紹一下Netty的一些重要概念,讓大家更理解Netty。

  • Channel
    數據傳輸流,與channel相關的概念有以下四個,上一張圖讓你了解netty里面的Channel。


     
    Channel一覽
    • Channel,表示一個連接,可以理解為每一個請求,就是一個Channel。
    • ChannelHandler,核心處理業務就在這里,用於處理業務請求。
    • ChannelHandlerContext,用於傳輸業務數據。
    • ChannelPipeline,用於保存處理過程需要用到的ChannelHandler和ChannelHandlerContext。
  • ByteBuf
    ByteBuf是一個存儲字節的容器,最大特點就是使用方便,它既有自己的讀索引和寫索引,方便你對整段字節緩存進行讀寫,也支持get/set,方便你對其中每一個字節進行讀寫,他的數據結構如下圖所示:
 
ByteBuf數據結構

他有三種使用模式:

  1. Heap Buffer 堆緩沖區
    堆緩沖區是ByteBuf最常用的模式,他將數據存儲在堆空間。
  2. Direct Buffer 直接緩沖區
    直接緩沖區是ByteBuf的另外一種常用模式,他的內存分配都不發生在堆,jdk1.4引入的nio的ByteBuffer類允許jvm通過本地方法調用分配內存,這樣做有兩個好處
    • 通過免去中間交換的內存拷貝, 提升IO處理速度; 直接緩沖區的內容可以駐留在垃圾回收掃描的堆區以外。
    • DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的內存, GC對此”無能為力”,也就意味着規避了在高負載下頻繁的GC過程對應用線程的中斷影響.
  3. Composite Buffer 復合緩沖區
    復合緩沖區相當於多個不同ByteBuf的視圖,這是netty提供的,jdk不提供這樣的功能。

除此之外,他還提供一大堆api方便你使用,在這里我就不一一列出了,具體參見ByteBuf字節緩存

  • Codec
    Netty中的編碼/解碼器,通過他你能完成字節與pojo、pojo與pojo的相互轉換,從而達到自定義協議的目的。
    在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了。

七、實戰

1、引入相關依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.pubing</groupId>
  <artifactId>helloNetty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
    </dependencies>
  
  
</project>

2、服務端

package com.zhouzhiyao.netty;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
 
//實現客戶端發送請求,服務器端會返回Hello Netty
public class HelloNettyServer {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 定義一對線程組(兩個線程池)
         * 
         */
        //主線程組,用於接收客戶端的鏈接,但不做任何處理
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //定義從線程組,主線程組會把任務轉給從線程組進行處理
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            /**
             * 服務啟動類,任務分配自動處理
             * 
             */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //需要去針對一個之前的線程模型(上面定義的是主從線程)
            serverBootstrap.group(bossGroup, workerGroup)
                //設置NIO的雙向通道
                .channel(NioServerSocketChannel.class)
                //子處理器,用於處理workerGroup
                /**
                 * 設置chanel初始化器
                 * 每一個chanel由多個handler共同組成管道(pipeline)
                 */
                .childHandler(new HelloNettyServerInitializer());    
            
            /**
             * 啟動
             * 
             */
            //綁定端口,並設置為同步方式,是一個異步的chanel
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            
            /**
             * 關閉
             */
            //獲取某個客戶端所對應的chanel,關閉並設置同步方式
            future.channel().closeFuture().sync();
            
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            //使用一種優雅的方式進行關閉
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        
        
        
        
    
        
    }
}

3、建立初始化器HelloNettyServerInitializer

package com.zhouzhiyao.netty;
 
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
 
/**
 * 這是一個初始化器,chanel注冊后會執行里面相應的初始化方法(也就是將handler逐個進行添加)
 * 
 * @author phubing
 *
 */
public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{
 
    //對chanel進行初始化
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //通過socketChannel去獲得對應的管道
        ChannelPipeline channelPipeline = socketChannel.pipeline();
        /**
         * pipeline中會有很多handler類(也稱之攔截器類)
         * 獲得pipeline之后,可以直接.add,添加不管是自己開發的handler還是netty提供的handler
         * 
         */
        //一般來講添加到last即可,添加了一個handler並且取名為HttpServerCodec
        //當請求到達服務端,要做解碼,響應到客戶端做編碼
        channelPipeline.addLast("HttpServerCodec", new HttpServerCodec());
        //添加自定義的CustomHandler這個handler,返回Hello Netty
        channelPipeline.addLast("customHandler", new CustomHandler());
        
    }
    
}

4、建立自定義處理器CustomHandler

package com.zhouzhiyao.netty;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
 
/**
 * 創建自定義助手類
 * @author phubing
 *
 */
//處理的請求是:客戶端向服務端發起送數據,先把數據放在緩沖區,服務器端再從緩沖區讀取,類似於[ 入棧, 入境 ]
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http請求,所以使用HttpObject
 
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel注冊");
        super.channelRegistered(ctx);
    }
 
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel注冊");
        super.channelUnregistered(ctx);
    }
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel活躍狀態");
        super.channelActive(ctx);
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端與服務端斷開連接之后");
        super.channelInactive(ctx);
    }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel讀取數據完畢");
        super.channelReadComplete(ctx);
    }
 
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("用戶事件觸發");
        super.userEventTriggered(ctx, evt);
    }
 
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel可寫事件更改");
        super.channelWritabilityChanged(ctx);
    }
 
    @Override
    //channel發生異常,若不關閉,隨着異常channel的逐漸增多,性能也就隨之下降
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕獲channel異常");
        super.exceptionCaught(ctx, cause);
    }
 
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("助手類添加");
        super.handlerAdded(ctx);
    }
 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("助手類移除");
        super.handlerRemoved(ctx);
    }
 
    /**
     * ChannelHandlerContext:上下文對象
     * 
     * 
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        //獲取當前channel
        Channel currentChannel = ctx.channel();
        
        //判斷msg是否為一個HttpRequest的請求類型
        if(msg instanceof HttpRequest) {
 
            //客戶端遠程地址
            System.out.println(currentChannel.remoteAddress());
            /**
             * 
             * 未加判斷類型,控制台打印的遠端地址如下:
             * 
                /0:0:0:0:0:0:0:1:5501
                /0:0:0:0:0:0:0:1:5501
                /0:0:0:0:0:0:0:1:5502
                /0:0:0:0:0:0:0:1:5502
                /0:0:0:0:0:0:0:1:5503
                /0:0:0:0:0:0:0:1:5503
             * 
             * 原因是接收的MSG沒有進行類型判斷
             * 
             * 
             * 增加了判斷,為何還會打印兩次?
             * 
                /0:0:0:0:0:0:0:1:5605
                /0:0:0:0:0:0:0:1:5605
             * 
             * 打開瀏覽器的network會發現,客戶端對服務端進行了兩次請求:
             *     1、第一次是所需的
             *  2、第二次是一個icon
             *  因為沒有加路由(相當於Springmvc中的requestMapping),只要發起請求,就都會到handler中去
             * 
             */
            /**
             * 在Linux中也可以通過CURL 本機Ip:端口號 發送請求(只打印一次,干凈的請求)            
             */
            
            //定義發送的消息(不是直接發送,而是要把數據拷貝到緩沖區,通過緩沖區)
            //Unpooed:是一個專門用於拷貝Buffer的深拷貝,可以有一個或多個
            //CharsetUtil.UTF_8:Netty提供
            ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);
            
            //構建一個HttpResponse,響應客戶端
            FullHttpResponse response = 
                    /**
                     * params1:針對Http的版本號
                     * params2:狀態(響應成功或失敗)
                     * params3:內容
                     */
                    //HttpVersion.HTTP_1_1:默認開啟keep-alive
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            //設置當前內容長度、類型等
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            //readableBytes:可讀長度
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            
            //通過長下文對象,把響應刷到客戶端
            ctx.writeAndFlush(response);
            
        }
        
    }
 
}

5、客戶端

public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private final String ip;
    private final int port;
    private Response response;

    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            // 創建並初始化 Netty 客戶端 Bootstrap 對象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast(new RpcDecoder(Response.class));
                    pipeline.addLast(new RpcEncoder(Request.class));
                    pipeline.addLast(NettyClient.this);
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);


//            String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

            // 連接 RPC 服務器
            ChannelFuture future = bootstrap.connect(ip, port).sync();

            // 寫入 RPC 請求數據並關閉連接
            Channel channel = future.channel();

            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return response;
        } finally {
            group.shutdownGracefully();
        }
    }

6、心跳機制

netty心跳機制示例,使用Netty實現心跳機制,使用netty4,IdleStateHandler 實現

服務端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;

設定IdleStateHandler心跳檢測每五秒進行一次讀檢測,如果五秒內ChannelRead()方法未被調用則觸發一次userEventTrigger()方法

ServerBootstrap b= new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG,1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
             socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new HeartBeatServerHandler());
            }
        });
  • 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現其userEventTriggered()方法,在出現超時事件時會被觸發,包括讀空閑超時或者寫空閑超時;
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    private int lossConnectCount = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已經5秒未收到客戶端的消息了!");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.READER_IDLE){
                lossConnectCount++;
                if (lossConnectCount>2){
                    System.out.println("關閉這個不活躍通道!");
                    ctx.channel().close();
                }
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lossConnectCount = 0;
        System.out.println("client says: "+msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客戶端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;

設定IdleStateHandler心跳檢測每四秒進行一次寫檢測,如果四秒內write()方法未被調用則觸發一次userEventTrigger()方法,實現客戶端每四秒向服務端發送一次消息;

Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringEncoder());
                socketChannel.pipeline().addLast(new HeartBeatClientHandler());
            }
        });
  • 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現自定義userEventTrigger()方法,如果出現超時時間就會被觸發,包括讀空閑超時或者寫空閑超時;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("客戶端循環心跳監測發送: "+new Date());
    if (evt instanceof IdleStateEvent){
        IdleStateEvent event = (IdleStateEvent)evt;
        if (event.state()== IdleState.WRITER_IDLE){
            if (curTime<beatTime){
                curTime++;
                ctx.writeAndFlush("biubiu");
            }
        }
    }
}

7、客戶端重連

/**
     * 連接服務端 and 重連
     */
    protected void doConnect() {
 
        if (channel != null && channel.isActive()){
            return;
        }       
        ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081);
        //實現監聽通道連接的方法
        connect.addListener(new ChannelFutureListener() {
 
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
 
                if(channelFuture.isSuccess()){
                    channel = channelFuture.channel();
                    System.out.println("連接服務端成功");
                }else{
                    System.out.println("每隔2s重連....");
                    channelFuture.channel().eventLoop().schedule(new Runnable() {
 
                        @Override
                        public void run() {
                            doConnect();
                        }
                    },2,TimeUnit.SECONDS);
                }   
            }
        });     
    } 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM