《java.util.concurrent 包源碼閱讀》15 線程池系列之ScheduledThreadPoolExecutor 第二部分


這篇文章主要說說DelayedWorkQueue。

在ScheduledThreadPoolExecutor使用DelayedWorkQueue來存放要執行的任務,因為這些任務是帶有延遲的,而每次執行都是取第一個任務執行,因此在DelayedWorkQueue中任務必然按照延遲時間從短到長來進行排序的。

DelayedWorkQueue使用堆來實現的。

和以前分析BlockingQueue的實現類一樣,首先來看offer方法,基本就是一個添加元素到堆的邏輯。

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                // 因為元素時存儲在一個數組中,隨着堆變大,當數組存儲不夠時,需要對數組擴容
                if (i >= queue.length)
                    grow();
                size = i + 1;
                // 如果原來隊列為空
                if (i == 0) {
                    queue[0] = e;

                    // 這個i就是RunnableScheduledFuture用到的heapIndex
                    setIndex(e, 0);
                } else {
                    // 添加元素到堆中
                    siftUp(i, e);
                }
                // 如果隊列原先為空,那么可能有線程在等待元素,這時候既然添加了元
                // 素,就需要通過Condition通知這些線程
                if (queue[0] == e) {
                    // 因為有元素新添加了,第一個等待的線程可以結束等待了,因此這里
                    // 刪除第一個等待線程
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

這里順帶看一下siftUp,熟悉堆的實現的朋友應該很容易看懂這是一個把元素添加已有堆中的算法。

        private void siftUp(int k, RunnableScheduledFuture key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

那么接着就看看poll:

        public RunnableScheduledFuture poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 因為即使拿到任務,線程還是需要等待,而這個等待過程是由隊列幫助完成的
                // 因此poll方法只能返回已經到執行時間點的任務
                RunnableScheduledFuture first = queue[0];
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    return null;
                else
                    return finishPoll(first);
            } finally {
                lock.unlock();
            }
        }

因為poll方法只能返回已經到了執行時間點的任務,所以對於我們理解隊列如何實現延遲執行沒有意義,因此重點看看take方法:

        public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 嘗試獲取第一個元素,如果隊列為空就進入等待
                    RunnableScheduledFuture first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        // 獲取任務執行的延遲時間
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        // 如果任務不用等待,立刻返回該任務給線程
                        if (delay <= 0)
                            // 從堆中拿走任務
                            return finishPoll(first);
                        // 如果任務需要等待,而且前面有個線程已經等待執行任務(leader線程
                        // 已經拿到任務了,但是執行時間沒有到,延遲時間肯定是最短的),
                        // 那么執行take的線程肯定繼續等待,
                        else if (leader != null)
                            available.await();
                        // 當前線程的延遲時間是最短的情況,那么更新leader線程
                        // 用Condition等待直到時間到點,被喚醒或者被中斷
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                // 重置leader線程以便進行下一次循環
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 隊列不為空發出signal很好理解,這里附帶了沒有leader線程
                // 的條件是因為leader線程存在時表示leader線程正在等待執行時間點的
                // 到來,如果此時發出signal會觸發awaitNanos提前返回
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

take方法的重點就是leader線程,因為存在延遲時間,即使拿到任務,線程還是需要等待的,leader線程就那個最先執行任務的線程。

因為線程拿到任務之后還是需要等待一段延遲執行的時間,所以對於超時等待的poll方法來說就有點意思了:

        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture first = queue[0];
                    // 任務隊列為空的情況
                    if (first == null) {
                        // nanos小於等於0有兩種可能:
                        // 1. 參數值設定
                        // 2. 等待已經超時
                        if (nanos <= 0)
                            return null;
                        else
                            // 等待一段時間,返回剩余的等待時間
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        if (nanos <= 0)
                            return null;
                        // leader線程存在並且nanos大於delay的情況下,
                        // 依然等待nanos這么長時間,不用擔心會超過delay設定
                        // 的時間點,因為leader線程到時間之后會發出signal
                        // 喚醒線程,而那個時候顯然還沒有到delay設定的時間點
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                // 剩余的超時時間
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

通過分析以上代碼基本上已經理清楚了DelayedWorkQueue實現延遲執行的原理:

1. 按照執行延遲從短到長的順序把任務存儲到堆;

2. 通過leader線程讓拿到任務的線程等到規定的時間點再執行任務;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM