// Excerpted from "Thinking in Java" (摘自《Java編程思想》)
package com.test.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * A runnable task that carries a priority and orders itself so that a higher
 * priority value sorts first in a priority queue.
 *
 * <p>Every constructed task is also appended to the shared {@link #sequence}
 * list, which records creation order.
 */
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    // Each instance owns a generator with the same fixed seed and draws from it
    // once per run(), so every task ends up with the same delay.
    private final Random rand = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    /** Execution priority of this task; larger values are taken from the queue first. */
    private final int priority;
    /** All tasks in the order they were constructed. */
    protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();

    /**
     * Creates a task and records it in {@link #sequence}.
     *
     * @param priority execution priority; larger means earlier
     */
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }

    /**
     * Compares by priority in descending order.
     *
     * @param o the task to compare against
     * @return a negative value if this task has the higher priority, a positive
     *         value if {@code o} has, and zero if the priorities are equal
     */
    @Override
    public int compareTo(PrioritizedTask o) {
        // Arguments are reversed on purpose: the queue hands out its smallest
        // element first, and the highest priority has to be that element.
        return Integer.compare(o.priority, priority);
    }

    /**
     * Sleeps for a pseudo-random time below 250 ms and then prints this task.
     * If the sleep is interrupted, the interrupt status is restored and nothing
     * is printed, so the calling loop can observe the interruption.
     */
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
        } catch (InterruptedException e) {
            // Swallowing the interrupt here would hide a shutdown request from
            // the consumer loop that is executing this task.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println(this);
    }

    @Override
    public String toString() {
        return String.format("[%1$-3d]", priority) + " Task " + id;
    }

    /**
     * @return a compact {@code (id:priority)} description of this task
     */
    public String summary() {
        return "(" + id + ":" + priority + ")";
    }
}

/**
 * Terminating task with priority -1. When run, it prints the summary of every
 * task recorded in {@code sequence}, so creation order can be compared with
 * execution order, and then shuts the executor down.
 */
class EndSentinel extends PrioritizedTask {
    private final ExecutorService exec;

    /**
     * @param e the executor to shut down once this sentinel runs
     */
    public EndSentinel(ExecutorService e) {
        super(-1);
        exec = e;
    }

    @Override
    public void run() {
        int count = 0;
        for (PrioritizedTask pt : sequence) {
            System.out.println(pt.summary());
            // Blank line after every fifth summary.
            if (++count % 5 == 0) {
                System.out.println();
            }
        }
        System.out.println();
        System.out.println(this + " Calling shutdownNow");
        exec.shutdownNow();
    }
}

/**
 * Produces tasks of various priorities and adds them to the shared queue,
 * finishing with an {@link EndSentinel}.
 */
class PrioritizedTaskProduncer implements Runnable {
    private final Random rand = new Random(47);
    private final Queue<Runnable> queue;
    private final ExecutorService exec;

    /**
     * @param q the queue that receives the produced tasks
     * @param e the executor handed to the terminating {@link EndSentinel}
     */
    public PrioritizedTaskProduncer(Queue<Runnable> q, ExecutorService e) {
        queue = q;
        exec = e;
    }

    @Override
    public void run() {
        // 20 tasks with random priorities in the range 0..9.
        for (int i = 0; i < 20; i++) {
            queue.add(new PrioritizedTask(rand.nextInt(10)));
            Thread.yield();
        }
        try {
            // 10 tasks with priority 10, added one every 250 ms.
            for (int i = 0; i < 10; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10));
            }
            // One task for each priority from 0 to 9.
            for (int i = 0; i < 10; i++) {
                queue.add(new PrioritizedTask(i));
            }
            // Priority -1 sorts after every other task queued above.
            queue.add(new EndSentinel(exec));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("PrioritizedTaskProducer interrupted; EndSentinel was not queued");
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}

/**
 * Takes tasks from the queue and runs them on the consuming thread until that
 * thread is interrupted.
 */
class PrioritizedTaskConsumer implements Runnable {
    private final PriorityBlockingQueue<Runnable> q;

    /**
     * @param q the queue to take tasks from
     */
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
        this.q = q;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                q.take().run();
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in take(): treated as a normal exit.
            Thread.currentThread().interrupt();
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}

/**
 * Demo entry point: fills a {@link PriorityBlockingQueue} with tasks, then
 * starts a single consumer that runs them.
 */
public class PriorityBlockingQueueDemo {
    /**
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the producer
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
        Future<?> producing = exec.submit(new PrioritizedTaskProduncer(queue, exec));
        // Consumption must start only after every task has been queued;
        // otherwise tasks may appear to run out of priority order. Waiting on
        // the producer's Future guarantees that without guessing a sleep time.
        try {
            producing.get();
        } catch (InterruptedException e) {
            exec.shutdownNow();
            throw e;
        } catch (ExecutionException e) {
            exec.shutdownNow();
            throw new IllegalStateException("Task producer failed", e.getCause());
        }
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}