A note up front: both the ZooKeeper client and server are sensitive to load. For applications that resemble big-data batch processing, tuning and monitoring the ZooKeeper heartbeat is critical; otherwise the system becomes unstable very easily. For non-OLTP workloads that may run under sustained high load and trigger long GC pauses, it is better to avoid ZooKeeper and RPC altogether and use MQ or HTTP instead.
The heartbeat mechanism between dubbo consumer and provider
There is a heartbeat between the dubbo client and the dubbo server; its purpose is to keep the long-lived connection between provider and consumer alive (see HeartbeatTask in the dubbo source; both provider and consumer run the heartbeat check as a scheduled timer task). The heartbeat interval (heartbeat) defaults to 60 s: once no message has been received within a heartbeat interval, a heartbeat request is sent (provider and consumer behave the same). If three intervals in a row pass without a heartbeat response (heartbeatTimeout = heartbeat * 3), the provider closes the channel and the consumer reconnects. For batch-processing workloads this makes it quite likely that a node goes silent and gets kicked out, e.g. when a GC pause exceeds one minute, or when concurrency is so high that the CPU stays at 100% and the heartbeat thread never gets scheduled; in that case you need to raise the timeout multiple or the heartbeat interval itself. (The build my company runs has been modified and its default timeout is 15 s, so I changed the source to read these values from application.properties.) Fundamentally, a long-lived RPC connection is not suited to services that take a long time to complete; it is only used this way for historical reasons, and polling or messaging would be the better fit.
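If the defaults are too tight for your workload, both values can be raised as URL parameters. Below is a minimal sketch using Dubbo's Java configuration API; it assumes the com.alibaba.dubbo 2.x ProtocolConfig, and the parameter keys heartbeat and heartbeat.timeout correspond to the Constants.HEARTBEAT_KEY and Constants.HEARTBEAT_TIMEOUT_KEY constants used in the source shown later. Treat it as an illustration, not as the exact setup described in this article.

```java
import java.util.HashMap;
import java.util.Map;

import com.alibaba.dubbo.config.ProtocolConfig;

public class HeartbeatConfigSketch {
    public static void main(String[] args) {
        // Illustrative only: attach heartbeat parameters to the dubbo protocol so that they
        // end up on the provider/consumer URL read by HeaderExchangeServer/HeaderExchangeClient.
        ProtocolConfig protocol = new ProtocolConfig();
        protocol.setName("dubbo");
        protocol.setPort(20880);

        Map<String, String> parameters = new HashMap<String, String>();
        parameters.put("heartbeat", "60000");           // heartbeat interval in ms
        parameters.put("heartbeat.timeout", "180000");  // must be at least 2 * heartbeat
        protocol.setParameters(parameters);
    }
}
```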
- Entry point for the provider bind and the consumer connect:
```java
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
```
- The provider starts heartbeat detection:
```java
public HeaderExchangeServer(Server server) {
    if (server == null) {
        throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    // The heartbeat timeout defaults to 3 times the heartbeat interval
    this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    // If the heartbeat timeout is less than twice the heartbeat interval, throw an exception
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    startHeatbeatTimer();
}
```
- The implementation of startHeatbeatTimer:
- It first stops any existing timer task, then starts a new one:
```java
private void startHeatbeatTimer() {
    // Stop the existing timer task
    stopHeartbeatTimer();
    // Start a new timer task
    if (heartbeat > 0) {
        heatbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    public Collection<Channel> getChannels() {
                        return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}
```
- The implementation of HeartBeatTask:
- It iterates over all channels and checks the idle time: if there has been no read or write for longer than the heartbeat interval, it sends a heartbeat request that requires a response; finally it checks whether the heartbeat timeout (heartbeatTimeout) has been exceeded, and if so the provider closes the channel while the consumer reconnects:
```java
public void run() {
    try {
        long now = System.currentTimeMillis();
        for (Channel channel : channelProvider.getChannels()) {
            if (channel.isClosed()) {
                continue;
            }
            try {
                Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                // If either the last read or the last write is older than the heartbeat interval, send a heartbeat
                if ((lastRead != null && now - lastRead > heartbeat)
                        || (lastWrite != null && now - lastWrite > heartbeat)) {
                    Request req = new Request();
                    req.setVersion("2.0.0");
                    req.setTwoWay(true);
                    // A heartbeat event that requires a response
                    req.setEvent(Request.HEARTBEAT_EVENT);
                    channel.send(req);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
                                + heartbeat + "ms");
                    }
                }
                // The last read is older than the heartbeat timeout
                if (lastRead != null && now - lastRead > heartbeatTimeout) {
                    logger.warn("Close channel " + channel
                            + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                    // On the client side, reconnect to the server
                    if (channel instanceof Client) {
                        try {
                            ((Client) channel).reconnect();
                        } catch (Exception e) {
                            // do nothing
                        }
                    // On the server side, close the client connection
                    } else {
                        channel.close();
                    }
                }
            } catch (Throwable t) {
                logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
            }
        }
    } catch (Throwable t) {
        logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
    }
}
```
- The consumer-side implementation:
- Heartbeat detection is enabled by default:
```java
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
    if (client == null) {
        throw new IllegalArgumentException("client == null");
    }
    this.client = client;
    // Create the HeaderExchangeChannel object
    this.channel = new HeaderExchangeChannel(client);
    // Read the heartbeat-related configuration
    String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
    this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY,
            dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
    this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    // Avoid a timeout that is too short
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    // Start the heartbeat timer
    if (needHeartbeat) {
        startHeatbeatTimer();
    }
}
```
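Note that this constructor only falls back to DEFAULT_HEARTBEAT for 1.0.x clients and otherwise to 0; as far as I can tell, the 60 s default mentioned at the top is put onto the URL earlier (DubboProtocol adds the heartbeat parameter if it is absent before building the client). The following is a small sketch of that parameter resolution, assuming the com.alibaba.dubbo URL API; the service URL is made up.

```java
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;

public class HeartbeatUrlSketch {
    public static void main(String[] args) {
        // A made-up consumer URL with no explicit heartbeat parameter.
        URL url = URL.valueOf("dubbo://10.0.0.1:20880/com.example.DemoService");

        // Something similar happens inside DubboProtocol before the client is built,
        // which is why the effective default ends up being 60 s (Constants.DEFAULT_HEARTBEAT)
        // even though the constructor above would otherwise fall back to 0.
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        int heartbeat = url.getParameter(Constants.HEARTBEAT_KEY, 0);                             // 60000
        int heartbeatTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);  // 180000
        System.out.println(heartbeat + " ms / " + heartbeatTimeout + " ms");
    }
}
```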
Heartbeats between the dubbo client/server and the registry (ZooKeeper)
This heartbeat is initiated by the dubbo client or server and is simply the heartbeat between ZooKeeper clients and the ZooKeeper ensemble. Its interval is governed by the ZooKeeper server parameter tickTime (the interval used to maintain heartbeats between ZooKeeper servers and between clients and servers; a heartbeat is sent every tickTime, and the minimum session expiration time is 2 * tickTime). In practice, however, we observed a heartbeat interval of tickTime / 2 (in the case below the machine was so busy that the zk client could not send its heartbeats to the server in time), as the following log shows:
```
[] 2019-08-07 14:51:46 [5189311] [ERROR] Curator-Framework-0 org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:200) Connection timed out for connection string (localhost:2181) and timeout (5000) / elapsed (27053)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:116) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) [curator-framework-2.10.0.jar!/:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_211]
    at java.lang.Thread.run(Unknown Source) [?:1.8.0_211]
[] 2019-08-07 14:51:46 [5190082] [WARN] Curator-Framework-0-SendThread(0:0:0:0:0:0:0:1:2181) org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1102) Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_211]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[?:1.8.0_211]
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) [zookeeper-3.4.6.jar!/:3.4.6-1569965]
[] 2019-08-07 14:51:47 [5190312] [ERROR] Curator-Framework-0 org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:200) Connection timed out for connection string (localhost:2181) and timeout (5000) / elapsed (28054)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:116) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) [curator-framework-2.10.0.jar!/:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_211]
    at java.lang.Thread.run(Unknown Source) [?:1.8.0_211]
[] 2019-08-07 14:51:47 [5191182] [INFO] Curator-Framework-0-SendThread(k3ctest.yidooo.com:2181) org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:975) Opening socket connection to server k3ctest.yidooo.com/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[] 2019-08-07 14:51:48 [5191314] [ERROR] Curator-Framework-0 org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:200) Connection timed out for connection string (localhost:2181) and timeout (5000) / elapsed (29056)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:116) [curator-client-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) [curator-framework-2.10.0.jar!/:?]
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) [curator-framework-2.10.0.jar!/:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_211]
    at java.lang.Thread.run(Unknown Source) [?:1.8.0_211]
```
At startup you can see both the timeout configured on the client and the timeout finally negotiated with the server, for example:
```
21:28:57.442 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@5afd2f4e
21:28:57.475 [main-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
21:28:57.477 [main-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
21:28:57.508 [main-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x10025cfdf590000, negotiated timeout = 60000
21:28:57.515 [main-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
```
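The sessionTimeout=60000 in the log is only what the client asks for; the server may clamp it into [minSessionTimeout, maxSessionTimeout] (discussed below). A minimal Curator sketch of where these numbers come from follows; it assumes a ZooKeeper at localhost:2181, and the connect string, retry policy and timeouts are illustrative rather than taken from this deployment.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorSessionTimeoutExample {
    public static void main(String[] args) throws Exception {
        // Request a 60 s session timeout and a 5 s connection timeout, similar to the
        // values seen in the logs above. The server clamps the session timeout into
        // [minSessionTimeout, maxSessionTimeout] during negotiation.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181",                      // connect string (illustrative)
                60000,                                 // requested session timeout in ms
                5000,                                  // connection timeout in ms
                new ExponentialBackoffRetry(1000, 3)); // retry policy (illustrative)
        client.start();
        client.blockUntilConnected();

        // The negotiated value is what actually governs session expiry.
        System.out.println("negotiated session timeout = "
                + client.getZookeeperClient().getZooKeeper().getSessionTimeout() + " ms");
        client.close();
    }
}
```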
Synchronization time settings between ZooKeeper ensemble nodes
(1) initLimit
This setting is the maximum time a follower (a "client" relative to the Leader) may take to connect to and sync with the Leader during initialization, measured in ticks of tickTime (the fundamental unit from which all of ZooKeeper's time-related settings are derived). If the initial synchronization takes longer than this, the connection is considered failed.
(2) syncLimit
This setting is the maximum time, again in ticks, allowed for a request/response exchange between the Leader and a Follower. If a follower cannot communicate with the leader within this window, the follower is dropped.
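A quick worked example of the units involved (the values below are typical illustrative defaults, not this deployment's configuration): both limits are expressed in ticks, so they must be multiplied by tickTime to get the actual windows in milliseconds.

```java
public class ZkTickLimits {
    public static void main(String[] args) {
        // Illustrative values; tickTime is in milliseconds, the two limits are in ticks.
        int tickTime = 2000;  // ms
        int initLimit = 10;   // ticks
        int syncLimit = 5;    // ticks

        System.out.println("initLimit window = " + (initLimit * tickTime) + " ms"); // 20000 ms
        System.out.println("syncLimit window = " + (syncLimit * tickTime) + " ms"); // 10000 ms
    }
}
```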
The client's session timeout
As for the session timeout: when the client passes its sessionTimeout to ZooKeeper, the server readjusts the final value against two parameters, minSessionTimeout (default: 2 * tickTime) and maxSessionTimeout (default: 20 * tickTime). With the default tickTime this means the session already expires after 40 seconds, no matter what the client asked for (a worked example follows the parameter descriptions below). So for very long pauses, such as a GC-induced stop-the-world, you should make sure a single STW pause is no longer than tickTime, which greatly reduces ZooKeeper reconnects; you can also consider increasing maxSessionTimeout and minSessionTimeout.
```java
public int getMinSessionTimeout() {
    return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
}

public int getMaxSessionTimeout() {
    return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
}
```
```java
int minSessionTimeout = zk.getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
    sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = zk.getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
    sessionTimeout = maxSessionTimeout;
}
```
The two parameters are documented as follows:
minSessionTimeout
(No Java system property)
New in 3.3.0: the minimum session timeout in milliseconds that the server will allow the client to negotiate. Defaults to 2 times the tickTime.
maxSessionTimeout
(No Java system property)
New in 3.3.0: the maximum session timeout in milliseconds that the server will allow the client to negotiate. Defaults to 20 times the tickTime.
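A quick worked example of the clamping above, assuming the default tickTime of 2000 ms (the numbers are illustrative; note that the startup log earlier shows negotiated timeout = 60000, which implies that server's maxSessionTimeout had been raised to at least 60 s):

```java
public class SessionTimeoutNegotiation {
    public static void main(String[] args) {
        // Defaults derived from tickTime, mirroring getMinSessionTimeout()/getMaxSessionTimeout() above.
        int tickTime = 2000;                   // ms (ZooKeeper default)
        int minSessionTimeout = tickTime * 2;  // 4000 ms
        int maxSessionTimeout = tickTime * 20; // 40000 ms

        // The client asks for 60 s, as in the Curator startup log above.
        int requested = 60000;
        int negotiated = Math.max(minSessionTimeout, Math.min(requested, maxSessionTimeout));

        // Prints 40000: with the default tickTime the session can expire after 40 s
        // even though the client requested 60 s.
        System.out.println("negotiated session timeout = " + negotiated + " ms");
    }
}
```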
Finally, keep in mind that the Leader is a single node responsible for coordinating all transactions. If the leader goes down, you need to understand how a new one is elected; for that, see: zookeeper核心原理詳解 (an in-depth look at ZooKeeper's core principles).