Netty 中的心跳檢測機制


心跳檢測一般存在於建立長連接 或者 需要保活的場景。

心跳的使用場景

長連接的應用場景非常的廣泛,比如監控系統,IM系統,即時報價系統,推送服務等等。像這些場景都是比較注重實時性,如果每次發送數據都要進行一次DNS解析,建立連接的過程肯定是極其影響體驗。

而長連接的維護必然需要一套機制來控制。比如 HTTP/1.0 通過在 header 頭中添加 Connection:Keep-Alive參數,如果當前請求需要保活則添加該參數作為標識,否則服務端就不會保持該連接的狀態,發送完數據之后就關閉連接。HTTP/1.1以后 Keep-Alive 是默認打開的。

Netty 是 基於 TCP 協議開發的,在四層協議 TCP 協議的實現中也提供了 keepalive 報文用來探測對端是否可用。TCP 層將在定時時間到后發送相應的 KeepAlive 探針以確定連接可用性。

tcp-keepalive,操作系統內核支持,但是不默認開啟,應用需要自行開啟,開啟之后有三個參數會生效,來決定一個 keepalive 的行為。

net.ipv4.tcp_keepalive_time = 7200
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_intvl = 75

tcp_keepalive_time: 在 TCP 保活打開的情況下,最后一次數據交換到 TCP 發送第一個保活探測包的間隔,即允許的持續空閑時長,或者說每次正常發送心跳的周期,默認值為7200s(2h);

tcp_keepalive_probes: 在 tcp_keepalive_time 之后,沒有接收到對方確認,繼續發送保活探測包次數,默認值為9(次);

tcp_keepalive_intvl:在 tcp_keepalive_time 之后,沒有接收到對方確認,繼續發送保活探測包的發送頻率,默認值為75s。

TCP KeepAlive 是用於檢測連接的死活,而心跳機制則附帶一個額外的功能:檢測通訊雙方的存活狀態。兩者聽起來似乎是一個意思,但實際上卻大相徑庭。

考慮一種情況,某台服務器因為某些原因導致負載超高,CPU 100%,無法響應任何業務請求,但是使用 TCP 探針則仍舊能夠確定連接狀態,這就是典型的連接活着但業務提供方已死的狀態,對客戶端而言,這時的最好選擇就是斷線后重新連接其他服務器,而不是一直認為當前服務器是可用狀態一直向當前服務器發送些必然會失敗的請求。

可以通過如下命令查看系統tcp-keepalive參數配置:

sysctl -a | grep keepalive

cat /proc/sys/net/ipv4/tcp_keepalive_time

sysctl net.ipv4.tcp_keepalive_time

Netty 中也提供了設置 tcp-keepalive 的設置:

設置:ChannelOption.SO_KEEPALIVE, true 表示打開 TCP 的 keepAlive 設置。

所以基礎協議對應用來說不是那么盡善盡美,一個 Netty 服務端可能會面臨上萬個連接,如何去維護這些連接是應用應該去處理的事情。在 Netty 中提供了 IdleStateHandler 類專門用於處理心跳。

IdleStateHandler 的構造函數如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, 
                        long allIdleTime,TimeUnit unit){
  
}

第一個參數是隔多久檢查一下讀事件是否發生,如果 channelRead() 方法超過 readerIdleTime 時間未被調用則會觸發超時事件調用 userEventTrigger() 方法;

第二個參數是隔多久檢查一下寫事件是否發生,writerIdleTime 寫空閑超時時間設定,如果 write() 方法超過 writerIdleTime 時間未被調用則會觸發超時事件調用 userEventTrigger() 方法;

第三個參數是全能型參數,隔多久檢查讀寫事件;

第四個參數表示當前的時間單位。

所以這里可以分別控制讀,寫,讀寫超時的時間,單位為秒,如果是0表示不檢測,所以如果全是0,則相當於沒添加這個 IdleStateHandler,連接是個普通的短連接。

Netty 中的心跳邏輯

下面演示一下在 Netty 中如果使用 IdleStateHandler, 整體代碼流程請見 :

gitHub

先上代碼:

Server端:

package com.rickiyang.learn.keepAlive;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description: server 端
 */
@Slf4j
public class KpServer {

    private int port;

    public KpServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server start fail",e);
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        KpServer server = new KpServer(7788);
        server.start();
    }
}

Initializer:

package com.rickiyang.learn.keepAlive;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
public class ServerChannelInitializer  extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

        // 字符串解碼 和 編碼
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 自己的邏輯Handler
        pipeline.addLast("handler", new KpServerHandler());
        }
}

Handler:

package com.rickiyang.learn.keepAlive;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class KpServerHandler extends SimpleChannelInboundHandler {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        if ("heartbeat".equals(message)) {
            log.info(ctx.channel().remoteAddress() + "===>server: " + message);
            ctx.write("heartbeat");
            ctx.flush();
        }
    }

    /**
     * 如果5s沒有讀請求,則向客戶端發送心跳
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (IdleState.READER_IDLE.equals((event.state()))) {
                ctx.writeAndFlush("heartbeat").addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

客戶端代碼:

Client:

package com.rickiyang.learn.keepAlive;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class KpClient {

    private  int port;
    private  String address;

    public KpClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());
        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().writeAndFlush("Hello world, i'm online");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("client start fail",e);
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        KpClient client = new KpClient(7788,"127.0.0.1");
        client.start();
    }
}

Initializer:

package com.rickiyang.learn.keepAlive;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;


public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 客戶端的邏輯
        pipeline.addLast("handler", new KpClientHandler());
    }
}

Handler:

package com.rickiyang.learn.keepAlive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class KpClientHandler extends SimpleChannelInboundHandler {


    /** 客戶端請求的心跳命令 */
    private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String)msg;
        if("heartbeat".equals(message)) {
            log.info(ctx.channel().remoteAddress() + "===>client: " + msg);
        }
    }

    /**
     * 如果4s沒有收到寫請求,則向服務端發送心跳請求
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if(IdleState.WRITER_IDLE.equals(event.state())) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client is close");
    }


}

解釋一下代碼的邏輯:

服務端添加了:

pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

每隔5s檢查一下是否有讀事件發生,如果沒有就處罰 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。

客戶端添加了:

new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)

每隔4s檢查一下是否有寫事件,如果沒有就觸發 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。

大家可以再本地啟動工程,看一下觸發的邏輯。

IdleStateHandler邏輯分析

心跳檢測也是一種 Handler,在啟動時添加到 ChannelPipeline 管道中,當有讀寫操作時消息在其中傳遞。首先我們看到 IdleStateHandler 繼承了 ChannelDuplexHandler:

public class IdleStateHandler extends ChannelDuplexHandler {
  
  
  ...
}

表明 IdleStateHandler 也可以同時處理入站和出站事件,所以可以同時監控讀事件和寫事件。

IdleStateHandler 的 channelActive() 方法在 socket 通道建立時被觸發:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    initialize(ctx);
    super.channelActive(ctx);
}

其中 channelActive() 方法調用 Initialize() 方法,根據配置的 readerIdleTime、writeIdleTIme 等超時事件參數往任務隊列 taskQueue 中添加定時任務 task:

private void initialize(ChannelHandlerContext ctx) {
  // Avoid the case where destroy() is called before scheduling timeouts.
  // See: https://github.com/netty/netty/issues/143
  //這里判斷狀態,避免重復初始化
  switch (state) {
    case 1:
    case 2:
      return;
  }

  state = 1;

  EventExecutor loop = ctx.executor();
	//初始化最后一次讀寫時間
  lastReadTime = lastWriteTime = System.nanoTime();
  // 根據用戶設置的讀空閑時間啟動一個定時任務,讀空閑時間為頻率執行
  // 這里的 schedule 方法會調用 eventLoop 的 schedule 方法,將定時任務添加進隊列中
  if (readerIdleTimeNanos > 0) {
    readerIdleTimeout = loop.schedule(
      new ReaderIdleTimeoutTask(ctx),
      readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  }
  // 根據用戶設置的寫空閑時間啟動一個定時任務,寫空閑時間為頻率執行
  if (writerIdleTimeNanos > 0) {
    writerIdleTimeout = loop.schedule(
      new WriterIdleTimeoutTask(ctx),
      writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  }
  // 根據用戶設置的讀寫空閑時間啟動一個定時任務,讀寫空閑時間為頻率執行
  if (allIdleTimeNanos > 0) {
    allIdleTimeout = loop.schedule(
      new AllIdleTimeoutTask(ctx),
      allIdleTimeNanos, TimeUnit.NANOSECONDS);
  }
}

看到這里或者沒看這里你也應該能想到,這種監控性的任務肯定是使用定時任務類似這種機制來進行。

上面有一個 state 字段:

private byte state; 
0:初始狀態,1:已經初始化, 2: 已經銷毀。

上面的 switch 判斷只有當前狀態為 0 即初始化狀態的時候才執行下面的操作,避免多次提交定時任務。

定時任務添加到對應線程 EventLoopExecutor 對應的任務隊列 taskQueue 中,在對應線程的 run() 方法中循環執行:

  • 用當前時間減去最后一次 channelRead 方法調用的時間判斷是否空閑超時;
  • 如果空閑超時則創建空閑超時事件並傳遞到 channelPipeline 中。

只要給定的參數大於0,就創建一個定時任務,每個事件都創建。同時,將 state 狀態設置為 1,防止重復初始化。

讀事件處理:ReaderIdleTimeoutTask

來看讀事件是如何處理的, ReaderIdleTimeoutTask:

private final class ReaderIdleTimeoutTask implements Runnable {

  private final ChannelHandlerContext ctx;

  ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
    this.ctx = ctx;
  }

  @Override
  public void run() {
    if (!ctx.channel().isOpen()) {
      return;
    }
		// nextDelay = 當前時間-最后一次時間
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
      nextDelay -= System.nanoTime() - lastReadTime;
    }

    if (nextDelay <= 0) {
     // 重新定義readerIdleTimeout schedule,與initialize方法設置的相同,繼續執行定時任務
      readerIdleTimeout =
        ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
      try {
       // event = new IdleStateEvent(IdleState.READER_IDLE, true),將event設置為讀空閑
        IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent);
        if (firstReaderIdleEvent) {
          firstReaderIdleEvent = false;
        }
				//channelIdle的主要工作就是將evt傳輸給下一個Handler
        channelIdle(ctx, event);
      } catch (Throwable t) {
        ctx.fireExceptionCaught(t);
      }
    } else {
      // 如果nextDelay>0,則說明客戶端在規定時間內已經寫入數據了
      // 重新定義readerIdleTimeout schedule,以nextDelay為執行頻率
      readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
    }
  }
}

nextDelay的初始化值為超時秒數readerIdleTimeNanos,如果檢測的時候沒有正在讀,就計算多久沒讀了:

nextDelay = nextDelay - 當前時間 - 上次讀取時間

如果小於0,說明左邊的 readerIdleTimeNanos 小於空閑時間(當前時間 - 上次讀取時間),表示已經超時,
創建 IdleStateEvent 事件,IdleState 枚舉值為 READER_IDLE,然后調用 channelIdle(ctx, event) 方法分發給下一個 ChannelInboundHandler。

總的來說,每次讀取操作都會記錄一個時間,定時任務時間到了,會計算當前時間和最后一次讀的時間的間隔,如果間隔超過了設置的時間,就觸發 UserEventTriggered() 方法。

寫事件處理:WriterIdleTimeoutTask

寫事件,WriterIdleTimeoutTask:

private final class WriterIdleTimeoutTask implements Runnable {

  private final ChannelHandlerContext ctx;

  WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
    this.ctx = ctx;
  }

  @Override
  public void run() {
    if (!ctx.channel().isOpen()) {
      return;
    }

    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    long nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime);
    if (nextDelay <= 0) {
      // Writer is idle - set a new timeout and notify the callback.
      writerIdleTimeout = ctx.executor().schedule(
        this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
      try {
        IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, firstWriterIdleEvent);
        if (firstWriterIdleEvent) {
          firstWriterIdleEvent = false;
        }

        channelIdle(ctx, event);
      } catch (Throwable t) {
        ctx.fireExceptionCaught(t);
      }
    } else {
      // Write occurred before the timeout - set a new timeout with shorter delay.
      writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
    }
  }
}

寫超時時間:

nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime)

寫超時也是跟讀超時同理,每次寫操作都記錄操作時間。

IdleStateHandler 心跳檢測主要是通過向線程任務隊列中添加定時任務,判斷 channelRead() 方法或 write() 方法是否調用空閑超時,如果超時則觸發超時事件執行自定義 userEventTrigger() 方法。

Netty 通過 IdleStateHandler 實現最常見的心跳機制不是一種雙向心跳的 PING-PONG 模式,而是客戶端發送心跳數據包,服務端接收心跳但不回復,因為如果服務端同時有上千個連接,心跳的回復需要消耗大量網絡資源。

如果服務端一段時間內一直收到客戶端的心跳數據包則認為客戶端已經下線,將通道關閉避免資源的浪費。在這種心跳模式下服務端可以感知客戶端的存活情況,無論是宕機的正常下線還是網絡問題的非正常下線,服務端都能感知到,而客戶端不能感知到服務端的非正常下線。

要想實現客戶端感知服務端的存活情況,需要進行雙向的心跳;Netty 中的 channelInactive() 方法是通過 Socket 連接關閉時揮手數據包觸發的,因此可以通過 channelInactive() 方法感知正常的下線情況,但是因為網絡異常等非正常下線則無法感知。上面的示例只做了客戶端和服務端雙向心跳測試,大家可以補充一下如果一段時間內都收到的是客戶端的心跳包則判定連接無效關閉連接的邏輯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM