Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先級隊列。
放入隊列的元素需要實現java.util.concurrent包的Delayed接口:
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
通過實現這個接口,來完成對隊列中元素,按照時間延遲先后排序的目的。
從隊列中取元素:
看DelayedQueue的take()方法:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
可以看到,在這段代碼里,在第一個元素的延遲時間還沒到的情況下:
- 如果當前沒有其他線程等待,則阻塞當前線程直到延遲時間。
- 如果有其他線程在等待,則阻塞當前線程。
向隊列中放入元素:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
在放入元素的時候,會喚醒等待中的讀線程。
如果我們不考慮分布式運行和任務持久化的話,Java中的DelayQueue是一個很理想的方案,精巧好用。但是如果我們需要分布式運行和任務持久化,就需要引入一些外部組件。