Java並發編程實踐 目錄
並發編程 04—— 閉鎖CountDownLatch 與 柵欄CyclicBarrier
並發編程 06—— CompletionService : Executor 和 BlockingQueue
並發編程 10—— 任務取消 之 關閉 ExecutorService
並發編程 12—— 任務取消與關閉 之 shutdownNow 的局限性
並發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略
概述
第1部分 定義
“毒丸”是指一個放在隊列上的對象,其含義是:“當得到這個對象時,立即停止。”在FIFO 隊列中,“毒丸”對象將確保消費者在關閉之前首先完成隊列中的所有工作,在提交“毒丸”對象之前提交的所有工作都會被處理,而生產者在提交了“毒丸”對象后,將不會在提交任何工作。在下面的程序清單中給出了一個單生產者——單消費者的桌面搜索示例,使用了“毒丸”對象來關閉服務。
第2部分 實例
1 /** 2 * 7.17 通過“毒丸”對象來關閉服務 3 * @ClassName: IndexingService 4 * @author xingle 5 * @date 2014-11-12 下午1:58:06 6 */ 7 public class IndexingService { 8 private static final int CAPACITY = 1000; 9 private static final File POISON = new File(""); 10 private final IndexerThread consumer = new IndexerThread(); 11 private final CrawlerThread producer = new CrawlerThread(); 12 private final BlockingQueue<File> queue; 13 //private final FileFilter fileFilter; 14 private final File root; 15 16 public IndexingService(File root) { 17 this.root = root; 18 this.queue = new LinkedBlockingQueue<File>(CAPACITY); 19 20 } 21 22 private boolean alreadyIndexed(File f) { 23 return false; 24 } 25 26 //生產者 27 class CrawlerThread extends Thread { 28 public void run() { 29 try { 30 crawl(root); 31 } catch (InterruptedException e) { /* fall through */ 32 } finally { 33 while (true) { 34 try { 35 System.out.println("放入“毒丸”"); 36 queue.put(POISON); 37 break; 38 } catch (InterruptedException e1) { /* retry */ 39 } 40 } 41 } 42 } 43 44 private void crawl(File root) throws InterruptedException { 45 File[] entries = root.listFiles(); 46 if (entries != null) { 47 for (File entry : entries) { 48 if (entry.isDirectory()) 49 crawl(entry); 50 else if (!alreadyIndexed(entry)){ 51 System.out.println("放入生產者隊列文件:"+entry.getName()+" 來自線程:"+Thread.currentThread().getName()); 52 queue.put(entry); 53 } 54 } 55 } 56 } 57 } 58 59 //消費者 60 class IndexerThread extends Thread { 61 public void run() { 62 try { 63 while (true) { 64 File file = queue.take(); 65 if (file == POISON){ 66 System.out.println("遇到“毒丸”,終止"); 67 break; 68 } 69 70 else 71 indexFile(file); 72 } 73 } catch (InterruptedException consumed) { 74 } 75 } 76 77 public void indexFile(File file) { 78 System.out.println("消費者取出文件:"+file.getName()+" 來自線程:"+Thread.currentThread().getName()); 79 /* ... */ 80 }; 81 } 82 83 public void start() { 84 producer.start(); 85 consumer.start(); 86 } 87 88 public void stop() { 89 producer.interrupt(); 90 } 91 92 public void awaitTermination() throws InterruptedException { 93 consumer.join(); 94 } 95 96 }
測試代碼:
1 /** 2 * 7.17 測試主程序( 通過“毒丸”對象來關閉服務) 3 * @ClassName: IndexingService_Main 4 * @author xingle 5 * @date 2014-11-12 下午2:25:36 6 */ 7 public class IndexingService_Main { 8 public static void main(String[] args) { 9 File file = new File("D:\\test1/"); 10 IndexingService c = new IndexingService(file); 11 c.start(); 12 try { 13 TimeUnit.MICROSECONDS.sleep(100);// 停止XX時間,顯示出消費速度慢於生產速度 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 c.stop(); 18 } 19 }
單次執行結果:
只有在生產者和消費者的數量都已知的情況下,才可以使用”毒丸“對象。
在 IndexingService中采用的解決方案可以擴展到多個生產者:只需每個生產者都想隊列放入一個”毒丸“對象,並且消費者僅當在接收到N個生產者的”毒丸“對象時才停止。這種方法也可以擴展到多個消費者的情況,只需生產者將N個”毒丸“對象放入隊列。然而,當生產者和消費者的數量較大時,這種方法將變的難以使用。只有在無界隊列中,”毒丸“對象才能可靠地工作。