ribbon中,發現一段比較有水平的代碼.md


ribbon中,發現一段比較有水平的代碼

簡介

有個類,com.netflix.loadbalancer.PollingServerListUpdater,主要功能是,獲取服務的instance列表。

這也是ribbon中的核心代碼。

public class PollingServerListUpdater implements ServerListUpdater {

該接口的功能,給大家看下:

public interface ServerListUpdater {

    /**
     * 1 an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }


    /**
     * start the serverList updater with the given update action
     * This call should be idempotent.
     *
     * @param updateAction
     */
    void start(UpdateAction updateAction);

    /**
     * stop the serverList updater. This call should be idempotent
     */
    void stop();

    /**
     * @return the last update timestamp as a {@link java.util.Date} string
     */
    String getLastUpdate();

    /**
     * @return the number of ms that has elapsed since last update
     */
    long getDurationSinceLastUpdateMs();

    /**
     * @return the number of update cycles missed, if valid
     */
    int getNumberMissedCycles();

    /**
     * @return the number of threads used, if vaid
     */
    int getCoreThreads();
}

其實這個接口的幾個方法,主要就是,維護了一個線程池,然后定期去調用1處的UpdateAction,獲取最新的服務列表,比如調用eureka client的方法,去eureka server獲取服務列表。

靜態類單例模式

下面這個方法,獲取線程池,里面用了一個靜態內部類:

    private static ScheduledThreadPoolExecutor getRefreshExecutor() {
        return LazyHolder._serverListRefreshExecutor;
    }
com.netflix.loadbalancer.PollingServerListUpdater.LazyHolder

private static class LazyHolder {
        private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
  
        private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
  
        private static Thread _shutdownThread;

        static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;

        static {
        	// 1
            int coreSize = poolSizeProp.get();
            ThreadFactory factory = (new ThreadFactoryBuilder())
                    .setNameFormat("PollingServerListUpdater-%d")
                    .setDaemon(true)
                    .build();
            // 2        
            _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
            // 3
            poolSizeProp.addCallback(new Runnable() {
                @Override
                public void run() {
                	// 4
                    _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
                }

            });
            // 5
            _shutdownThread = new Thread(new Runnable() {
                public void run() {
                    logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
                    shutdownExecutorPool();
                }
            });
            Runtime.getRuntime().addShutdownHook(_shutdownThread);
        }

        private static void shutdownExecutorPool() {
            if (_serverListRefreshExecutor != null) {
                _serverListRefreshExecutor.shutdown();

                if (_shutdownThread != null) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(_shutdownThread);
                    } catch (IllegalStateException ise) { // NOPMD
                        // this can happen if we're in the middle of a real
                        // shutdown,
                        // and that's 'ok'
                    }
                }

            }
        }
    }
  • 1處,獲取要構造的線程池的core線程的數量

  • 2處,new了一個線程池,使用的就是jdk里面的java.util.concurrent.ScheduledThreadPoolExecutor

  • 3處,這里的poolSizeProp,可以通過Archaius來進行修改,這個Archaius聽說挺強的,但是沒用過,可以理解為一個配置中心吧,可以通過Archaius修改線程池的線程數量,這里加了個回調,意思是,如果線程池數量變更了,調用這個回調方法

  • 4處,這里直接調用了ScheduledThreadPoolExecutor的setCorePoolSize方法,大家可以看看下面的代碼:

       java.util.concurrent.ThreadPoolExecutor#setCorePoolSize
    	public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            // 4.1
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    

    看看,4.1,這里對field:corePoolSize進行了修改。

    private volatile int corePoolSize;
    

    那,大家明白為啥要定義成volatile了嗎?

  • 5處,添加一個ShutdownHook,進程關閉時(kill 9不行,要kill 15),會回調該ShutdownHook,關閉線程池。

線程池開始獲取服務列表

	private final AtomicBoolean isActive = new AtomicBoolean(false);
	// 1
    private volatile long lastUpdated = System.currentTimeMillis();
    private final long initialDelayMs;
    private final long refreshIntervalMs;

    private volatile ScheduledFuture<?> scheduledFuture;

	@Override
    public synchronized void start(final UpdateAction updateAction) {
      	// 2
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                  	// 3
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                          	// 4
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                      	// 5
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
			// 6
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
  • 1處,需要跨線程訪問的,該加volatile就加
  • 2處,使用原子類,避免多次被執行
  • 3處,使用原子boolean作為開關,方便線程及時停止運行
  • 4處,停止運行時,不忘記關閉定時任務,秀。
  • 5處,可以算作是策略或者模板方法模式吧
  • 6處,新建線程池,重點是保存返回的future

線程池如何停止

 @Override
    public synchronized void stop() {
        if (isActive.compareAndSet(true, false)) {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        } else {
            logger.info("Not active, no-op");
        }
    }

其他幾個方法實現


    @Override
    public String getLastUpdate() {
        return new Date(lastUpdated).toString();
    }

    @Override
    public long getDurationSinceLastUpdateMs() {
        return System.currentTimeMillis() - lastUpdated;
    }

    @Override
    public int getNumberMissedCycles() {
        if (!isActive.get()) {
            return 0;
        }
        return (int) ((int) (System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
    }

    @Override
    public int getCoreThreads() {
        if (isActive.get()) {
            if (getRefreshExecutor() != null) {
                return getRefreshExecutor().getCorePoolSize();
            }
        }
        return 0;
    }

    private static long getRefreshIntervalMs(IClientConfig clientConfig) {
        return clientConfig.get(CommonClientConfigKey.ServerListRefreshInterval, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM