記錄初試Netty(2)-服務端心跳檢測


今天在在搭建的netty框架中添加心跳機制,特此記錄一下;

     1.什么是心跳機制?

    • 心跳是在TCP長連接中,客戶端和服務端定時向對方發送數據包通知對方自己還在線,保證連接的有效性的一種機制
    • 在服務器和客戶端之間一定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文后, 也立即發送一個特殊的數據報文, 回應發送方, 此即一個 PING-PONG 交互. 自然地, 當某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性

       2.如何實現?

    • 使用TCP協議層的Keeplive機制,但是該機制默認的心跳時間是2小時,依賴操作系統實現不夠靈活;

    • 應用層實現自定義心跳機制,比如Netty實現心跳機制;

      3.心跳檢測的實現

     (1)服務端

    • 服務端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;
    • 設定IdleStateHandler心跳檢測每五秒進行一次讀檢測,如果五秒內ChannelRead()方法未被調用則觸發一次userEventTrigger()方法
 1 ServerBootstrap b= new ServerBootstrap();  2 b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)  3         .option(ChannelOption.SO_BACKLOG,1024)  4         .childHandler(new ChannelInitializer<SocketChannel>() {  5  @Override  6             protected void initChannel(SocketChannel socketChannel) throws Exception {  7              socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));  8                 socketChannel.pipeline().addLast(new StringDecoder());  9                 socketChannel.pipeline().addLast(new HeartBeatServerHandler()); 10  } 11         });

        自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現其userEventTriggered()方法,在出現超時事件時會被觸發,包括讀空閑超時或者寫空閑超時;

 1 class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {  2     private int lossConnectCount = 0;  3 
 4  @Override  5     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  6         System.out.println("已經5秒未收到客戶端的消息了!");  7         if (evt instanceof IdleStateEvent){  8             IdleStateEvent event = (IdleStateEvent)evt;  9             if (event.state()== IdleState.READER_IDLE){ 10                 lossConnectCount++; 11                 if (lossConnectCount>2){ 12                     System.out.println("關閉不活躍通道!"); 13  ctx.channel().close(); 14  } 15  } 16         }else { 17             super.userEventTriggered(ctx,evt); 18  } 19  } 20 
21  @Override 22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 23         lossConnectCount = 0; 24         System.out.println("client says: "+msg.toString()); 25  } 26 
27  @Override 28     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 29  ctx.close(); 30  } 31 }

    (2)客戶端

  • 客戶端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;
  • 設定IdleStateHandler心跳檢測每四秒進行一次寫檢測,如果四秒內write()方法未被調用則觸發一次userEventTrigger()方法,實現客戶端每四秒向服務端發送一次消息;
 1 Bootstrap b = new Bootstrap();  2 b.group(group).channel(NioSocketChannel.class)  3         .handler(new ChannelInitializer<SocketChannel>() {  4  @Override  5             protected void initChannel(SocketChannel socketChannel) throws Exception {  6                 socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));  7                 socketChannel.pipeline().addLast(new StringEncoder());  8                 socketChannel.pipeline().addLast(new HeartBeatClientHandler());  9  } 10  }); 11  
  • 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現自定義userEventTrigger()方法,如果出現超時時間就會被觸發,包括讀空閑超時或者寫空閑超時;
 1 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  2     System.out.println("客戶端循環心跳監測發送: "+new Date());  3     if (evt instanceof IdleStateEvent){  4         IdleStateEvent event = (IdleStateEvent)evt;  5         if (event.state()== IdleState.WRITER_IDLE){  6             if (curTime<beatTime){  7                 curTime++;  8                 ctx.writeAndFlush("biubiu");  9  } 10  } 11  } 12 } 13  

 

  3.IdleStateHandler源碼分析

 

  • IdleStateHandler構造器

 

    • readerIdleTime讀空閑超時時間設定,如果channelRead()方法超過readerIdleTime時間未被調用則會觸發超時事件調用userEventTrigger()方法;

 

    • writerIdleTime寫空閑超時時間設定,如果write()方法超過writerIdleTime時間未被調用則會觸發超時事件調用userEventTrigger()方法;

 

    • allIdleTime所有類型的空閑超時時間設定,包括讀空閑和寫空閑;

 

    • unit時間單位,包括時分秒等;

 

1 public IdleStateHandler( 2         long readerIdleTime, long writerIdleTime, long allIdleTime, 3  TimeUnit unit) { 4     this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); 5 }
  • 心跳檢測也是一種Handler,在啟動時添加到ChannelPipeline管道中,當有讀寫操作時消息在其中傳遞;
  • socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
    • IdleStateHandler的channelActive()方法在socket通道建立時被觸發
1 @Override 2 public void channelActive(ChannelHandlerContext ctx) throws Exception { 3  initialize(ctx); 4     super.channelActive(ctx); 5 }

 

 

 

 

  • channelActive()方法調用Initialize()方法,根據配置的readerIdleTime,WriteIdleTIme等超時事件參數往任務隊列taskQueue中添加定時任務task ;

 

 

 1 private void initialize(ChannelHandlerContext ctx) {  2     switch (state) {  3     case 1:  4     case 2:  5         return;  6  }  7 
 8     state = 1;  9  initOutputChanged(ctx); 10 
11     lastReadTime = lastWriteTime = ticksInNanos(); 12     if (readerIdleTimeNanos > 0) { 13         readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), 14  readerIdleTimeNanos, TimeUnit.NANOSECONDS); 15  } 16     if (writerIdleTimeNanos > 0) { 17         writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), 18  writerIdleTimeNanos, TimeUnit.NANOSECONDS); 19  } 20     if (allIdleTimeNanos > 0) { 21         allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), 22  allIdleTimeNanos, TimeUnit.NANOSECONDS); 23  } 24 }

 

  • 定時任務添加到對應線程EventLoopExecutor對應的任務隊列taskQueue中,在對應線程的run()方法中循環執行
    • 用當前時間減去最后一次channelRead方法調用的時間判斷是否空閑超時;
    • 如果空閑超時則創建空閑超時事件並傳遞到channelPipeline中;

 

 

 1 protected void run(ChannelHandlerContext ctx) {  2     long nextDelay = readerIdleTimeNanos;  3     if (!reading) {  4         nextDelay -= ticksInNanos() - lastReadTime;  5  }  6 
 7     if (nextDelay <= 0) {  8         // Reader is idle - set a new timeout and notify the callback.
 9         readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); 10 
11         boolean first = firstReaderIdleEvent; 12         firstReaderIdleEvent = false; 13 
14         try { 15             IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); 16  channelIdle(ctx, event); 17         } catch (Throwable t) { 18  ctx.fireExceptionCaught(t); 19  } 20     } else { 21         // Read occurred before the timeout - set a new timeout with shorter delay.
22         readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); 23  } 24 }

 

  • 在管道中傳遞調用自定義的userEventTrigger()方法
    1 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { 2  ctx.fireUserEventTriggered(evt); 3 }

     

  4.總結

    • IdleStateHandler心跳檢測主要是通過向線程任務隊列中添加定時任務,判斷channelRead()方法或write()方法是否調用空閑超時,如果超時則觸發超時事件執行自定義userEventTrigger()方法;
    • Netty通過IdleStateHandler實現最常見的心跳機制不是一種雙向心跳的PING-PONG模式,而是客戶端發送心跳數據包,服務端接收心跳但不回復,因為如果服務端同時有上千個連接,心跳的回復需要消耗大量網絡資源;如果服務端一段時間內內有收到客戶端的心跳數據包則認為客戶端已經下線,將通道關閉避免資源的浪費;在這種心跳模式下服務端可以感知客戶端的存活情況,無論是宕機的正常下線還是網絡問題的非正常下線,服務端都能感知到,而客戶端不能感知到服務端的非正常下線;
    • 要想實現客戶端感知服務端的存活情況,需要進行雙向的心跳;Netty中的channelInactive()方法是通過Socket連接關閉時揮手數據包觸發的,因此可以通過channelInactive()方法感知正常的下線情況,但是因為網絡異常等非正常下線則無法感知;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM