Dubbo Heartbeat


The HeartBeatTask class encapsulates the scheduled heartbeat task. Note that both the provider and the consumer may send heartbeats.

final class HeartBeatTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger( HeartBeatTask.class );
    private ChannelProvider channelProvider;
    private int             heartbeat;
    private int             heartbeatTimeout;

    HeartBeatTask( ChannelProvider provider, int heartbeat, int heartbeatTimeout ) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    public void run() {
        try {
            long now = System.currentTimeMillis();
            for ( Channel channel : channelProvider.getChannels() ) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = ( Long ) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP );
                    Long lastWrite = ( Long ) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
                    if ( ( lastRead != null && now - lastRead > heartbeat )
                            || ( lastWrite != null && now - lastWrite > heartbeat ) ) {
                        Request req = new Request();
                        req.setVersion( "2.0.0" );
                        req.setTwoWay( true );
                        req.setEvent( Request.HEARTBEAT_EVENT );
                        channel.send( req );                  
                    }
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        // consumer side: try to reconnect
                        if (channel instanceof Client) {
                           ((Client)channel).reconnect();
                        } else { // provider
                            channel.close();
                        }
                    }
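                } catch ( Throwable t ) {
                    // Log and continue so that one failing channel does not stop the loop.
                    logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t );
                }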
            }
        } catch ( Throwable t ) {
            logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
        }
    }

    interface ChannelProvider {
        Collection<Channel> getChannels();
    }

}

On the consumer side, the heartbeat timer is started in the HeaderExchangeClient class; on the provider side, it is started in HeaderExchangeServer.

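When the consumer sends a request, it updates lastWrite; when it receives a response, it updates lastRead. The heartbeat timer periodically checks both values. If either one is older than the heartbeat interval, it sends a heartbeat; if lastRead is older than heartbeatTimeout, the consumer reconnects (a provider closes the channel instead).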
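The two checks described above can be isolated into a small, dependency-free sketch. HeartbeatDecision and its method names are hypothetical and are not part of Dubbo; they only mirror the conditions used in HeartBeatTask.run() so the thresholds are easier to reason about.

```java
/** Hypothetical helper (not a Dubbo class) mirroring the two checks in HeartBeatTask.run(). */
final class HeartbeatDecision {

    /** A heartbeat is due when either timestamp is older than the heartbeat interval. */
    static boolean shouldSendHeartbeat(Long lastRead, Long lastWrite, long now, int heartbeat) {
        return (lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat);
    }

    /** The peer is treated as lost when nothing has been read for longer than the timeout. */
    static boolean isTimedOut(Long lastRead, long now, int heartbeatTimeout) {
        return lastRead != null && now - lastRead > heartbeatTimeout;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        int heartbeat = 60_000;         // interval in milliseconds
        int heartbeatTimeout = 180_000; // three intervals

        // Traffic in both directions 10 s ago: no heartbeat needed.
        System.out.println(shouldSendHeartbeat(now - 10_000, now - 10_000, now, heartbeat));
        // Last write was 70 s ago: a heartbeat is due.
        System.out.println(shouldSendHeartbeat(now - 10_000, now - 70_000, now, heartbeat));
        // Nothing read for 200 s: reconnect (consumer) or close (provider).
        System.out.println(isTimedOut(now - 200_000, now, heartbeatTimeout));
    }
}
```

A channel that has never read or written anything has null timestamps, so neither check fires for it.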

public class HeaderExchangeClient implements ExchangeClient {
    private static final ScheduledThreadPoolExecutor scheduled = 
        new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    // heartbeat timer
    private ScheduledFuture<?> heatbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;
    
    public HeaderExchangeClient(Client client){
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        // e.g. heartbeat = 60000 (ms)
        this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 );
        // e.g. heartbeatTimeout = 180000 (ms), which is heartbeat * 3 when not set explicitly
        this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 );
        if ( heartbeatTimeout < heartbeat * 2 ) {
            throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" );
        }
        startHeatbeatTimer();
    }

    public ResponseFuture request(Object request) throws RemotingException {
        return channel.request(request);
    }
    
    private void startHeatbeatTimer() {
        stopHeartbeatTimer();
        if ( heartbeat > 0 ) {
            heatbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList( HeaderExchangeClient.this );
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS );
        }
    }
}
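The way the constructor derives and validates the two settings can be shown without any Dubbo types. In this minimal sketch, HeartbeatConfig, resolve and timerEnabled are hypothetical names; the rules they encode (timeout defaults to three intervals, must be at least two intervals, and the timer only starts when the interval is positive) are taken from the excerpt above.

```java
/** Hypothetical value class (not a Dubbo class) for the heartbeat settings. */
final class HeartbeatConfig {
    final int heartbeat;        // interval in milliseconds, 0 disables the timer
    final int heartbeatTimeout; // milliseconds without a read before giving up

    private HeartbeatConfig(int heartbeat, int heartbeatTimeout) {
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /**
     * Applies the same rules as the HeaderExchangeClient constructor.
     * explicitTimeout may be null, meaning "not configured".
     */
    static HeartbeatConfig resolve(int heartbeat, Integer explicitTimeout) {
        int timeout = explicitTimeout != null ? explicitTimeout : heartbeat * 3;
        if (timeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        return new HeartbeatConfig(heartbeat, timeout);
    }

    /** Mirrors the heartbeat > 0 guard in startHeatbeatTimer(). */
    boolean timerEnabled() {
        return heartbeat > 0;
    }

    public static void main(String[] args) {
        HeartbeatConfig config = resolve(60_000, null);
        System.out.println(config.heartbeat + " / " + config.heartbeatTimeout);
        System.out.println(resolve(0, null).timerEnabled());
    }
}
```

Requiring at least two intervals means a peer gets more than one heartbeat opportunity before the connection is considered dead.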

The lastRead and lastWrite values are set in the HeartbeatHandler class:

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
    // other code omitted
    private void setReadTimestamp(Channel channel) {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    }

    private void setWriteTimestamp(Channel channel) {
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    }
}
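To illustrate how these attributes connect the handler to the heartbeat task, here is a minimal sketch of a channel that stores its timestamps in an attribute map. TimestampedChannel, onSent and onReceived are hypothetical names, and the key strings are placeholders chosen for the example rather than values confirmed from Dubbo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a channel with attributes (not a Dubbo class). */
final class TimestampedChannel {
    // Placeholder key values, assumed for this example only.
    static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
    static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    // Concurrent map: I/O threads write while the heartbeat thread reads.
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();

    void setAttribute(String key, Object value) {
        attributes.put(key, value);
    }

    Object getAttribute(String key) {
        return attributes.get(key);
    }

    /** Called after a message has been written to the peer. */
    void onSent(long now) {
        setAttribute(KEY_WRITE_TIMESTAMP, now);
    }

    /** Called after a message has been read from the peer. */
    void onReceived(long now) {
        setAttribute(KEY_READ_TIMESTAMP, now);
    }

    public static void main(String[] args) {
        TimestampedChannel channel = new TimestampedChannel();
        channel.onSent(System.currentTimeMillis());
        // The heartbeat task reads the values back with the same cast as in HeartBeatTask.
        Long lastWrite = (Long) channel.getAttribute(KEY_WRITE_TIMESTAMP);
        Long lastRead = (Long) channel.getAttribute(KEY_READ_TIMESTAMP);
        System.out.println("lastWrite set: " + (lastWrite != null) + ", lastRead set: " + (lastRead != null));
    }
}
```

Until the first message is read, the read timestamp is absent, which is why HeartBeatTask checks for null before comparing.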

Call stack for setting lastWrite: [image]

Call stack for setting lastRead: [image]

