Dubbo心跳機制


 

 

前言
長連接和短連接
  • 短連接:每次通信結束后關閉連接,下次通信需要重新創建連接;優點就是無需管理連接,無需保活連接;
  • 長連接:每次通信結束不關閉連接,連接可以復用,保證了性能;缺點就是連接需要統一管理,並且需要保活;
主流的RPC框架都會追求性能選擇使用長連接,所以如何保活連接就是一個重要的話題,也是本文的主題,下面會重點介紹一些保活策略;
 
為什么需要保活
上面介紹的長連接、短連接並不是TCP提供的功能,所以長連接是需要應用端自己來實現的,包括:連接的統一管理,如何保活等;如何保活之前我們了解一下為什么需要保活?主要原因是網絡不是100%可靠的,我們創建好的連接可能由於網絡原因導致連接已經不可用了,如果連接一直有消息往來,那么系統馬上可以感知到連接斷開;但是我們系統可能長時間沒有消息來往,導致系統不能及時感知到連接不可用,也就是不能及時處理重連或者釋放連接; 常見的保活策略使用心跳機制由應用層來實現,還有網絡層提供的TCP Keepalive保活探測機制
 
TCP Keepalive機制
TCP Keepalive是操作系統實現的功能,並不是TCP協議的一部分,需要在操作系統下進行相關配置,開啟此功能后,如果連接在一段時間內沒有數據往來,TCP將發送Keepalive探針來確認連接的可用性,Keepalive幾個內核參數配置:
  • tcp_keepalive_time:連接多長時間沒有數據往來發送探針請求,默認為7200s(2h);
  • tcp_keepalive_probes:探測失敗重試的次數默認為10次;
  • tcp_keepalive_intvl:重試的間隔時間默認75s;
以上參數可以修改到/etc/sysctl.conf文件中;是否使用Keepalive用來保活就夠了,其實還不夠,Keepalive只是在網絡層就行保活,如果網絡本身沒有問題,但是系統由於其他原因已經不可用了,這時候Keepalive並不能發現;所以往往還需要結合心跳機制來一起使用;
 
一、心跳機制
如何理解應用層的心跳?簡單的說,就是客戶端會開啟一個定時任務,定時對已經建立連接的對端應用發送請求,服務端則需要特殊處理該請求,返回響應。如果心跳持續多次沒有收到響應,客戶端會認為連接不可用,主動斷開連接。
dubbo心跳時間heartbeat默認是 60s,超過heartbeat時間沒有收到消息,就發送心跳消息(provider,consumer一樣),如果連着3次(heartbeatTimeout為heartbeat*3)沒有收到心跳響應,provider會關閉channel,而consumer會進行重連;不論是provider還是consumer的心跳檢測都是通過啟動定時任務的方式實現;
目的:維持provider和consumer之間的長連接
 
客戶端如何得知請求失敗了?
在失敗的場景下,服務端是不會返回響應的,所以只能在客戶端自身上設計了。
當客戶端發起一個RPC請求時,會設置一個超時時間client_timeout,同時它也會開啟一個延遲的client_timeout的定時器。當接收到正常響應時,會移除該定時器;而當計時器倒計時完畢后,還沒有被移除,則會認為請求超時,構造一個失敗的響應傳遞給客戶端
 
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
    if (client == null) {
        throw new IllegalArgumentException("client == null");
    }
    this.client = client;
    // 創建信息交換通道
    this.channel = new HeaderExchangeChannel(client);
    // 獲得dubbo版本
    String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
    // 獲得心跳周期配置,如果沒有配置,並且dubbo是1.0版本的,則這只為1分鍾,否則設置為0
    this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
    // 獲得心跳超時配置,默認是心跳周期的三倍
    this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    // 開啟心跳
    if (needHeartbeat) {
        startHeatbeatTimer();
    }
}

創建了一個HashedWheelTimer開啟心跳檢測,這是 Netty 所提供的一個經典的時間輪定時器實現。HeaderExchangeServer也同時開啟了定時器,代碼邏輯和上述差不多

 
private void startHeatbeatTimer() {
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
            new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                public Collection<Channel> getChannels() {
                    return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                }
            }, heartbeat, heartbeatTimeout),
            heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}

 

final class HeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
    // 通道管理
    private ChannelProvider channelProvider;
    // 心跳間隔 單位:ms
    private int heartbeat;
    // 心跳超時時間 單位:ms
    private int heartbeatTimeout;

    HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    public void run() {
        try {
            long now = System.currentTimeMillis();
            // 遍歷所有通道
            for (Channel channel : channelProvider.getChannels()) {
                // 如果通道關閉了,則跳過
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    // 最后一次接收到消息的時間戳
                    Long lastRead = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    // 最后一次發送消息的時間戳
                    Long lastWrite = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 如果最后一次接收或者發送消息到時間到現在的時間間隔超過了心跳間隔時間
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        // 創建一個request
                        Request req = new Request();
                        // 設置版本號
                        req.setVersion("2.0.0");
                        // 設置需要得到響應
                        req.setTwoWay(true);
                        // 設置事件類型,為心跳事件
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        // 發送心跳請求
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " 
                                         + heartbeat + "ms");
                        }
                    }
                    // 如果最后一次接收消息的時間到現在已經超過了超時時間
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 如果該通道是客戶端,也就是請求的服務器掛掉了,客戶端嘗試重連服務器
                        if (channel instanceof Client) {
                            try {
                                // 重新連接服務器
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        } else {
                            // 如果不是客戶端,也就是是服務端返回響應給客戶端,但是客戶端掛掉了,則服務端關閉客戶端連接
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }

    interface ChannelProvider {
        // 獲得所有的通道集合,需要心跳的通道數組
        Collection<Channel> getChannels();
    }

}

它首先遍歷所有的Channel,在服務端對用的是所有客戶端連接,在客戶端對應的是服務端連接,判斷當前TCP連接是否空閑,如果空閑就發送心跳報文,判斷是否空閑,根據Channel是否有讀或寫來決定,比如一分鍾內沒有讀或寫就發送心跳報文,然后是處理超時的問題,處理客戶端超時重新建立TCP連接,目前的策略是檢查是否在3分鍾內都沒有成功接受或發送報文,如果在服務端檢測則就會主動關閉遠程客戶端連接。

 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM