並發編程 12—— 任務取消與關閉 之 shutdownNow 的局限性


 
 
概述
 

第1 部分 問題引入

  當通過 shutdownNow  來強行關閉 ExecutorService 時,它會嘗試取消正在執行的任務,並返回所有已提交但尚未開始的任務,從而將這些任務寫入日志或者保存起來以便之后進行處理。

  然而,我們無法通過常規方法來找出哪些任務已經開始但尚未結束。這意味着這我們無法在關閉過程中知道正在執行的任務的狀態,除非任務本身會執行某種檢查。要知道哪些任務還沒有完成,你不僅需要知道哪些任務還沒有開始,而且還需知道當 Executor 關閉時哪些任務正在執行。

第2 部分 實例

  在下面程序 TrackingExecutor 中給出了如何在關閉過程中判斷正在執行的任務。通過封裝 ExecutorService 並使得execute 記錄哪些任務是在關閉后取消的,TrackingExecutor 可以找出哪些任務已經開始但還沒有正常完成。在 Executor 結束后,getCancelledTasks 返回被取消的任務清單。

  1 /**
  2  * 7.21 在 ExecutorService 中跟蹤在關閉之后取消的任務
  3  * @ClassName: TrackingExecutor
  4  * @author xingle
  5  * @date 2014-11-12 下午8:39:33
  6  */
  7 public class TrackingExecutor extends AbstractExecutorService{
  8     private final ExecutorService exec;
  9     private final Set<Runnable> tasksCancelledAtShutdown = Collections
 10             .synchronizedSet(new HashSet<Runnable>());
 11     
 12     public TrackingExecutor(ExecutorService exec){
 13         this.exec = exec;
 14     }
 15     
 16     public List<Runnable> getCancelledTasks(){
 17         if(!exec.isTerminated())
 18             throw new IllegalStateException();
 19         return new ArrayList<Runnable>(tasksCancelledAtShutdown);
 20     }
 21     
 22     /**
 23      * 
 24      * @Description: TODO
 25      * @param command
 26      * @author xingle
 27      * @data 2014-11-13 上午9:06:56
 28      */
 29     @Override
 30     public void execute(final Runnable runnable) {
 31         exec.execute(new Runnable() {
 32             
 33             @Override
 34             public void run() {
 35                 try{
 36                     runnable.run();
 37                 }finally{
 38                     if(isShutdown() && Thread.currentThread().isInterrupted())
 39                         tasksCancelledAtShutdown.add(runnable);
 40                 }
 41             }
 42         });
 43     }
 44 
 45     /**
 46      * 下面將ExecutorService 的其他方法委托給 exec
 47      */
 48         
 49     /**
 50      * 
 51      * @Description: TODO
 52      * @author xingle
 53      * @data 2014-11-13 上午9:06:56
 54      */
 55     @Override
 56     public void shutdown() {
 57         exec.shutdown();
 58     }
 59 
 60     /**
 61      * 
 62      * @Description: TODO
 63      * @return
 64      * @author xingle
 65      * @data 2014-11-13 上午9:06:56
 66      */
 67     @Override
 68     public List<Runnable> shutdownNow() {
 69         return exec.shutdownNow();
 70     }
 71 
 72     /**
 73      * 
 74      * @Description: TODO
 75      * @return
 76      * @author xingle
 77      * @data 2014-11-13 上午9:06:56
 78      */
 79     @Override
 80     public boolean isShutdown() {
 81         return exec.isShutdown();
 82     }
 83 
 84     /**
 85      * 
 86      * @Description: TODO
 87      * @return
 88      * @author xingle
 89      * @data 2014-11-13 上午9:06:56
 90      */
 91     @Override
 92     public boolean isTerminated() {
 93         return exec.isTerminated();
 94     }
 95 
 96     /**
 97      * 
 98      * @Description: TODO
 99      * @param timeout
100      * @param unit
101      * @return
102      * @throws InterruptedException
103      * @author xingle
104      * @data 2014-11-13 上午9:06:56
105      */
106     @Override
107     public boolean awaitTermination(long timeout, TimeUnit unit)
108             throws InterruptedException {
109         return exec.awaitTermination(timeout, unit);
110     }
111 
112 
113 }

 

  在程序 WebCrawler 中給出了 TrackingExecutor 的用法。網頁爬蟲程序的工作通常是無窮盡的,因此當爬蟲程序必須關閉時,我們通常希望保持它的狀態,以便稍后重啟動。CrawlTask 提供了一個 getPage 方法,該方法能找出正在處理的頁面。當爬蟲程序關閉時,無論是還沒有開始的任務,還是那些被取消的任務,都將記錄他們的URL,因此當爬蟲程序程序啟動時,就可以將這些URL 的頁面抓取任務加入到任務隊列中。

  1 /**
  2  * 7.22 使用TrackingExecutorService 來保存未完成的任務以備后續執行
  3  * @ClassName: WebCrawler
  4  * TODO
  5  * @author xingle
  6  * @date 2014-11-13 上午9:17:54
  7  */
  8 public abstract class WebCrawler {
  9     private volatile TrackingExecutor exec;
 10     @GuardedBy("this")
 11     public final Set<URL> urlsToCrawl = new HashSet<URL>();
 12 
 13     private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
 14     private static final long TIMEOUT = 500;
 15     private static final TimeUnit UNIT = TimeUnit.MICROSECONDS;
 16     
 17     public WebCrawler(URL startUrl){
 18         urlsToCrawl.add(startUrl);
 19     }
 20     
 21     public synchronized void start(){
 22         exec = new TrackingExecutor(Executors.newCachedThreadPool());
 23         for (URL url: urlsToCrawl)
 24             submitCrawlTask(url);
 25         urlsToCrawl.clear();
 26     }
 27     
 28     /**
 29      * 提交爬蟲任務
 30      * @param url
 31      * @author xingle
 32      * @data 2014-11-13 上午9:46:01
 33      */
 34     private void submitCrawlTask(URL url) {
 35         exec.execute(new CrawlTask(url));
 36     }
 37     
 38     protected abstract List<URL> processPage(URL url);
 39     
 40     /**
 41      * 保存未完成的
 42      * @param urlsToCrawl
 43      * @author xingle
 44      * @data 2014-11-13 上午10:10:07
 45      */
 46     private void saveUncrawled(List<Runnable> uncrawled) {
 47         for (Runnable task:uncrawled){
 48             URL url = ((CrawlTask)task).getPage();
 49             System.out.println("保存未完成的URL:"+url);
 50             urlsToCrawl.add(url);    
 51         }
 52             
 53     }
 54     
 55     //爬蟲任務
 56     private class CrawlTask implements Runnable{
 57         private final URL url;
 58         
 59         CrawlTask(URL url){
 60             this.url = url;
 61         }
 62         
 63         private int count = 1;
 64 
 65         boolean alreadyCrawled() {
 66             return seen.putIfAbsent(url, true) != null;
 67         }
 68 
 69         void markUncrawled() {
 70             seen.remove(url);
 71             System.out.printf("marking %s uncrawled%n", url);
 72         }
 73                 
 74         @Override
 75         public void run() {
 76             for (URL link :processPage(url)){
 77                 if(Thread.currentThread().isInterrupted())
 78                     return;
 79                 System.out.println("提交的爬蟲url:"+link);
 80                 submitCrawlTask(link);
 81             }            
 82         }
 83         
 84         public URL getPage(){
 85             return url;
 86         }        
 87     }
 88     
 89     public synchronized void stop() throws InterruptedException{
 90         try {
 91             saveUncrawled(exec.shutdownNow());
 92             if (exec.awaitTermination(100, UNIT)){
 93                 saveUncrawled(exec.getCancelledTasks());
 94             }
 95                 
 96         } finally {
 97             exec = null;
 98         }
 99     }    
100 }

 

測試程序:

 1 public class WebCrawler_Main {
 2     
 3     public static void main(String[] args) throws MalformedURLException{
 4         WebCrawler webc = new WebCrawler(new URL("http://site.baidu.com/")) {
 5             
 6             @Override
 7             protected List<URL> processPage(URL url) {
 8                 //獲取該url下所有的鏈接
 9                 //這里省略了該功能
10                 List<URL> url2 = new ArrayList<URL>();
11                 try {
12                     url2.add(new URL("http://www.cnblogs.com/xingele0917/"));
13                     //url2.add(new URL("http://www.zhihu.com/"));
14                 } catch (MalformedURLException e) {
15                     e.printStackTrace();
16                 }
17                 return url2;
18                 
19             }
20             
21         };
22 
23         webc.start();
24         try {
25             Thread.sleep(10);
26             webc.stop();
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30     }
31 
32 }

執行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM