Java並發編程實踐 目錄
並發編程 04—— 閉鎖CountDownLatch 與 柵欄CyclicBarrier
並發編程 06—— CompletionService : Executor 和 BlockingQueue
並發編程 10—— 任務取消 之 關閉 ExecutorService
並發編程 12—— 任務取消與關閉 之 shutdownNow 的局限性
並發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略
第1 部分 問題引入
當通過 shutdownNow 來強行關閉 ExecutorService 時,它會嘗試取消正在執行的任務,並返回所有已提交但尚未開始的任務,從而將這些任務寫入日志或者保存起來以便之后進行處理。
然而,我們無法通過常規方法來找出哪些任務已經開始但尚未結束。這意味着這我們無法在關閉過程中知道正在執行的任務的狀態,除非任務本身會執行某種檢查。要知道哪些任務還沒有完成,你不僅需要知道哪些任務還沒有開始,而且還需知道當 Executor 關閉時哪些任務正在執行。
第2 部分 實例
在下面程序 TrackingExecutor 中給出了如何在關閉過程中判斷正在執行的任務。通過封裝 ExecutorService 並使得execute 記錄哪些任務是在關閉后取消的,TrackingExecutor 可以找出哪些任務已經開始但還沒有正常完成。在 Executor 結束后,getCancelledTasks 返回被取消的任務清單。
1 /** 2 * 7.21 在 ExecutorService 中跟蹤在關閉之后取消的任務 3 * @ClassName: TrackingExecutor 4 * @author xingle 5 * @date 2014-11-12 下午8:39:33 6 */ 7 public class TrackingExecutor extends AbstractExecutorService{ 8 private final ExecutorService exec; 9 private final Set<Runnable> tasksCancelledAtShutdown = Collections 10 .synchronizedSet(new HashSet<Runnable>()); 11 12 public TrackingExecutor(ExecutorService exec){ 13 this.exec = exec; 14 } 15 16 public List<Runnable> getCancelledTasks(){ 17 if(!exec.isTerminated()) 18 throw new IllegalStateException(); 19 return new ArrayList<Runnable>(tasksCancelledAtShutdown); 20 } 21 22 /** 23 * 24 * @Description: TODO 25 * @param command 26 * @author xingle 27 * @data 2014-11-13 上午9:06:56 28 */ 29 @Override 30 public void execute(final Runnable runnable) { 31 exec.execute(new Runnable() { 32 33 @Override 34 public void run() { 35 try{ 36 runnable.run(); 37 }finally{ 38 if(isShutdown() && Thread.currentThread().isInterrupted()) 39 tasksCancelledAtShutdown.add(runnable); 40 } 41 } 42 }); 43 } 44 45 /** 46 * 下面將ExecutorService 的其他方法委托給 exec 47 */ 48 49 /** 50 * 51 * @Description: TODO 52 * @author xingle 53 * @data 2014-11-13 上午9:06:56 54 */ 55 @Override 56 public void shutdown() { 57 exec.shutdown(); 58 } 59 60 /** 61 * 62 * @Description: TODO 63 * @return 64 * @author xingle 65 * @data 2014-11-13 上午9:06:56 66 */ 67 @Override 68 public List<Runnable> shutdownNow() { 69 return exec.shutdownNow(); 70 } 71 72 /** 73 * 74 * @Description: TODO 75 * @return 76 * @author xingle 77 * @data 2014-11-13 上午9:06:56 78 */ 79 @Override 80 public boolean isShutdown() { 81 return exec.isShutdown(); 82 } 83 84 /** 85 * 86 * @Description: TODO 87 * @return 88 * @author xingle 89 * @data 2014-11-13 上午9:06:56 90 */ 91 @Override 92 public boolean isTerminated() { 93 return exec.isTerminated(); 94 } 95 96 /** 97 * 98 * @Description: TODO 99 * @param timeout 100 * @param unit 101 * @return 102 * @throws InterruptedException 103 * @author xingle 104 * @data 2014-11-13 上午9:06:56 105 */ 106 @Override 107 public boolean awaitTermination(long timeout, TimeUnit unit) 108 throws InterruptedException { 109 return exec.awaitTermination(timeout, unit); 110 } 111 112 113 }
在程序 WebCrawler 中給出了 TrackingExecutor 的用法。網頁爬蟲程序的工作通常是無窮盡的,因此當爬蟲程序必須關閉時,我們通常希望保持它的狀態,以便稍后重啟動。CrawlTask 提供了一個 getPage 方法,該方法能找出正在處理的頁面。當爬蟲程序關閉時,無論是還沒有開始的任務,還是那些被取消的任務,都將記錄他們的URL,因此當爬蟲程序程序啟動時,就可以將這些URL 的頁面抓取任務加入到任務隊列中。
1 /** 2 * 7.22 使用TrackingExecutorService 來保存未完成的任務以備后續執行 3 * @ClassName: WebCrawler 4 * TODO 5 * @author xingle 6 * @date 2014-11-13 上午9:17:54 7 */ 8 public abstract class WebCrawler { 9 private volatile TrackingExecutor exec; 10 @GuardedBy("this") 11 public final Set<URL> urlsToCrawl = new HashSet<URL>(); 12 13 private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>(); 14 private static final long TIMEOUT = 500; 15 private static final TimeUnit UNIT = TimeUnit.MICROSECONDS; 16 17 public WebCrawler(URL startUrl){ 18 urlsToCrawl.add(startUrl); 19 } 20 21 public synchronized void start(){ 22 exec = new TrackingExecutor(Executors.newCachedThreadPool()); 23 for (URL url: urlsToCrawl) 24 submitCrawlTask(url); 25 urlsToCrawl.clear(); 26 } 27 28 /** 29 * 提交爬蟲任務 30 * @param url 31 * @author xingle 32 * @data 2014-11-13 上午9:46:01 33 */ 34 private void submitCrawlTask(URL url) { 35 exec.execute(new CrawlTask(url)); 36 } 37 38 protected abstract List<URL> processPage(URL url); 39 40 /** 41 * 保存未完成的 42 * @param urlsToCrawl 43 * @author xingle 44 * @data 2014-11-13 上午10:10:07 45 */ 46 private void saveUncrawled(List<Runnable> uncrawled) { 47 for (Runnable task:uncrawled){ 48 URL url = ((CrawlTask)task).getPage(); 49 System.out.println("保存未完成的URL:"+url); 50 urlsToCrawl.add(url); 51 } 52 53 } 54 55 //爬蟲任務 56 private class CrawlTask implements Runnable{ 57 private final URL url; 58 59 CrawlTask(URL url){ 60 this.url = url; 61 } 62 63 private int count = 1; 64 65 boolean alreadyCrawled() { 66 return seen.putIfAbsent(url, true) != null; 67 } 68 69 void markUncrawled() { 70 seen.remove(url); 71 System.out.printf("marking %s uncrawled%n", url); 72 } 73 74 @Override 75 public void run() { 76 for (URL link :processPage(url)){ 77 if(Thread.currentThread().isInterrupted()) 78 return; 79 System.out.println("提交的爬蟲url:"+link); 80 submitCrawlTask(link); 81 } 82 } 83 84 public URL getPage(){ 85 return url; 86 } 87 } 88 89 public synchronized void stop() throws InterruptedException{ 90 try { 91 saveUncrawled(exec.shutdownNow()); 92 if (exec.awaitTermination(100, UNIT)){ 93 saveUncrawled(exec.getCancelledTasks()); 94 } 95 96 } finally { 97 exec = null; 98 } 99 } 100 }
測試程序:
1 public class WebCrawler_Main { 2 3 public static void main(String[] args) throws MalformedURLException{ 4 WebCrawler webc = new WebCrawler(new URL("http://site.baidu.com/")) { 5 6 @Override 7 protected List<URL> processPage(URL url) { 8 //獲取該url下所有的鏈接 9 //這里省略了該功能 10 List<URL> url2 = new ArrayList<URL>(); 11 try { 12 url2.add(new URL("http://www.cnblogs.com/xingele0917/")); 13 //url2.add(new URL("http://www.zhihu.com/")); 14 } catch (MalformedURLException e) { 15 e.printStackTrace(); 16 } 17 return url2; 18 19 } 20 21 }; 22 23 webc.start(); 24 try { 25 Thread.sleep(10); 26 webc.stop(); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 32 }
執行結果: