Netty——心跳機制


 

前言

所謂心跳, 即在 TCP 長連接中, 客戶端和服務器之間定期發送的一種特殊的數據包, 通知對方自己還在線, 以確保 TCP 連接的有效性。

心跳包還有另一個作用,經常被忽略,即:一個連接如果長時間不用,防火牆或者路由器就會斷開該連接

 

操作系統內核心跳

Netty 是 基於 TCP 協議開發的,在四層協議 TCP 協議的實現中也提供了 keepalive 報文用來探測對端是否可用。

TCP 層將在定時時間到后發送相應的 KeepAlive 探針以確定連接可用性。

tcp-keepalive操作系統內核支持,但是默認不開啟,應用需要自行開啟,開啟之后有三個參數會生效,來決定一個 keepalive 的行為。

net.ipv4.tcp_keepalive_time = 7200
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_intvl = 75
  • tcp_keepalive_time: 在 TCP 保活打開的情況下,最后一次數據交換到 TCP 發送第一個保活探測包的間隔,即允許的持續空閑時長,或者說每次正常發送心跳的周期,默認值為7200s(2h)
  • tcp_keepalive_probes: 在 tcp_keepalive_time 之后,沒有接收到對方確認,繼續發送保活探測包次數,默認值為9(次)
  • tcp_keepalive_intvl:在 tcp_keepalive_time 之后,沒有接收到對方確認,繼續發送保活探測包的發送頻率,默認值為75s 

可以通過如下命令查看系統tcp-keepalive參數配置:

sysctl -a | grep keepalive

cat /proc/sys/net/ipv4/tcp_keepalive_time

sysctl net.ipv4.tcp_keepalive_time

Netty 中設置 tcp-keepalive :

bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel channel) throws Exception {
                   ProtostuffCodecUtil util = new ProtostuffCodecUtil();
                   ChannelPipeline pipeline = channel.pipeline();
                   pipeline.addLast(new ProtostuffEncoder(util));
                   pipeline.addLast(new ProtostuffDecoder(util));
                   pipeline.addLast(handler);
               }
           }).option(ChannelOption.SO_BACKLOG, 1024)
             .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

TCP KeepAlive 是用於檢測連接的死活,而心跳機制則附帶一個額外的功能:檢測通訊雙方的存活狀態。兩者聽起來似乎是一個意思,但實際上卻大相徑庭。

考慮一種情況,某台服務器因為某些原因導致負載超高,CPU 100%,無法響應任何業務請求,但是使用 TCP 探針則仍舊能夠確定連接狀態,

這就是典型的連接活着但業務提供方已死的狀態,對客戶端而言,這時的最好選擇就是斷線后重新連接其他服務器,

而不是一直認為當前服務器是可用狀態一直向當前服務器發送些必然會失敗的請求。

所以基礎協議對應用來說不是那么盡善盡美,一個 Netty 服務端可能會面臨上萬個連接,如何去維護這些連接是應用應該去處理的事情。

 

應用實現心跳

Netty心跳檢測說明

1、在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 看下它的構造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

三個參數的含義如下:

  • readerIdleTimeSeconds: 讀超時。即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE IdleStateEvent 事件。
  • writerIdleTimeSeconds: 寫超時。 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE IdleStateEvent 事件。
  • allIdleTimeSeconds: 讀/寫超時。 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE IdleStateEvent 事件。
2、要實現Netty服務端心跳檢測機制需要在服務器端的 ChannelInitializer中加入如下的代碼:
 
        
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

 

Netty心跳檢測代碼示例

服務端Server類:

public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //IdleStateHandler的readerIdleTime參數指定超過5秒還沒收到客戶端的連接,
                            //會觸發IdleStateEvent事件並且交給下一個handler處理,下一個handler必須
                            //實現userEventTriggered方法處理對應事件
                            pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

服務端Handler處理類:

@Slf4j
public class HeartBeatServerHandler extends SimpleChannelInboundHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        if ("heartbeat".equals(message)) {
            log.info(ctx.channel().remoteAddress() + "===>server: " + message);
            ctx.write("heartbeat");
            ctx.flush();
        }
    }

    /**
     * 如果5s沒有讀請求,則向客戶端發送心跳
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (IdleState.READER_IDLE.equals((event.state()))) {
                ctx.writeAndFlush("heartbeat").addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

客戶端Client類:

public class HeartBeatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                 pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)); pipeline.addLast(
new HeartBeatClientHandler()); } });         ChannelFuture future = bootstrap.connect("127.0.0.1", 9000).sync();
        future.channel().writeAndFlush("Hello world, i'm online");
        future.channel().closeFuture().sync();
}
catch (Exception e) { e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } }

客戶端Handler類:

@Slf4j
public class HeartBeatClientHandler extends SimpleChannelInboundHandler {

    /** 客戶端請求的心跳命令 */
    private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String)msg;
        if("heartbeat".equals(message)) {
            log.info(ctx.channel().remoteAddress() + "===>client: " + msg);
        }
    }

    /**
     * 如果4s沒有收到寫請求,則向服務端發送心跳請求
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if(IdleState.WRITER_IDLE.equals(event.state())) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client is close");
    }
}

解釋一下代碼的邏輯:

服務端添加了以下代碼:每隔5s檢查一下是否有讀事件發生,如果沒有就處罰 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。

pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

客戶端添加了以下代碼:每隔4s檢查一下是否有寫事件,如果沒有就觸發 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。

pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

 

Netty心跳源碼分析

心跳檢測也是一種 Handler,在啟動時添加到 ChannelPipeline 管道中,當有讀寫操作時消息在其中傳遞。

1、首先我們看到 IdleStateHandler 繼承了 ChannelDuplexHandler:

public class IdleStateHandler extends ChannelDuplexHandler {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.readerIdleTimeNanos > 0L || this.allIdleTimeNanos > 0L) {
            this.reading = true;
            this.firstReaderIdleEvent = this.firstAllIdleEvent = true;
        }

        ctx.fireChannelRead(msg);
    }
}

表明 IdleStateHandler 也可以同時處理入站和出站事件,所以可以同時監控讀事件和寫事件。

2、然后IdleStateHandler 的 channelActive() 方法在 socket 通道建立時被觸發:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    this.initialize(ctx);
    super.channelActive(ctx);
}

3、其中 channelActive() 方法調用 Initialize() 方法,根據配置的 readerIdleTimewriteIdleTIme 等超時事件參數往任務隊列 taskQueue 中添加定時任務 task:

private void initialize(ChannelHandlerContext ctx) {
  // Avoid the case where destroy() is called before scheduling timeouts.
  // See: https://github.com/netty/netty/issues/143
  //這里判斷狀態,避免重復初始化
  switch (state) {
    case 1:
    case 2:
      return;
  }

  state = 1;

  EventExecutor loop = ctx.executor();
    //初始化最后一次讀寫時間
  lastReadTime = lastWriteTime = System.nanoTime();
  // 根據用戶設置的讀空閑時間啟動一個定時任務,讀空閑時間為頻率執行
  // 這里的 schedule 方法會調用 eventLoop 的 schedule 方法,將定時任務添加進隊列中
  if (readerIdleTimeNanos > 0) {
    readerIdleTimeout = loop.schedule(
      new ReaderIdleTimeoutTask(ctx),
      readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  }
  // 根據用戶設置的寫空閑時間啟動一個定時任務,寫空閑時間為頻率執行
  if (writerIdleTimeNanos > 0) {
    writerIdleTimeout = loop.schedule(
      new WriterIdleTimeoutTask(ctx),
      writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  }
  // 根據用戶設置的讀寫空閑時間啟動一個定時任務,讀寫空閑時間為頻率執行
  if (allIdleTimeNanos > 0) {
    allIdleTimeout = loop.schedule(
      new AllIdleTimeoutTask(ctx),
      allIdleTimeNanos, TimeUnit.NANOSECONDS);
  }
}

上面有一個 state 字段:

private byte state; 
0:初始狀態,1:已經初始化, 2: 已經銷毀。

上面的 switch 判斷只有當前狀態為 0 即初始化狀態的時候才執行下面的操作,避免多次提交定時任務。

定時任務添加到對應線程 EventLoopExecutor 對應的任務隊列 taskQueue 中,在對應線程的 run() 方法中循環執行:

  • 用當前時間減去最后一次 channelRead 方法調用的時間判斷是否空閑超時;
  • 如果空閑超時則創建空閑超時事件並傳遞到 channelPipeline 中。

只要給定的參數大於0,就創建一個定時任務,每個事件都創建。同時,將 state 狀態設置為 1,防止重復初始化。

讀事件處理:ReaderIdleTimeoutTask

private final class ReaderIdleTimeoutTask implements Runnable {

  private final ChannelHandlerContext ctx;

  ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
    this.ctx = ctx;
  }

  @Override
  public void run() {
    if (!ctx.channel().isOpen()) {
      return;
    }
    // nextDelay = 當前時間-最后一次時間
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
      nextDelay -= System.nanoTime() - lastReadTime;
    }

    if (nextDelay <= 0) {
      // 重新定義readerIdleTimeout schedule,與initialize方法設置的相同,繼續執行定時任務
      readerIdleTimeout =
        ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
      try {
        // event = new IdleStateEvent(IdleState.READER_IDLE, true),將event設置為讀空閑
        IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent);
        if (firstReaderIdleEvent) {
          firstReaderIdleEvent = false;
        }
        //channelIdle的主要工作就是將evt傳輸給下一個Handler
        channelIdle(ctx, event);
      } catch (Throwable t) {
        ctx.fireExceptionCaught(t);
      }
    } else {
      // 如果nextDelay>0,則說明客戶端在規定時間內已經寫入數據了
      // 重新定義readerIdleTimeout schedule,以nextDelay為執行頻率
      readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
    }
  }
}

nextDelay的初始化值為超時秒數readerIdleTimeNanos,如果檢測的時候沒有正在讀,就計算多久沒讀了:

nextDelay = nextDelay - 當前時間 - 上次讀取時間

如果小於0,說明左邊的 readerIdleTimeNanos 小於空閑時間(當前時間 - 上次讀取時間),表示已經超時,
創建 IdleStateEvent 事件,IdleState 枚舉值為 READER_IDLE,然后調用 channelIdle(ctx, event) 方法分發給下一個 ChannelInboundHandler。

總的來說,每次讀取操作都會記錄一個時間,定時任務時間到了,會計算當前時間和最后一次讀的時間的間隔,如果間隔超過了設置的時間,就觸發 UserEventTriggered() 方法。

 

Netty心跳機制總結

Netty 通過 IdleStateHandler 實現最常見的心跳機制不是一種雙向心跳的 PING-PONG 模式,而是客戶端發送心跳數據包,服務端接收心跳但不回復,

因為如果服務端同時有上千個連接,心跳的回復需要消耗大量網絡資源。

如果服務端一段時間內一直沒收到客戶端的心跳數據包則認為客戶端已經下線,將通道關閉避免資源的浪費。在這種心跳模式下服務端可以感知客戶端的存活情況,

無論是宕機的正常下線還是網絡問題的非正常下線,服務端都能感知到,而客戶端不能感知到服務端的非正常下線。

要想實現客戶端感知服務端的存活情況,需要進行雙向的心跳;Netty 中的 channelInactive() 方法是通過 Socket 連接關閉時揮手數據包觸發的

因此可以通過 channelInactive() 方法感知正常的下線情況,但是因為網絡異常等非正常下線則無法感知。

上面的示例只做了客戶端和服務端雙向心跳測試,大家可以補充一下如果一段時間內都收到的是客戶端的心跳包則判定連接無效關閉連接的邏輯。

 

 

引用:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM