netty 服務器端和客戶端(超時機制)


Netty心跳機制

一、概念介紹
網絡中的接收和發送數據都是使用操作系統中的SOCKET進行實現。但是如果此套接字已經斷開,那發送數據和接收數據的時候就一定會有問題。可是如何判斷這個套接字是否還可以使用呢?這個就需要在系統中創建心跳機制。其實TCP中已經為我們實現了一個叫做心跳的機制。如果你設置了心跳,那TCP就會在一定的時間(比如你設置的是3秒鍾)內發送你設置的次數的心跳(比如說2次),並且此信息不會影響你自己定義的協議。所謂“心跳”就是定時發送一個自定義的結構體(心跳包),讓對方知道自己還活着。 以確保鏈接的有效性。

所謂的心跳包就是客戶端定時發送簡單的信息給服務器端告訴它我還在而已。代碼就是每隔幾分鍾發送一個固定信息給服務端,服務端收到后回復一個固定信息如果服務端幾分鍾內沒有收到客戶端信息則視客戶端斷開。比如有些通信軟件長時間不使用,要想知道它的狀態是在線還是離線就需要心跳包,定時發包收包。發包方:可以是客戶也可以是服務端,看哪邊實現方便合理。一般是客戶端。服務器也可以定時輪詢發心跳下去。心跳包之所以叫心跳包是因為:它像心跳一樣每隔固定時間發一次,以此來告訴服務器,這個客戶端還活着。事實上這是為了保持長連接,至於這個包的內容,是沒有什么特別規定的,不過一般都是很小的包,或者只包含包頭的一個空包。

在TCP的機制里面,本身是存在有心跳包的機制的,也就是TCP的選項。系統默認是設置的是2小時的心跳頻率。但是它檢查不到機器斷電、網線拔出、防火牆這些斷線。而且邏輯層處理斷線可能也不是那么好處理。一般,如果只是用於保活還是可以的。心跳包一般來說都是在邏輯層發送空的包來實現的。下一個定時器,在一定時間間隔下發送一個空包給客戶端,然后客戶端反饋一個同樣的空包回來,服務器如果在一定時間內收不到客戶端發送過來的反饋包,那就只有認定說掉線了。只需要send或者recv一下,如果結果為零,則為掉線。

但是,在長連接下,有可能很長一段時間都沒有數據往來。理論上說,這個連接是一直保持連接的,但是實際情況中,如果中間節點出現什么故障是難以知道的。更要命的是,有的節點(防火牆)會自動把一定時間之內沒有數據交互的連接給斷掉。在這個時候,就需要我們的心跳包了,用於維持長連接,保活。在獲知了斷線之后,服務器邏輯可能需要做一些事情,比如斷線后的數據清理呀,重新連接呀當然,這個自然是要由邏輯層根據需求去做了。總的來說,心跳包主要也就是用於長連接的保活和斷線處理。一般的應用下,判定時間在30-40秒比較不錯。如果實在要求高,那就在6-9秒。

二、心跳實現
使用TCP協議層的Keeplive機制,但是該機制默認的心跳時間是2小時,依賴操作系統實現不夠靈活;

心跳機制一般來說都是在邏輯層發送空的包來實現的,比如Netty的IdleStateHandler類實現心跳機制。

心跳機制實現邏輯:每隔幾分鍾發送一個固定信息給服務端,服務端收到后回復一個固定信息給客戶端,如果服務端幾分鍾內沒有收到客戶端信息則視客戶端斷開。

在Netty中IdleStateHandler主要用來檢測遠端是否存活,如果不存活或活躍則對空閑Socket連接進行處理避免資源的浪費;IdleStateHandler實現對三種心跳的檢測,分別是readerIdleTime、writerIdleTime和allIdleTime,參數解釋如下: 
1)readerIdleTime:讀超時時間
2)writerIdleTime:寫超時時間
3)allIdleTime:所有類型的超時時間

所以在channelPipeline中加入IdleStateHandler,我們在handler中提示的是5秒讀,所以我們服務端的配置的是:

ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

因為服務端必須5秒接受一次心跳請求,那么客戶端的配置:

ph.addLast( new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

userEventTriggered是Netty 處理心跳超時事件,在IdleStateHandler設置超時時間,如果達到了,就會直接調用該方法。如果沒有超時則不調用。我們重寫該方法的話,就可以自行進行相關的業務邏輯處理了。

三、IdleStateHandler心跳檢測實例
a、服務端
HeartNettyServer——服務端啟動類

復制代碼
package com.dxfx.netty.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 服務端啟動類
 * 
 * @author Administrator
 *
 */
public class HeartNettyServer {
     public static void main(String[] args) throws InterruptedException {
         // 首先,netty通過ServerBootstrap啟動服務端
         ServerBootstrap server = new ServerBootstrap();
         EventLoopGroup parentGroup = new NioEventLoopGroup();
         EventLoopGroup childGroup =new NioEventLoopGroup();
         //第1步定義兩個線程組,用來處理客戶端通道的accept和讀寫事件
         //parentGroup用來處理accept事件,childgroup用來處理通道的讀寫事件
         //parentGroup獲取客戶端連接,連接接收到之后再將連接轉發給childgroup去處理
         server.group(parentGroup, childGroup);
         
         //用於構造服務端套接字ServerSocket對象,標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度。
         //用來初始化服務端可連接隊列
         //服務端處理客戶端連接請求是按順序處理的,所以同一時間只能處理一個客戶端連接,多個客戶端來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理,backlog參數指定了隊列的大小。
         server.option(ChannelOption.SO_BACKLOG, 128);
         
         //第2步綁定服務端通道
         server.channel(NioServerSocketChannel.class);
         
         //第3步綁定handler,處理讀寫事件,ChannelInitializer是給通道初始化
         server.childHandler(new HeartNettyServerFilter());
         
         //第4步綁定8080端口
         ChannelFuture future = server.bind(8080).sync();
         //當通道關閉了,就繼續往下走
         future.channel().closeFuture().sync();
     }
 
}
復制代碼


HeartNettyServerFilter——服務端過濾器,如編解碼和心跳的設置

復制代碼
package com.dxfx.netty.demo;

import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 服務端過濾器,如編解碼和心跳的設置
 * 
 * @author Administrator
 *
 */
public class HeartNettyServerFilter extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline cp = sc.pipeline();
        cp.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
        
        // 解碼和編碼,應和客戶端一致
        cp.addLast(new StringDecoder());
        cp.addLast(new StringEncoder());
        
        //處理服務端的業務邏輯
        cp.addLast(new HeartNettyServerHandler()); 
    }
}
復制代碼


HeartNettyServerHandler——處理服務端業務邏輯:心跳超時處理、客服端返回的數據處理

復制代碼
package com.dxfx.netty.demo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * 處理服務端業務邏輯:心跳超時處理、客服端返回的數據處理
 * 
 * @author Administrator
 *
 */
public class HeartNettyServerHandler extends ChannelInboundHandlerAdapter {
    /** 空閑次數 */
    private int idle_count = 1;
    /** 發送次數 */
    private int count = 1;

    /**
     * 超時處理,如果5秒沒有收到客戶端的心跳,就觸發; 如果超過兩次,則直接關閉;
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.READER_IDLE.equals(event.state())) { // 如果讀通道處於空閑狀態,說明沒有接收到心跳命令
                if (idle_count > 2) {
                    System.out.println("超過兩次無客戶端請求,關閉該channel");
                    ctx.channel().close();
                }
                
                System.out.println("已等待5秒還沒收到客戶端發來的消息");
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    /**
     * 業務邏輯處理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("第" + count + "次" + ",服務端收到的消息:" + msg);
        
        String message = (String) msg;
        // 如果是心跳命令,服務端收到命令后回復一個相同的命令給客戶端
        if ("hb_request".equals(message)) { 
            ctx.write("服務端成功收到心跳信息");
            ctx.flush();
        }
        
        count++;
    }

    /**
     * 異常處理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
復制代碼


b、客戶端
HeartNettyClient——客戶端啟動類

復制代碼
package com.dxfx.netty.demo;

import java.io.IOException;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * 客戶端啟動類
 * 
 * @author Administrator
 *
 */
public class HeartNettyClient {

    public static void main(String[] args) throws InterruptedException, IOException {
        // 首先,netty通過Bootstrap啟動客戶端
        Bootstrap client = new Bootstrap();

        // 第1步 定義線程組,處理讀寫和鏈接事件,沒有了accept事件
        EventLoopGroup group = new NioEventLoopGroup();
        client.group(group);

        // 第2步 綁定客戶端通道
        client.channel(NioSocketChannel.class);

        // 第3步 給NIoSocketChannel初始化handler, 處理讀寫事件
        client.handler(new HeartNettyClientFilter());

        // 連接服務端
        Channel future = client.connect("localhost", 8080).sync().channel();
        
        //給服務端發送數據
        String str = "Hello Netty";
        future.writeAndFlush(str);
        System.out.println("客戶端發送數據:" + str);
    }

}
復制代碼


HeartNettyClientFilter——客戶端過濾器,如編解碼和心跳的設置

復制代碼
package com.dxfx.netty.demo;

import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 客戶端過濾器,如編解碼和心跳的設置
 * 
 * @author Administrator
 *
 */
public class HeartNettyClientFilter  extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline ph = ch.pipeline();
        //因為服務端設置的超時時間是5秒,所以客戶端設置4秒
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));  
        
        // 解碼和編碼,應和服務端一致
        ph.addLast(new StringDecoder());
        ph.addLast(new StringEncoder());
        
        //處理客戶端的業務邏輯
        ph.addLast(new HeartNettyClientHandler());  
    }
}
 
 
復制代碼

 

HeartNettyClientHandler——處理客戶端業務邏輯:心跳超時處理、服務端返回的數據處理

復制代碼
package com.dxfx.netty.demo;

import java.text.SimpleDateFormat;
import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

/**
 *    處理客戶端業務邏輯:心跳超時處理、服務端返回的數據處理
 * 
 * @author Administrator
 *
 */
public class HeartNettyClientHandler extends ChannelInboundHandlerAdapter {
    /** 客戶端請求的心跳命令 */
    private static final ByteBuf HEARTBEAT_SEQUENCE = 
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hb_request", CharsetUtil.UTF_8));

    /** 空閑次數 */
    private int idle_count = 1;

    /** 發送次數 */
    private int count = 1;

    /** 循環次數 */
    private int fcount = 1;

    /**
     * 建立連接時
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("建立連接時:" + date());
        ctx.fireChannelActive();
    }

    /**
     * 關閉連接時
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("關閉連接時:" + date());
    }

    /**
     * 心跳請求處理,每4秒發送一次心跳請求;
     * 
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("\r\n循環請求的時間:" + date() + ",次數" + fcount);
        
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果寫通道處於空閑狀態就發送心跳命令
                // 設置發送次數,允許發送3次心跳包
                if (idle_count <= 3) { 
                    idle_count++;
                    ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                } else {
                    System.out.println("心跳包發送結束,不再發送心跳請求!!!");
                }
            }
        }
        
        fcount++;
    }

    /**
     * 業務邏輯處理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("第" + count + "次" + ",客戶端收到的消息:" + msg);
        count++;
    }
    
    private String date(){
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         return sdf.format(new Date());
    }
}
復制代碼

 

c、客戶端輸出信息

復制代碼
客戶端發送數據:Hello Netty
建立連接時:2018-12-14 22:18:05

循環請求的時間:2018-12-14 22:18:09,次數1
第1次,客戶端收到的消息:服務端成功收到心跳信息

循環請求的時間:2018-12-14 22:18:13,次數2
第2次,客戶端收到的消息:服務端成功收到心跳信息

循環請求的時間:2018-12-14 22:18:17,次數3
第3次,客戶端收到的消息:服務端成功收到心跳信息

循環請求的時間:2018-12-14 22:18:21,次數4
心跳包發送結束,不再發送心跳請求!!!

循環請求的時間:2018-12-14 22:18:25,次數5
心跳包發送結束,不再發送心跳請求!!!

循環請求的時間:2018-12-14 22:18:29,次數6
心跳包發送結束,不再發送心跳請求!!!
關閉連接時:2018-12-14 22:18:32
復制代碼

 

d、服務端輸出信息

復制代碼
第1次,服務端收到的消息:Hello Netty
第2次,服務端收到的消息:hb_request
第3次,服務端收到的消息:hb_request
第4次,服務端收到的消息:hb_request
已等待5秒還沒收到客戶端發來的消息
已等待5秒還沒收到客戶端發來的消息
超過兩次無客戶端請求,關閉該channel
已等待5秒還沒收到客戶端發來的消息
復制代碼

 原文:https://www.cnblogs.com/linjiqin/p/10121561.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM