netty長(短)連接,心跳檢測



title: netty長連接,短連接,心跳檢測
date: 2018/4/23 11:12:55
tags: [netty,長連接,短連接,心跳檢測]
categories:

  • 開發
  • java

https://www.cnblogs.com/superfj/p/9153776.html

短連接

定義

client與server通過三次握手建立連接,client發送請求消息,server返回響應,一次連接就完成了。

這時候雙方任意都可以發起close操作,不過一般都是client先發起close操作。上述可知,短連接一般只會在 client/server間傳遞一次請求操作。

短連接的優缺點

管理起來比較簡單,存在的連接都是有用的連接,不需要額外的控制手段。

使用場景

通常瀏覽器訪問服務器的時候就是短連接。

對於服務端來說,長連接會耗費服務端的資源,而且用戶用瀏覽器訪問服務端相對而言不是很頻繁的

如果有幾十萬,上百萬的連接,服務端的壓力會非常大,甚至會崩潰。

所以對於並發量大,請求頻率低的,建議使用短連接。

長連接

什么是長連接

client向server發起連接,server接受client連接,雙方建立連接。

Client與server完成一次讀寫之后,它們之間的連接並不會主動關閉,后續的讀寫操作會繼續使用這個連接。

生命周期

  • 正常情況下,一條TCP長連接建立后,只要雙不提出關閉請求&不出現異常情況,這條連接是一直存在.

  • 操作系統不會自動去關閉它,甚至經過物理網絡拓撲的改變之后仍然可以使用。

  • 所以一條連接保持幾天、幾個月、幾年或者更長時間都有可能,只要不出現異常情況或由用戶(應用層)主動關閉。客戶端和服務單可一直使用該連接進行數據通信。

優缺點

優點

  • 長連接可以省去較多的TCP建立和關閉的操作,減少網絡阻塞的影響,

  • 當發生錯誤時,可以在不關閉連接的情況下進行提示,

  • 減少CPU及內存的使用,因為不需要經常的建立及關閉連接。

缺點

  • 連接數過多時,影響服務端的性能和並發數量

使用場景

  • 數據庫的連接就是采用TCP長連接.
  • RPC一個服務進程頻繁調用另一個服務進程,可使用長連接,減少連接花費的時間。

特殊的長連接模式

一種比較特殊的長連接模式,在一段時間內,“數據中心”與設備之間數據交互頻繁,但是過了這段時間,很長一段時間內,都沒有任何數據需要交互的情況下,最好的辦法就是:在交互頻繁的那段時間,保持長連接 ,一旦過了那段時間,立馬斷開連接,下次需要交互時,再獲取連接即可,這種方式,主要可以為“數據中心”服務器節省資源的浪費。

第三種情況的使用,需要考慮兩個問題:

  • 如果客戶端或者“數據中心”,連接超時,無數據交互后,如何去斷開連接?
  • 客戶端主動斷開連接,很好辦,下次需要再連接即可,那么如果是“數據中心”主動斷開連接了,客戶端應該如何去與“數據中心”再次建立連接?

長連接模式下,維護連接的方案:

如果是長連接的情況下,一般我們都需要做連接的維護工作,方案主要有以下兩種:

1、客戶端間隔5分鍾,向服務器發起一次“心跳”報文,如果服務器正常回應,那就無所謂,如果不回應,一般就直接斷開“連接”,然后重新向服務器再次申請新的連接即可。

2、客戶端或者服務端開啟一個定時任務,間隔5分鍾,判斷在這5分鍾內,是否有向服務器交互數據,如果有交互,那么就繼續維護這個連接,如果沒有交互,那么就直接斷開連接即可,下次再需要交互時,再向服務器申請新的連接即可。這樣做的好處,是給服務器減壓

客戶端如果主動自己斷開,這個一般不需要做特殊處理,直接在下次連接時,申請新的連接即可。

服務器如果自己因為什么原因,斷開了,那么客戶端,需要定義一個定時任務,間隔10分鍾,或者多少時間,去不斷的嘗試服務器是否恢復。

長連接的實現

心跳機制

應用層協議大多都有HeartBeat機制,通常是客戶端每隔一小段時間向服務器發送一個數據包,通知服務器自己仍然在線。並傳輸一些可能必要的數據。使用心跳包的典型協議是IM,比如QQ/MSN/飛信等協議。

TCP . SO_KEEPALIVE。系統默認是設置的2小時的心跳頻率。但是它檢查不到機器斷電、網線拔出、防火牆這些斷線。而且邏輯層處理斷線可能也不是那么好處理。一般,如果只是用於保活還是可以的。

Why心跳機制?

  • 網絡的不可靠性
    • 有可能在 TCP 保持長連接的過程中, 由於某些突發情況,(網線被拔出, 突然斷電等),會造成連接中斷
    • 在這些突發情況下, 如果恰好服務器和客戶端之間沒有交互的話, 那么它們是不能在短時間內發現對方已經掉線的.心跳機制即可解決此類問題。

TCP協議的KeepAlive機制

默認KeepAlive狀態是不打開的。

需要將setsockopt將SOL_SOCKET.SO_KEEPALIVE設置為1才是打開KeepAlive狀態,

並且可以設置三個參數:

tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl

分別表示:連接閑置多久開始發keepalive的ack包、發幾個ack包不回復才當對方已斷線、兩個ack包之間的間隔。

很多網絡設備,尤其是NAT路由器,由於其硬件的限制(例如內存、CPU處理能力),無法保持其上的所有連接,因此在必要的時候,會在連接池中選擇一些不活躍的連接踢掉。

典型做法是LRU,把最久沒有數據的連接給T掉。

通過使用TCP的KeepAlive機制(修改那個time參數),可以讓連接每隔一小段時間就產生一些ack包,以降低被踢掉的風險,當然,這樣的代價是額外的網絡和CPU負擔。

如何實現心跳機制

兩種方式實現心跳機制:

  • 使用 TCP 協議層面的 keepalive 機制.
  • 應用層上實現自定義的心跳機制.

雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:

  1. 它不是 TCP 的標准協議, 並且是默認關閉的.
  2. TCP keepalive 機制依賴於操作系統的實現, 默認的 keepalive 心跳時間是 兩個小時, 並且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.
  3. TCP keepalive 與 TCP 協議綁定, 因此如果需要更換為 UDP 協議時, keepalive 機制就失效了.

使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量,

Netty心跳和斷線重連

https://github.com/JayTange/LearnTCP

IdleStateHandler

//構造函數
public IdleStateHandler(
    long readerIdleTime, long writerIdleTime, long allIdleTime,
    TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

//HeartBeatServer.java(服務端)

serverBootstrap.childHandler(new HeartBeatServerInitializer());

private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
    
      @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 監聽讀操作,讀超時時間為5秒,超過5秒關閉channel;
            pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            pipeline.addLast("handler", new HeartbeatServerHandler());
        }
}

  • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發 IdleStateEvent 事件(READER_IDLE)
  • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個IdleStateEvent 事件(WRITE_IDLE)
  • allIdleTimeSeconds, 讀和寫都超時. 即當在指定的時間間隔內沒有讀並且寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
// HeartBeatServerHandler.java extends SimpleChannelInboundHandler<T>
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state()==IdleState.READER_IDLE){
            // 失敗計數器次數大於等於3次的時候,關閉鏈接,等待client重連
            if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
                System.out.println("===>服務端===(讀超時,關閉chanel)");
                // 連續超過N次未收到client的ping消息,那么關閉該通道,等待client重連
                ctx.close();
            } else {
                // 失敗計數器加1
                unRecPingTimes++;
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    System.out.println("一個客戶端已連接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    System.out.println("一個客戶端已斷開連接");
}

客戶端HeartBeatClient

// 客戶端
public class HeartBeatClient {

    private Random random = new Random();
    public Channel channel;
    public Bootstrap bootstrap;

    protected String host = "127.0.0.1";
    protected int port = 9817;

    public static void main(String args[]) throws Exception {
        HeartBeatClient client = new HeartBeatClient();
        client.run();
        client.sendData();

    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer(HeartBeatClient.this));
            doConncet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 發送數據
     * @throws Exception
     */
    public void sendData() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String cmd = in.readLine();
            switch (cmd){
                case "close" :
                    channel.close();
                    break;
                default:
                channel.writeAndFlush(in.readLine());
                    break;
            }
        }
    }

    /**
     * 連接服務端
     */
    public void doConncet() {
        if (channel != null && channel.isActive()) {
            return;
        }
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (channelFuture.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("connect server successfully");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConncet();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });

    }


    private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

        private HeartBeatClient client;

        public SimpleClientInitializer(HeartBeatClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 5, 0));
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("handler", new HeartBeatClientHandler(client));
        }
    }

}

HeartBeatClientHandler

客戶端handler

public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
    private HeartBeatClient client;

    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    public HeartBeatClientHandler(HeartBeatClient client) {
        this.client = client;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到服務端回復:"+msg);
        if (msg.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.err.println("客戶端與服務端斷開連接,斷開的時間為:"+format.format(new Date()));
        // 定時線程 斷線重連
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                client.doConncet();
            }
        }, 10, TimeUnit.SECONDS);
    }

}

長連接優化

https://www.jianshu.com/p/54f9bfcd054b

物聯網(IoT ) 時代

internet of things 飛速發展,對服務端的協議接入和處理能力要求極高,而且IoT設備接入它有以下特點

  1. 使用移動網絡(Wifi),網絡質量不穩定
  2. 海量接入,而且是長連接,服務端壓力大
  3. 不穩定,消息丟失,重發,延遲,過期發送時有發生
  4. 協議不統一,私有協議 ,開發測試成本高

長連接海量接入優化方向

以下這些調優,都屬於小方法,小技巧,如果系統對性能要求很高,最優的還是采用分布式集群的方式來提升整個服務端的處理能力。

要想實現海量設備的長連接接入,需要對

  • 操作系統相關參數
  • Netty框架、JVM GC參數
  • 業務代碼針對性的優化
  • 甚至各種優化要素互相影響
操作系統層面
文件描述符

在 /etc/sysctl.conf 插入 fs.file-max = 1000000

設置單進程打開的最大句柄數

TCP/IP相關參數
多網卡隊列軟中斷
Netty調優
線程優化
  • boss線程池優化
    對於Netty服務端,通常只需要啟動一個監聽端口用於端側設備接入,但是如果集群實例較少,甚至是單機部署,那么在短時間內大量設備接入時,需要對服務端的監聽方式和線程模型做優化,即服務端監聽多個端口,利用主從Reactor線程模型。由於同時監聽了多個端口,每個ServerSocketChannel都對應一個獨立的Acceptor線程,這樣就能並行處理,加速端側設備的接人速度,減少端側設備的連接超時失敗率,提高單節點服務端的處理性能。
  • work線程池優化(I/O工作線程池)
    對於I/O工作線程池的優化,可以先采用系統默認值(cpu內核數*2)進行性能測試,在性能測試過程中采集I/O線程的CPU占用大小,看是否存在瓶頸,具體策略如下:
心跳優化

心跳檢測周期通常不要超過60s,心跳檢測超時通常為心跳檢測周期的2倍

接發緩沖區調優

.childOption(channelOption.SO_RCVBUF, 8*1024)

內存池

java對於緩沖區Buffer的分配和回收是一個耗時的工作(特別是堆外直接內存)。

為了盡量重用緩沖區,netty提供了基於內存池的緩沖區重用機制。

內存池實現上可以分為兩類:堆外直接內存和堆內存。

DirectBypeBuf的創建成本高,因為需要配合內存池使用

netty默認的io讀寫操作采用的都是內存池的堆外直接內存模式,如果用戶需要額外使用ByteBuf,建議也采用內存池方式 ; 如果不涉及網絡io操作(單純的內存操作)可以使用堆內存池

// void initchannel(SocketChannel channel)
channel.config().setAllocator(UnpooledByteBufAllocator.DEAFAULT);
io線程與業務線程隔離
JVM調優
GC指標

吞吐量,延遲,內存占用

GC優化原則
  1. Minor GC (每次新生代回收盡可能多的內存,以減小FULL GC)

  2. GC內存最大化原則:GC使用的內存越大,效率越高

  3. 大多數IoT應用,吞吐量優先

-XX printGC

visual GC工具


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM