title: netty長連接,短連接,心跳檢測
date: 2018/4/23 11:12:55
tags: [netty,長連接,短連接,心跳檢測]
categories:
- 開發
- java
https://www.cnblogs.com/superfj/p/9153776.html
短連接
定義
client與server通過三次握手建立連接,client發送請求消息,server返回響應,一次連接就完成了。
這時候雙方任意都可以發起close操作,不過一般都是client先發起close操作。上述可知,短連接一般只會在 client/server間傳遞一次請求操作。
短連接的優缺點
管理起來比較簡單,存在的連接都是有用的連接,不需要額外的控制手段。
使用場景
通常瀏覽器訪問服務器的時候就是短連接。
對於服務端來說,長連接會耗費服務端的資源,而且用戶用瀏覽器訪問服務端相對而言不是很頻繁的
如果有幾十萬,上百萬的連接,服務端的壓力會非常大,甚至會崩潰。
所以對於並發量大,請求頻率低的,建議使用短連接。
長連接
什么是長連接
client向server發起連接,server接受client連接,雙方建立連接。
Client與server完成一次讀寫之后,它們之間的連接並不會主動關閉,后續的讀寫操作會繼續使用這個連接。
生命周期
-
正常情況下,一條TCP長連接建立后,只要雙不提出關閉請求&不出現異常情況,這條連接是一直存在.
-
操作系統不會自動去關閉它,甚至經過物理網絡拓撲的改變之后仍然可以使用。
-
所以一條連接保持幾天、幾個月、幾年或者更長時間都有可能,只要不出現異常情況或由用戶(應用層)主動關閉。客戶端和服務單可一直使用該連接進行數據通信。
優缺點
優點
-
長連接可以省去較多的TCP建立和關閉的操作,減少網絡阻塞的影響,
-
當發生錯誤時,可以在不關閉連接的情況下進行提示,
-
減少CPU及內存的使用,因為不需要經常的建立及關閉連接。
缺點
- 連接數過多時,影響服務端的性能和並發數量
使用場景
- 數據庫的連接就是采用TCP長連接.
- RPC一個服務進程頻繁調用另一個服務進程,可使用長連接,減少連接花費的時間。
特殊的長連接模式
一種比較特殊的長連接模式,在一段時間內,“數據中心”與設備之間數據交互頻繁,但是過了這段時間,很長一段時間內,都沒有任何數據需要交互的情況下,最好的辦法就是:在交互頻繁的那段時間,保持長連接 ,一旦過了那段時間,立馬斷開連接,下次需要交互時,再獲取連接即可,這種方式,主要可以為“數據中心”服務器節省資源的浪費。
第三種情況的使用,需要考慮兩個問題:
- 如果客戶端或者“數據中心”,連接超時,無數據交互后,如何去斷開連接?
- 客戶端主動斷開連接,很好辦,下次需要再連接即可,那么如果是“數據中心”主動斷開連接了,客戶端應該如何去與“數據中心”再次建立連接?
長連接模式下,維護連接的方案:
如果是長連接的情況下,一般我們都需要做連接的維護工作,方案主要有以下兩種:
1、客戶端間隔5分鍾,向服務器發起一次“心跳”報文,如果服務器正常回應,那就無所謂,如果不回應,一般就直接斷開“連接”,然后重新向服務器再次申請新的連接即可。
2、客戶端或者服務端開啟一個定時任務,間隔5分鍾,判斷在這5分鍾內,是否有向服務器交互數據,如果有交互,那么就繼續維護這個連接,如果沒有交互,那么就直接斷開連接即可,下次再需要交互時,再向服務器申請新的連接即可。這樣做的好處,是給服務器減壓
客戶端如果主動自己斷開,這個一般不需要做特殊處理,直接在下次連接時,申請新的連接即可。
服務器如果自己因為什么原因,斷開了,那么客戶端,需要定義一個定時任務,間隔10分鍾,或者多少時間,去不斷的嘗試服務器是否恢復。
長連接的實現
心跳機制
應用層協議大多都有HeartBeat機制,通常是客戶端每隔一小段時間向服務器發送一個數據包,通知服務器自己仍然在線。並傳輸一些可能必要的數據。使用心跳包的典型協議是IM,比如QQ/MSN/飛信等協議。
TCP . SO_KEEPALIVE。系統默認是設置的2小時的心跳頻率。但是它檢查不到機器斷電、網線拔出、防火牆這些斷線。而且邏輯層處理斷線可能也不是那么好處理。一般,如果只是用於保活還是可以的。
Why心跳機制?
- 網絡的不可靠性
- 有可能在 TCP 保持長連接的過程中, 由於某些突發情況,(網線被拔出, 突然斷電等),會造成連接中斷
- 在這些突發情況下, 如果恰好服務器和客戶端之間沒有交互的話, 那么它們是不能在短時間內發現對方已經掉線的.心跳機制即可解決此類問題。
TCP協議的KeepAlive機制
默認KeepAlive狀態是不打開的。
需要將setsockopt將SOL_SOCKET.SO_KEEPALIVE設置為1才是打開KeepAlive狀態,
並且可以設置三個參數:
tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl,
分別表示:連接閑置多久開始發keepalive的ack包、發幾個ack包不回復才當對方已斷線、兩個ack包之間的間隔。
很多網絡設備,尤其是NAT路由器,由於其硬件的限制(例如內存、CPU處理能力),無法保持其上的所有連接,因此在必要的時候,會在連接池中選擇一些不活躍的連接踢掉。
典型做法是LRU,把最久沒有數據的連接給T掉。
通過使用TCP的KeepAlive機制(修改那個time參數),可以讓連接每隔一小段時間就產生一些ack包,以降低被踢掉的風險,當然,這樣的代價是額外的網絡和CPU負擔。
如何實現心跳機制
兩種方式實現心跳機制:
- 使用 TCP 協議層面的 keepalive 機制.
- 在應用層上實現自定義的心跳機制.
雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:
- 它不是 TCP 的標准協議, 並且是默認關閉的.
- TCP keepalive 機制依賴於操作系統的實現, 默認的 keepalive 心跳時間是 兩個小時, 並且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.
- TCP keepalive 與 TCP 協議綁定, 因此如果需要更換為 UDP 協議時, keepalive 機制就失效了.
使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量,
Netty心跳和斷線重連
https://github.com/JayTange/LearnTCP
IdleStateHandler
//構造函數
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
//HeartBeatServer.java(服務端)
serverBootstrap.childHandler(new HeartBeatServerInitializer());
private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 監聽讀操作,讀超時時間為5秒,超過5秒關閉channel;
pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new HeartbeatServerHandler());
}
}
- readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發 IdleStateEvent 事件(READER_IDLE)
- writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個IdleStateEvent 事件(WRITE_IDLE)
- allIdleTimeSeconds, 讀和寫都超時. 即當在指定的時間間隔內沒有讀並且寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
// HeartBeatServerHandler.java extends SimpleChannelInboundHandler<T>
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state()==IdleState.READER_IDLE){
// 失敗計數器次數大於等於3次的時候,關閉鏈接,等待client重連
if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
System.out.println("===>服務端===(讀超時,關閉chanel)");
// 連續超過N次未收到client的ping消息,那么關閉該通道,等待client重連
ctx.close();
} else {
// 失敗計數器加1
unRecPingTimes++;
}
}else {
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("一個客戶端已連接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("一個客戶端已斷開連接");
}
客戶端HeartBeatClient
// 客戶端
public class HeartBeatClient {
private Random random = new Random();
public Channel channel;
public Bootstrap bootstrap;
protected String host = "127.0.0.1";
protected int port = 9817;
public static void main(String args[]) throws Exception {
HeartBeatClient client = new HeartBeatClient();
client.run();
client.sendData();
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleClientInitializer(HeartBeatClient.this));
doConncet();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 發送數據
* @throws Exception
*/
public void sendData() throws Exception {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while (true){
String cmd = in.readLine();
switch (cmd){
case "close" :
channel.close();
break;
default:
channel.writeAndFlush(in.readLine());
break;
}
}
}
/**
* 連接服務端
*/
public void doConncet() {
if (channel != null && channel.isActive()) {
return;
}
ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (channelFuture.isSuccess()) {
channel = futureListener.channel();
System.out.println("connect server successfully");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConncet();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
private HeartBeatClient client;
public SimpleClientInitializer(HeartBeatClient client) {
this.client = client;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 5, 0));
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("handler", new HeartBeatClientHandler(client));
}
}
}
HeartBeatClientHandler
客戶端handler
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
private HeartBeatClient client;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));
public HeartBeatClientHandler(HeartBeatClient client) {
this.client = client;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("收到服務端回復:"+msg);
if (msg.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.err.println("客戶端與服務端斷開連接,斷開的時間為:"+format.format(new Date()));
// 定時線程 斷線重連
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
client.doConncet();
}
}, 10, TimeUnit.SECONDS);
}
}
長連接優化
https://www.jianshu.com/p/54f9bfcd054b
物聯網(IoT ) 時代
internet of things 飛速發展,對服務端的協議接入和處理能力要求極高,而且IoT設備接入它有以下特點
- 使用移動網絡(Wifi),網絡質量不穩定
- 海量接入,而且是長連接,服務端壓力大
- 不穩定,消息丟失,重發,延遲,過期發送時有發生
- 協議不統一,私有協議 ,開發測試成本高
長連接海量接入優化方向
以下這些調優,都屬於小方法,小技巧,如果系統對性能要求很高,最優的還是采用分布式集群的方式來提升整個服務端的處理能力。
要想實現海量設備的長連接接入,需要對
- 操作系統相關參數
- Netty框架、JVM GC參數
- 業務代碼針對性的優化
- 甚至各種優化要素互相影響
操作系統層面
文件描述符
在 /etc/sysctl.conf 插入 fs.file-max = 1000000
設置單進程打開的最大句柄數
TCP/IP相關參數
多網卡隊列軟中斷
Netty調優
線程優化
- boss線程池優化
對於Netty服務端,通常只需要啟動一個監聽端口用於端側設備接入,但是如果集群實例較少,甚至是單機部署,那么在短時間內大量設備接入時,需要對服務端的監聽方式和線程模型做優化,即服務端監聽多個端口,利用主從Reactor線程模型。由於同時監聽了多個端口,每個ServerSocketChannel都對應一個獨立的Acceptor線程,這樣就能並行處理,加速端側設備的接人速度,減少端側設備的連接超時失敗率,提高單節點服務端的處理性能。 - work線程池優化(I/O工作線程池)
對於I/O工作線程池的優化,可以先采用系統默認值(cpu內核數*2)進行性能測試,在性能測試過程中采集I/O線程的CPU占用大小,看是否存在瓶頸,具體策略如下:
心跳優化
心跳檢測周期通常不要超過60s,心跳檢測超時通常為心跳檢測周期的2倍
接發緩沖區調優
.childOption(channelOption.SO_RCVBUF, 8*1024)
內存池
java對於緩沖區Buffer的分配和回收是一個耗時的工作(特別是堆外直接內存)。
為了盡量重用緩沖區,netty提供了基於內存池的緩沖區重用機制。
內存池實現上可以分為兩類:堆外直接內存和堆內存。
DirectBypeBuf的創建成本高,因為需要配合內存池使用
netty默認的io讀寫操作采用的都是內存池的堆外直接內存模式,如果用戶需要額外使用ByteBuf,建議也采用內存池方式 ; 如果不涉及網絡io操作(單純的內存操作)可以使用堆內存池
// void initchannel(SocketChannel channel)
channel.config().setAllocator(UnpooledByteBufAllocator.DEAFAULT);
io線程與業務線程隔離
JVM調優
GC指標
吞吐量,延遲,內存占用
GC優化原則
-
Minor GC (每次新生代回收盡可能多的內存,以減小FULL GC)
-
GC內存最大化原則:GC使用的內存越大,效率越高
-
大多數IoT應用,吞吐量優先
-XX printGC
visual GC工具