作者:ksfzhaohui317
https://segmentfault.com/a/1190000022591346
前言
談到RPC肯定繞不開TCP通信,而主流的RPC框架都依賴於Netty等通信框架,這時候我們還要考慮是使用長連接還是短連接:
-
短連接:每次通信結束后關閉連接,下次通信需要重新創建連接;優點就是無需管理連接,無需保活連接;
-
長連接:每次通信結束不關閉連接,連接可以復用,保證了性能;缺點就是連接需要統一管理,並且需要保活;
主流的RPC框架都會追求性能選擇使用長連接,所以如何保活連接就是一個重要的話題,也是本文的主題,下面會重點介紹一些保活策略;
為什么需要保活
上面介紹的長連接、短連接並不是TCP提供的功能,所以長連接是需要應用端自己來實現的,包括:連接的統一管理,如何保活等;如何保活之前我們了解一下為什么需要保活?
主要原因是網絡不是100%可靠的,我們創建好的連接可能由於網絡原因導致連接已經不可用了,如果連接一直有消息往來,那么系統馬上可以感知到連接斷開;
但是我們系統可能長時間沒有消息來往,導致系統不能及時感知到連接不可用,也就是不能及時處理重連或者釋放連接;常見的保活策略使用心跳機制由應用層來實現,還有網絡層提供的TCP Keepalive保活探測機制;
TCP Keepalive機制
TCP Keepalive是操作系統實現的功能,並不是TCP協議的一部分,需要在操作系統下進行相關配置,開啟此功能后,如果連接在一段時間內沒有數據往來,TCP將發送Keepalive探針來確認連接的可用性,Keepalive幾個內核參數配置:
-
tcp_keepalive_time:連接多長時間沒有數據往來發送探針請求,默認為7200s(2h);
-
tcp_keepalive_probes:探測失敗重試的次數默認為10次;
-
tcp_keepalive_intvl:重試的間隔時間默認75s;
以上參數可以修改到/etc/sysctl.conf文件中;是否使用Keepalive用來保活就夠了,其實還不夠,Keepalive只是在網絡層就行保活,如果網絡本身沒有問題,但是系統由於其他原因已經不可用了,這時候Keepalive並不能發現;所以往往還需要結合心跳機制來一起使用;
心跳機制
何為心跳機制,簡單來講就是客戶端啟動一個定時器用來定時發送請求,服務端接到請求進行響應,如果多次沒有接受到響應,那么客戶端認為連接已經斷開,可以斷開半打開的連接或者進行重連處理;下面以Dubbo為例來看看是如何具體實施的;
Dubbo2.6.X
在HeaderExchangeClient中啟動了定時器ScheduledThreadPoolExecutor來定期執行心跳請求:
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2,
new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
在實例化HeaderExchangeClient時啟動心跳定時器:
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
heartbeat默認為60秒,heartbeatTimeout默認為heartbeat*3,可以理解至少出現三次心跳請求還未收到回復才會任務連接已經斷開;HeartBeatTask為執行心跳的任務:
public void run() {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_READ\_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_WRITE\_TIMESTAMP);
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
// 發送心跳
}
if (lastRead != null && now - lastRead > heartbeatTimeout) {
if (channel instanceof Client) {
((Client) channel).reconnect();
} else {
channel.close();
}
}
}
}
因為Dubbo雙端都會發送心跳請求,所以可以發現有兩個時間點分別是:lastRead和lastWrite;當然時間和最后讀取,最后寫的時間間隔大於heartbeat就會發送心跳請求;
如果多次心跳未返回結果,也就是最后讀取消息時間大於heartbeatTimeout會判定當前是Client還是Server,如果是Client會發起reconnect,Server會關閉連接,這樣的考慮是合理的,客戶端調用是強依賴可用連接的,而服務端可以等待客戶端重新建立連接;
以上只是介紹的Client,同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;
Dubbo2.7.0
Dubbo2.7.0的心跳機制在2.6.X的基礎上得到了加強,同樣在HeaderExchangeClient中使用HashedWheelTimer開啟心跳檢測,這是Netty提供的一個時間輪定時器,在任務非常多,並且任務執行時間很短的情況下,HashedWheelTimer比Schedule性能更好,特別適合心跳檢測;
HashedWheelTimer heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
TimeUnit.MILLISECONDS, Constants.TICKS\_PER\_WHEEL);
分別啟動了兩個定時任務:startHeartBeatTask和startReconnectTask:
private void startHeartbeatTimer() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); // init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
HeartbeatTimerTask:用來定時發送心跳請求,心跳間隔時間默認為60秒;這里重新計算了時間,其實就是在原來的基礎上除以3,其實就是縮短了檢測間隔時間,增大了及時發現死鏈的概率;分別看一下兩個任務:
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
}
}
同上檢測最后讀寫時間和heartbeat的大小,注:普通請求和心跳請求都會更新讀寫時間;Netty 在 Dubbo 中是如何應用的?這篇推薦大家看一下。
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {
((Client) channel).reconnect();
} else {
channel.close();
}
}
}
同樣的在超時的情況下,Client重連,Server關閉連接;同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;
Dubbo2.7.1-X
在Dubbo2.7.1之后,借助了Netty提供的IdleStateHandler來實現心跳機制服務:
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
-
readerIdleTime:讀超時時間;
-
writerIdleTime:寫超時時間;
-
allIdleTime:所有類型的超時時間;
根據設置的超時時間,循環檢查讀寫事件多久沒有發生了,在pipeline中加入IdleSateHandler之后,可以在此pipeline的任意Handler的userEventTriggered方法之中檢測IdleStateEvent事件;下面看看具體Client和Server端添加的IdleStateHandler:
Client端
protected void initChannel(Channel ch) throws Exception {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
}
Client端在NettyClient中添加了IdleStateHandler,指定了讀寫超時時間默認為60秒;60秒內沒有讀寫事件發生,會觸發IdleStateEvent事件在NettyClientHandler處理:
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
可以發現接收到IdleStateEvent事件發送了心跳請求;至於Client端如何處理重連,同樣在HeaderExchangeClient中使用HashedWheelTimer定時器啟動了兩個任務:心跳任務和重連任務,感覺這里已經不需要心跳任務了,至於重連任務其實也可以放到userEventTriggered中處理;
Server端
protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
Server端指定的超時時間默認為60*3秒,在NettyServerHandler中處理userEventTriggered
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try {
channel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
super.userEventTriggered(ctx, evt);
}
Server端在指定的超時時間內沒有發生讀寫,會直接關閉連接;相比之前現在只有Client發送心跳,單向發送心跳;
同樣的在HeaderExchangeServer中並沒有啟動多個認為,僅僅啟動了一個CloseTimerTask,用來檢測超時時間關閉連接;感覺這個任務是不是也可以不需要了,IdleStateHandler已經實現了此功能;
綜上:在使用IdleStateHandler的情況下來同時在HeaderExchangeClient啟動心跳+重連機制,HeaderExchangeServer啟動了關閉連接機制;主要是因為IdleStateHandler是Netty框架特有了,而Dubbo是支持多種底層通訊框架的包括Mina,Grizzy等,應該是為了兼容此類框架存在的;
總結
本文首先介紹了RPC中引入的長連接方式,繼而引出長連接的保活機制,為什么需要保活?然后分別介紹了網絡層保活機制TCP Keepalive機制,應用層心跳機制;最后已Dubbo為例看各個版本中對心跳機制的進化。
推薦去我的博客閱讀更多:
2.Spring MVC、Spring Boot、Spring Cloud 系列教程
3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程
覺得不錯,別忘了點贊+轉發哦!