- 目的:
- 維持provider和consumer之間的長連接
- 實現:
- dubbo心跳時間heartbeat默認是60s,超過heartbeat時間沒有收到消息,就發送心跳消息(provider,consumer一樣),如果連着3次(heartbeatTimeout為heartbeat*3)沒有收到心跳響應,provider會關閉channel,而consumer會進行重連;不論是provider還是consumer的心跳檢測都是通過啟動定時任務的方式實現;
- provider綁定和consumer連接的入口:
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
- provider啟動心跳檢測:
public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); //心跳超時時間默認為心跳時間的3倍 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); //如果心跳超時時間小於心跳時間的兩倍則拋異常 if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } startHeatbeatTimer(); }
- startHeatbeatTimer的實現
- 先停止已有的定時任務,啟動新的定時任務:
private void startHeatbeatTimer() { // 停止原有定時任務 stopHeartbeatTimer(); // 發起新的定時任務 if (heartbeat > 0) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { public Collection<Channel> getChannels() { return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
- HeartBeatTask的實現
- 遍歷所有的channel,檢測心跳間隔,如果超過心跳間隔沒有讀或寫,則發送需要回復的心跳消息,最有判斷是否心跳超時(heartbeatTimeout),如果超時,provider關閉channel,consumer進行重連
public void run() { try { long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue; } try { Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP); Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 讀寫的時間,任一超過心跳間隔,發送心跳 if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); // 需要響應的心跳事件 req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } // 最后讀的時間,超過心跳超時時間 if (lastRead != null && now - lastRead > heartbeatTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); // 客戶端側,重新連接服務端 if (channel instanceof Client) { try { ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } // 服務端側,關閉客戶端連接 } else { channel.close(); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } } catch (Throwable t) { logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); } }
- consumer端的實現
- 默認需要心跳檢測
public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; // 創建 HeaderExchangeChannel 對象 this.channel = new HeaderExchangeChannel(client); // 讀取心跳相關配置 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { // 避免間隔太短 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } // 發起心跳定時器 if (needHeartbeat) { startHeatbeatTimer(); } }
我們可以看到dubbo的心跳檢測,服務端會發送發送心跳包,客戶端也會發送心跳包,與一般只有客戶端發送心跳包,服務端接受心跳是有所不同的。