心跳
就是告訴其它人自己還活着。在簡易RPC框架中,采用的是TCP長連接,為了確保長連接有效,就需要客戶端與服務端之間有一種通知機制告知對方的存活狀態。
如何實現
客戶端發送心跳消息
在狀態空閑的時候定時給服務端發送消息類型為PING消息。
服務端接收心跳消息
捕獲通道空閑狀態事件,如果接收客戶端PING消息,則發送PONG消息給服務端。如果在一定時間內沒有收到客戶端的PING消息,則說明客戶端已經不在線,此時關閉通道。
客戶端管理可用連接
由於服務端會因為長時間接收不到服務端的PING消息而關閉通道,這就導致緩存在客戶端的連接的可用性發生變化。需要將不可用的從可用列表中轉移出去,並對不可用連接進行處理,比如直接丟棄或者是重新連接。
預備知識
ChannelPipeline與handle的關系。netty中的這些handle和spring mvc中的filter作用是類似的,ChannelPipeline可以理解成handle的容器,里面可以被注冊眾多處理不同業務功能的事件處理器,比如:
- 編碼
- 解碼
- 心跳
- 權限
- 加密
- 解密
- 業務代碼執行
- ......
具體實現
空閑狀態處理器
可以利用netty提供的IdleStateHandler來發送PING-PONG消息。這個處理器主要是捕獲通道超時事件,主要有三類
- 讀超時,一定時間內沒有從通道內讀取到任何數據
- 寫超時,一定時間內沒有從通道內寫入任何數據
- 讀寫超時,一定時間內沒有從通道內讀取或者是寫入任何數據
客戶端加入空閑狀態處理器
客戶端捕獲讀寫超時,如果事件觸發就給服務端發送PING消息。
服務端加入空閑狀態處理器
服務端只需要捕獲讀超時即可,當讀超時觸發后就關閉通道。
為什么在空閑狀態才發送心跳消息
在正常客戶端與服務端有交互的情況下,說明雙方都在正常工作不需要額外的心跳來告知對方的存活。只有雙方在一定時間內沒有接收到對方的消息時才開始采用心跳消息來探測對方的存活,這也是一種提升效率的做法。
抽象心跳處理器
創建AbstractHeartbeatHandler,並繼承ChannelInboundHandlerAdapter,服務於客戶端與服務端的心跳處理器。在讀取方法中判斷消息類型:
- 如果是PING消息就發送PONG消息給客戶端
- 如果收到的是PONG消息,則直接打印消息說明客戶端已經成功接收到服務端返回的PONG消息
- 如果是其它類型的消息,則通知下一個處理器處理消息
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
if(!(msg instanceof RpcMessage)){
channelHandlerContext.fireChannelRead(msg);
return;
}
RpcMessage message=(RpcMessage)msg;
if(null==message||null==message.getMessageHeader()){
channelHandlerContext.fireChannelRead(msg);
return;
}
if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
}
else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
this.sendPong(channelHandlerContext);
}
else {
channelHandlerContext.fireChannelRead(msg);
}
}
空閑狀態事件,可以根據不同的狀態做不同的行為處理,定義三個可重寫事件供客戶端與服務端處理器具體確認處理事件。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
this.handleReaderIdle(ctx);
break;
case WRITER_IDLE:
this.handleWriterIdle(ctx);
break;
case ALL_IDLE:
this.handleAllIdle(ctx);
break;
default:
break;
}
}
}
客戶端心跳處理器
繼承抽象心跳處理器,並重寫事件發送PING消息。
public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
this.sendPing(ctx);
}
}
服務端心跳處理器
繼承抽象心跳處理器,並重寫事件關閉通道。
public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
ctx.close();
}
}
客戶端ChannelPipeline中加入心跳處理器
比如5秒內未寫入或者讀取通道數據就觸發超時事件。
.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));
服務端ChannelPipeline中加入心跳處理器
比如10秒未接收到通道消息就觸發讀超時事件。
.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))
客戶端消息示例
正常情況下心跳消息顯示如下圖所示,消息的內容可以根據自己的情況自行定義。
客戶端下線消息示例
停止客戶端程序,然后服務端讀超時事件觸發,並關閉通道。
客戶端可用連接管理
由於上述的服務端心跳處理器,在觸發讀超時后會關閉通信管道,這導致客戶端緩存的連接狀態會出現不可用的情況,為了讓客戶端一直只能取到可用連接就需要對從緩存中獲取到的連接做狀態判斷,如果可用直接返回,如果不可用則將連接從可用列表中刪除然后取下一個可用連接。
修改獲取連接方法
通過channel的isActive屬性可以判斷連接是否可用,如果不可以做刪除並重新獲取的操作。
public RpcClientInvoker getInvoker() {
// ...
int index = loadbalanceService.index(size);
RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
if(invoker.getChannel().isActive()) {
return invoker;
}
else {
RpcClientInvokerCache.removeHandler(invoker);
logger.info("invoker is not active,so remove it and get next one");
return this.getInvoker();
}
}
后台啟動任務處理不可用連接
啟動一個每隔5秒執行一次任務的線程,定時取出不可用連接,然后重連,並將不可用連接刪除。
這里我處理的重連是直接丟棄原有不可用連接,然后重新創建新連接。
private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);
static {
executorService.schedule(new Runnable() {
@Override
public void run() {
while (true) {
List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
for (RpcClientInvoker invoker : notConnectedHandlers) {
RpcClientInvokerManager.getInstance(referenceConfig).connect();
}
RpcClientInvokerCache.clearNotConnectedHandler();
}
}
}
}, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);
}
本文源碼
https://github.com/jiangmin168168/jim-framework
文中代碼是依賴上述項目的,如果有不明白的可下載源碼
引用
本文中的圖取自於網格