什么是心跳機制?
心跳說的是在客戶端和服務端在互相建立ESTABLISH狀態的時候,如何通過發送一個最簡單的包來保持連接的存活,還有監控另一邊服務的可用性等。
心跳包的作用
-
保活
Q:為什么說心跳機制能保持連接的存活,它是集群中或長連接中最為有效避免網絡中斷的一個重要的保障措施?
A:之所以說是“避免網絡中斷的一個重要保障措施”,原因是:我們得知公網IP是一個寶貴的資源,一旦某一連接長時間的占用並且不發數據,這怎能對得起網絡給此連接分配公網IP,這簡直是對網絡資源最大的浪費,所以基本上所有的NAT路由器都會定時的清除那些長時間沒有數據傳輸的映射表項。一是回收IP資源,二是釋放NAT路由器本身內存的資源,這樣問題就來了,連接被從中間斷開了,雙發還都不曉得對方已經連通不了了,還會繼續發數據,這樣會有兩個結果:a) 發方會收到NAT路由器的RST包,導致發方知道連接已中斷;b) 發方沒有收到任何NAT的回執,NAT只是簡單的drop相應的數據包
通常我們測試得出的是第二種情況會多些,就是客戶端是不知道自己應經連接斷開了,所以這時候心跳就可以和NAT建立關聯了,只要我們在NAT認為合理連接的時間內發送心跳數據包,這樣NAT會繼續keep連接的IP映射表項不被移除,達到了連接不會被中斷的目的。 -
檢測另一端服務是否可用
TCP的斷開可能有時候是不能瞬時探知的,甚至是不能探知的,也可能有很長時間的延遲,如果前端沒有正常的斷開TCP連接,四次握手沒有發起,服務端無從得知客戶端的掉線,這個時候我們就需要心跳包來檢測另一端服務是否還存活可用。
基於TCP的keepalive機制實現
基於TCP的keepalive機制,由具體的TCP協議棧來實現長連接的維持。如在netty中可以在創建channel的時候,指定SO_KEEPALIVE參數來實現:

存在的問題:Netty只能控制SO_KEEPALIVE這個參數,其他參數,則需要從系統的sysctl中讀取,其中比較關鍵的是tcp_keepalive_time,發送心跳包檢測的時間間隔,默認為7200s,即空閑后,每2小時檢測一次。如果客戶端在這2小時內斷開了,那么服務端也要維護這個連接2小時,浪費服務端資源;另外就是對於需要實時傳輸數據的場景,客戶端斷開了,服務端也要2小時后才能發現。服務端發送心跳檢測,具體可能出現的情況如下:
(1)連接正常:客戶端仍然存在,網絡連接狀況良好。此時客戶端會返回一個 ACK 。 服務端接收到ACK后重置計時器,在2小時后再發送探測。如果2小時內連接上有數據傳輸,那么在該時間基礎上向后推延2個小時;
(2)連接斷開:客戶端異常關閉,或是網絡斷開。在這兩種情況下,客戶端都不會響應。服務器沒有收到對其發出探測的響應,並且在一定時間(系統默認為 1000 ms )后重復發送 keep-alive packet ,並且重復發送一定次數。
(3)客戶端曾經崩潰,但已經重啟:這種情況下,服務器將會收到對其存活探測的響應,但該響應是一個復位,從而引起服務器對連接的終止。
基於Netty的IdleStateHandler實現
什么是 IdleStateHandler
當連接的空閑時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。然后,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。
如何使用?
IdleStateHandler 既是出站處理器也是入站處理器,繼承了 ChannelDuplexHandler 。通常在 initChannel 方法中將 IdleStateHandler 添加到 pipeline 中。然后在自己的 handler 中重寫 userEventTriggered 方法,當發生空閑事件(讀或者寫),就會觸發這個方法,並傳入具體事件。
這時,你可以通過 Context 對象嘗試向目標 Socekt 寫入數據,並設置一個 監聽器,如果發送失敗就關閉 Socket (Netty 准備了一個 ChannelFutureListener.CLOSE_ON_FAILURE 監聽器用來實現關閉 Socket 邏輯)。
這樣,就實現了一個簡單的心跳服務。
源碼分析
構造方法
該類有 3 個構造方法,主要對一下 4 個屬性賦值:
private final boolean observeOutput;// 是否考慮出站時較慢的情況。默認值是false(不考慮)。 private final long readerIdleTimeNanos; // 讀事件空閑時間,0 則禁用事件 private final long writerIdleTimeNanos;// 寫事件空閑時間,0 則禁用事件 private final long allIdleTimeNanos; //讀或寫空閑時間,0 則禁用事件
可以分別控制讀,寫,讀寫超時的時間,單位為秒,如果是0表示不檢測,所以如果全是0,則相當於沒添加這個IdleStateHandler,連接是個普通的短連接。
handlerAdded 方法
IdleStateHandler是在創建IdleStateHandler實例並添加到ChannelPipeline時添加定時任務來進行定時檢測的,具體在initialize(ctx)方法實現;同時在從ChannelPipeline移除或Channel關閉時,移除這個定時檢測,具體在destroy()實現
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() && ctx.channel().isRegistered()) { this.initialize(ctx); } } public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { this.destroy(); }
initialize
private void initialize(ChannelHandlerContext ctx) { switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { // 這里的 schedule 方法會調用 eventLoop 的 schedule 方法,將定時任務添加進隊列中 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
只要給定的參數大於0,就創建一個定時任務,每個事件都創建。同時,將 state 狀態設置為 1,防止重復初始化。調用 initOutputChanged 方法,初始化 “監控出站數據屬性”,代碼如下:
private void initOutputChanged(ChannelHandlerContext ctx) { if (observeOutput) { Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // 記錄了出站緩沖區相關的數據,buf 對象的 hash 碼,和 buf 的剩余緩沖字節數 if (buf != null) { lastMessageHashCode = System.identityHashCode(buf.current()); lastPendingWriteBytes = buf.totalPendingWriteBytes(); } } }
讀事件的 run 方法
代碼如下:
protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. // 用於取消任務 promise readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { // 再次提交任務 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); // 觸發用戶 handler use channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } }
nextDelay的初始化值為超時秒數readerIdleTimeNanos,如果檢測的時候沒有正在讀,且計算多久沒讀了:nextDelay -= 當前時間 - 上次讀取時間,如果小於0,說明左邊的readerIdleTimeNanos小於空閑時間(當前時間 - 上次讀取時間)了,則超時了
則創建IdleStateEvent事件,IdleState枚舉值為READER_IDLE,然后調用channelIdle方法分發給下一個ChannelInboundHandler,通常由用戶自定義一個ChannelInboundHandler來捕獲並處理
總的來說,每次讀取操作都會記錄一個時間,定時任務時間到了,會計算當前時間和最后一次讀的時間的間隔,如果間隔超過了設置的時間,就觸發 UserEventTriggered 方法。就是這么簡單。
寫事件的 run 方法
寫任務的邏輯基本和讀任務的邏輯一樣,唯一不同的就是有一個針對 出站較慢數據的判斷。
if (hasOutputChanged(ctx, first)) { return; }
如果這個方法返回 true,就不執行觸發事件操作了,即使時間到了。看看該方法實現:
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { if (observeOutput) { // 如果最后一次寫的時間和上一次記錄的時間不一樣,說明寫操作進行過了,則更新此值 if (lastChangeCheckTimeStamp != lastWriteTime) { lastChangeCheckTimeStamp = lastWriteTime; // 但如果,在這個方法的調用間隙修改的,就仍然不觸發事件 if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent return true; } } Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // 如果出站區有數據 if (buf != null) { // 拿到出站緩沖區的 對象 hashcode int messageHashCode = System.identityHashCode(buf.current()); // 拿到這個 緩沖區的 所有字節 long pendingWriteBytes = buf.totalPendingWriteBytes(); // 如果和之前的不相等,或者字節數不同,說明,輸出有變化,將 "最后一個緩沖區引用" 和 “剩余字節數” 刷新 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { lastMessageHashCode = messageHashCode; lastPendingWriteBytes = pendingWriteBytes; // 如果寫操作沒有進行過,則任務寫的慢,不觸發空閑事件 if (!first) { return true; } } } } return false; }
- 如果用戶沒有設置了需要觀察出站情況。就返回 false,繼續執行事件。
- 反之,繼續向下, 如果最后一次寫的時間和上一次記錄的時間不一樣,說明寫操作剛剛做過了,則更新此值,但仍然需要判斷這個 first 的值,如果這個值還是 false,說明在這個寫事件是在兩個方法調用間隙完成的 / 或者是第一次訪問這個方法,就仍然不觸發事件。
- 如果不滿足上面的條件,就取出緩沖區對象,如果緩沖區沒對象了,說明沒有發生寫的很慢的事件,就觸發空閑事件。反之,記錄當前緩沖區對象的 hashcode 和 剩余字節數,再和之前的比較,如果任意一個不相等,說明數據在變化,或者說數據在慢慢的寫出去。那么就更新這兩個值,留在下一次判斷。
- 繼續判斷 first ,如果是 fasle,說明這是第二次調用,就不用觸發空閑事件了。
所有事件的 run 方法
這個類叫做 AllIdleTimeoutTask ,表示這個監控着所有的事件。當讀寫事件發生時,都會記錄。代碼邏輯和寫事件的的基本一致,除了這里:
long nextDelay = allIdleTimeNanos; if (!reading) { // 當前時間減去 最后一次寫或讀 的時間 ,若大於0,說明超時了 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime); }
這里的時間計算是取讀寫事件中的最大值來的。然后像寫事件一樣,判斷是否發生了寫的慢的情況。最后調用 ctx.fireUserEventTriggered(evt) 方法。
通常這個使用的是最多的。構造方法一般是:
pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
讀寫都是 0 表示禁用,30 表示 30 秒內沒有任務讀寫事件發生,就觸發事件。注意,當不是 0 的時候,這三個任務會重疊。
