延時隊列:Java中的DelayQueue


Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先級隊列。

放入隊列的元素需要實現java.util.concurrent包的Delayed接口:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

通過實現這個接口,來完成對隊列中元素,按照時間延遲先后排序的目的。

從隊列中取元素:

看DelayedQueue的take()方法:

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    } 

可以看到,在這段代碼里,在第一個元素的延遲時間還沒到的情況下:

  • 如果當前沒有其他線程等待,則阻塞當前線程直到延遲時間。
  • 如果有其他線程在等待,則阻塞當前線程。

向隊列中放入元素:

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

 

在放入元素的時候,會喚醒等待中的讀線程。

 

如果我們不考慮分布式運行和任務持久化的話,Java中的DelayQueue是一個很理想的方案,精巧好用。但是如果我們需要分布式運行和任務持久化,就需要引入一些外部組件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM