ScheduledThreadPoolExecutor(定時任務、周期任務)和DelayQueue


1.ScheduledExecutorService介紹

Timer對應的是單個后台線程,ScheduledExecutorService可以在構造函數中指定多個核心線程數,並且其最大線程數默認為Integer.MAX_VALUE

對於希望某段時間后執行一次的定時任務和某段時間后周期執行(周期為兩次任務開始間隔時間-可能延遲,或者下次開始距離上次任務時間),可以使用ScheduledExecutorService來提交執行。

注意:

  1. 調用周期任務執行方法scheduleAtFixedRate如果任務有延遲,兩個任務是不能並行執行的,例如周期設置2s,任務執行時間3s,則周期實際表現為3s,不會出現上個任務執行到第2s時下一個任務開始並行執行;
  2. 核心線程池數量一般設置成要執行周期任務的數量,如1,設置多了浪費;
  3. ScheduledExecutorService最多可設置核心線程池、線程工廠和拒絕策略三個參數,最大線程數為Integer.MAX_VALUE,工作隊列使用DelayedWorkQueue,額外線程存活時間為0s,構造器源碼如下
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
2.周期任務使用代碼示例
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //如果要提交兩個周期任務並且耗時特別長,則可以使用兩個線程
        ScheduledExecutorService schedulePool= Executors.newScheduledThreadPool(2);

        //只執行一次:有返回值
        Callable cal=()->1;
        int res=(int)schedulePool.schedule(cal, 1, TimeUnit.SECONDS).get();

        Runnable task=()->System.out.println("dgenkui");
        //只執行一次
        schedulePool.schedule(task,5,TimeUnit.SECONDS);

        /**
         *
         * 創建並執行一個周期性操作,該操作在給定的初始延遲后首先啟用,隨后在給定的時間段內啟用;
         * 即執行將在initialDelay之后開始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此類推。
         *  1.如果任務的任何執行遇到異常,則后續執行被禁止。 否則,任務將僅通過取消或終止執行者來終止。
         *  2.如果此任務的執行時間超過其周期,則后續執行可能會延遲,但不會同時執行。
         *
         */
        schedulePool.scheduleAtFixedRate(task,0,5, TimeUnit.SECONDS);

        /**
         * 同上也是周期動作。但周期是上一次任務結束和下一次任務開始的時間間隔
         * 如果任務的任何執行遇到異常,則后續執行被禁止。 否則,任務將僅通過取消或終止執行者來終止
         */
        schedulePool.scheduleWithFixedDelay(task,2,10,TimeUnit.SECONDS);
    }
3.DelayedWorkQueue延遲隊列
DelayedWorkQueue基本結構

延時任務使用的是靜態內部類DelayedWorkQueue,其任務放在數組中,存取都需要鎖

private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

//存放
public void put(Runnable e) {
    offer(e);
}
        
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}

public boolean add(Runnable e) {
    return offer(e);
}

    public boolean offer(Runnable x) {
            //存入元素為null則拋異常
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            //獲取鎖
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                //釋放鎖
                lock.unlock();
            }
            return true;
        }

//取
        //不需要移除元素
        public RunnableScheduledFuture<?> peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return queue[0];
            } finally {
                lock.unlock();
            }
        }
2)延時隊列和定時任務線程池的交互
  • 當調用ScheduledThreadPoolExecutorscheduleXXX(Runnable command,long delay,TimeUnit unit)方法時,會向延時隊列添加了實現了RunnableScheduledFuture接口的ScheduledFutureTask。源碼如下:
//ScheduledThreadPoolExecutor
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    ...判斷command是否為null和period是否為正...
        
    //構造含有觸發時間和周期信息的任務對象
    ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
    delayedExecute(sft);
    return sft;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    super.getQueue().add(task);
}

線程執行某個周期任務的流程如下:

  1. 獲取任務:線程從延時隊列DelayWorkQueue中獲取到期的任務ScheduledFutureTask
  2. 執行任務:線程執行ScheduledFutureTask並修改其time屬性(初始延時+周期);
  3. 將任務放回延時隊列中;

源碼如下:

        public RunnableScheduledFuture<?> take() throws InterruptedException{
            //獲取鎖
            final ReentrantLock lock = this.lock;//private final Condition available = lock.newCondition();
            lock.lockInterruptibly();
            try {
                //輪詢檢查延時隊列中的任務是否到期
                for (;;) {
                    //獲取隊頭元素
                    RunnableScheduledFuture<?> first = queue[0];
                    //如果元素為空,則放棄鎖等待,直到其他線程放進元素並喚醒此線程時,開始下一輪檢查
                    if (first == null)
                        available.await();
                    else {
                        //注意getDelay()是返回ScheduleFutureTask的time屬性和當前時間的納秒差值
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)//time-now()<=0標示到達了開始執行任務的時間,則返回任務
                            return finishPoll(first);
                        //否則在等待一段時間
                        else {
                            available.awaitNanos(delay);
                        }
                    }
                }
            } finally {
                //如果隊列中含有任務元素,則喚醒其他等待獲取任務的線程,並放棄鎖
                if (queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
        

//執行任務
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                //執行任務
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//設置下次執行時間
                reExecutePeriodic(outerTask);//重新返回延時隊列
            }
        }
//設置下次執行時間
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
        }
//放回延時隊列
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
3)ScheduledFutureTask

ScheduledFutureTask是延時隊列中的任務元素,主要變量有:

  1. long time:第一次執行的延時;
  2. long sequenceNumber:任務被添加到ScheduledThreadPoolExecutor中的序號;
  3. long period:周期;

隊列中的任務會先比較time,如果相同則比較sequenceNumber,都是小的先執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM