ScheduledThreadPoolExecutor使用及原理


看到alibaba的nacos注冊中心中client端用這個作為心跳任務工具
BeatReactor類中
executorService spring 管理。

      this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
threadCount 取值UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT
DEFAULT_CLIENT_BEAT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;

ThreadFactory定義,線程池中的線程為守護線程。thread.setDaemon(true);
守護線程:
定義:守護線程--也稱“服務線程”,在沒有用戶線程可服務時會自動離開。
優先級:守護線程的優先級比較低,用於為系統中的其它對象和線程提供服務。
設置:通過setDaemon(true)來設置線程為“守護線程”;將一個用戶線程設置為
守護線程的方式是在 線程對象創建 之前 用線程對象的setDaemon方法。

調用

// 注冊時新建任務
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); 5秒后執行一次。
static {
      DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); // 心跳超時時間
      DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); // 刪除注冊服務提供者時間
      DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L); // 心跳調用服務提供者時間
}
beatInfo.getPeriod()為心跳時間5秒

ScheduledThreadPoolExecutor其底層數據結果是小根堆結構。
堆結果是完全二叉樹,分為大根堆和小根堆,大根堆,父節點大於兩個子節點。小根堆,父節點小於兩個子節點。
因此每次循環都是最先到時間的任務。

new BeatReactor.BeatTask(beatInfo) 每次執行完就重新新建任務

class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            if (!this.beatInfo.isStopped()) {
                long nextTime = this.beatInfo.getPeriod();

                try {
                    JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has("lightBeatEnabled")) {
                        lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
                    }

                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }

                    int code = 10200;
                    if (result.has("code")) {
                        code = result.get("code").asInt();
                    }

                    if (code == 20404) {
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);

                        try {
                            BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var10) {
                        }
                    }
                } catch (NacosException var11) {
                    LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
                }
                // 新創建任務
                BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }

take方法(延遲任務實現)

take方法通過lock來實現任務獲取的線程安全性,通過一個死循環來使申請任務的工作線程等待,直到獲取到任務。獲取任務可以分為以下幾步:

獲取鎖。
若隊列為空,則當前工作線程等待,否則進行下一步。
獲取隊列頭結點,判斷該節點任務的延遲時間是否還有剩余,時間到則返回該任務節點並調整堆,否則執行下一步。
判斷leader是否為空,為空說明該任務已被其它工作線程獲取,陷入等待,否則執行下一步。
等任務執行時間到,然后獲得該任務,直接執行。
喚醒下一個等待的工作線程來獲取任務,釋放鎖。

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {            // 死循環,直到獲得任務為止
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();    // 隊列為空,工作線程等待
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)                  // 任務執行時間到了  
                            return finishPoll(first);    // 任務出隊,然后調整堆
                        // 任務執行時間未到
                        first = null; // don't retain ref while waiting
                        if (leader != null)      // 該任務已被其它工作線程鎖定,等待
                            available.await();
                        else {                    // 可以獲取任務
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 等任務執行時間到,然后執行
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();                // 喚醒其它工作線程來獲取任務
                lock.unlock();
            }
        }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM