分析dubbo心跳檢測機制


  • 目的: 
  • 維持provider和consumer之間的長連接
  • 實現: 
  • dubbo心跳時間heartbeat默認是60s,超過heartbeat時間沒有收到消息,就發送心跳消息(provider,consumer一樣),如果連着3次(heartbeatTimeout為heartbeat*3)沒有收到心跳響應,provider會關閉channel,而consumer會進行重連;不論是provider還是consumer的心跳檢測都是通過啟動定時任務的方式實現;
  • provider綁定和consumer連接的入口:
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}
  • provider啟動心跳檢測:
public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        //心跳超時時間默認為心跳時間的3倍
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        //如果心跳超時時間小於心跳時間的兩倍則拋異常
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
    }
  • startHeatbeatTimer的實現 
  • 先停止已有的定時任務,啟動新的定時任務:
private void startHeatbeatTimer() {
        // 停止原有定時任務
        stopHeartbeatTimer();
        // 發起新的定時任務
        if (heartbeat > 0) {
            heatbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }
  • HeartBeatTask的實現 
  • 遍歷所有的channel,檢測心跳間隔,如果超過心跳間隔沒有讀或寫,則發送需要回復的心跳消息,最有判斷是否心跳超時(heartbeatTimeout),如果超時,provider關閉channel,consumer進行重連
public void run() {
        try {
            long now = System.currentTimeMillis();
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 讀寫的時間,任一超過心跳間隔,發送心跳
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true); // 需要響應的心跳事件
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 最后讀的時間,超過心跳超時時間
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 客戶端側,重新連接服務端
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        // 服務端側,關閉客戶端連接
                        } else {
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }
  • consumer端的實現 
  • 默認需要心跳檢測
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // 創建 HeaderExchangeChannel 對象
        this.channel = new HeaderExchangeChannel(client);
        // 讀取心跳相關配置
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) { // 避免間隔太短
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        // 發起心跳定時器
        if (needHeartbeat) {
            startHeatbeatTimer();
        }
    }

我們可以看到dubbo的心跳檢測,服務端會發送發送心跳包,客戶端也會發送心跳包,與一般只有客戶端發送心跳包,服務端接受心跳是有所不同的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM