webmagic is a solid and very easy-to-use crawler framework. Its tutorial is at http://my.oschina.net/flashsword/blog/180623
webmagic borrows scrapy's module layout and is split into Spider (the overall crawl scheduling framework), Downloader (page downloading), PageProcessor (link extraction and page parsing), Scheduler (URL management), and Pipeline (offline processing and persistence). The difference is that scrapy is extended through middleware, whereas webmagic defines each of these parts as an interface and lets you inject your own implementations into the main framework class, Spider.
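To make that division of labor concrete, here is a minimal sketch of wiring the pieces together with webmagic's fluent API. The DemoProcessor class, its XPath/regex selectors, and the thread count are placeholders of my own, not something taken from this article:

import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.ConsolePipeline;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.scheduler.QueueScheduler;

public class DemoProcessor implements PageProcessor {

    private Site site = Site.me().setRetryTimes(3).setSleepTime(1000);

    @Override
    public void process(Page page) {
        // PageProcessor: extract follow-up links and data fields from the downloaded page
        page.addTargetRequests(page.getHtml().links().regex(".*/yundong/.*").all());
        page.putField("title", page.getHtml().xpath("//title/text()").toString());
    }

    @Override
    public Site getSite() {
        return site;
    }

    public static void main(String[] args) {
        Spider.create(new DemoProcessor())                  // PageProcessor: parsing logic
                .addUrl("http://www.cndzys.com/yundong/")
                .setDownloader(new HttpClientDownloader())  // Downloader: fetch pages over HTTP
                .setScheduler(new QueueScheduler())         // Scheduler: in-memory URL queue
                .addPipeline(new ConsolePipeline())         // Pipeline: print extracted results
                .thread(5)
                .run();                                     // Spider drives all of the above
    }
}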
The most basic job of the Scheduler (URL management) is to mark which URLs have already been crawled.
Currently the Scheduler has three implementations:
1) an in-memory queue
2) a file-based queue
3) a Redis queue
The file-based queue persists URLs to disk, so after an interruption the crawl can be restarted and continue incrementally, as sketched below.
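Switching between these three implementations is just a matter of injecting a different Scheduler. A small sketch follows; the cache directory and Redis host are placeholder values of mine, and RedisScheduler comes from the webmagic-extension module:

import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.scheduler.FileCacheQueueScheduler;
import us.codecraft.webmagic.scheduler.QueueScheduler;
import us.codecraft.webmagic.scheduler.RedisScheduler;

public class SchedulerExamples {

    // 1) in-memory queue (webmagic's default): state is lost when the process exits
    static void useMemoryQueue(Spider spider) {
        spider.setScheduler(new QueueScheduler());
    }

    // 2) file-based queue: URLs and a cursor file are written under the given directory,
    //    so a restarted crawl can pick up where it left off
    static void useFileQueue(Spider spider) {
        spider.setScheduler(new FileCacheQueueScheduler("/data/webmagic/cache/"));
    }

    // 3) Redis queue: shared URL state, useful for distributed crawling
    static void useRedisQueue(Spider spider) {
        spider.setScheduler(new RedisScheduler("127.0.0.1"));
    }
}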
Suppose we only have a single entry URL, e.g. http://www.cndzys.com/yundong/. If you use webmagic's FileCacheQueueScheduler as-is, you will find that the second run crawls essentially nothing, because FileCacheQueueScheduler has already recorded http://www.cndzys.com/yundong/ and will not schedule it again. For the second, incremental run we still need certain URLs (the list pages) to be re-crawled so that newly published content is picked up. We can achieve this by overriding the duplicate check inside FileCacheQueueScheduler.
package com.fortunedr.crawler.expertadvice;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
import us.codecraft.webmagic.scheduler.MonitorableScheduler;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;

/**
 * Store urls and cursor in files so that a Spider can resume the status when shutdown.<br>
 * Adds an extra duplicate check: URLs matching a regex are always re-crawled instead of being skipped.
 * @author code4crafter@gmail.com <br>
 * @since 0.2.0
 */
public class SpikeFileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {

    private String filePath = System.getProperty("java.io.tmpdir");

    private String fileUrlAllName = ".urls.txt";

    private Task task;

    private String fileCursor = ".cursor.txt";

    private PrintWriter fileUrlWriter;

    private PrintWriter fileCursorWriter;

    private AtomicInteger cursor = new AtomicInteger();

    private AtomicBoolean inited = new AtomicBoolean(false);

    private BlockingQueue<Request> queue;

    private Set<String> urls;

    private ScheduledExecutorService flushThreadPool;

    private String regx;

    public SpikeFileCacheQueueScheduler(String filePath) {
        if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
            filePath += "/";
        }
        this.filePath = filePath;
        initDuplicateRemover();
    }

    private void flush() {
        fileUrlWriter.flush();
        fileCursorWriter.flush();
    }

    private void init(Task task) {
        this.task = task;
        File file = new File(filePath);
        if (!file.exists()) {
            file.mkdirs();
        }
        readFile();
        initWriter();
        initFlushThread();
        inited.set(true);
        logger.info("init cache scheduler success");
    }

    private void initDuplicateRemover() {
        setDuplicateRemover(
                new DuplicateRemover() {
                    @Override
                    public boolean isDuplicate(Request request, Task task) {
                        if (!inited.get()) {
                            init(task);
                        }
                        boolean temp = false;
                        String url = request.getUrl();
                        temp = !urls.add(url); // original check: true if the URL was already recorded
                        // second check: URLs matching the re-crawl regex are never treated as duplicates
                        if (regx != null && url.matches(regx)) {
                            temp = false;
                        }
                        return temp;
                    }

                    @Override
                    public void resetDuplicateCheck(Task task) {
                        urls.clear();
                    }

                    @Override
                    public int getTotalRequestsCount(Task task) {
                        return urls.size();
                    }
                });
    }

    private void initFlushThread() {
        flushThreadPool = Executors.newScheduledThreadPool(1);
        flushThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                flush();
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private void initWriter() {
        try {
            fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
            fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
        } catch (IOException e) {
            throw new RuntimeException("init cache scheduler error", e);
        }
    }

    private void readFile() {
        try {
            queue = new LinkedBlockingQueue<Request>();
            urls = new LinkedHashSet<String>();
            readCursorFile();
            readUrlFile();
            // initDuplicateRemover();
        } catch (FileNotFoundException e) {
            // init
            logger.info("init cache file " + getFileName(fileUrlAllName));
        } catch (IOException e) {
            logger.error("init file error", e);
        }
    }

    private void readUrlFile() throws IOException {
        String line;
        BufferedReader fileUrlReader = null;
        try {
            fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
            int lineReaded = 0;
            while ((line = fileUrlReader.readLine()) != null) {
                urls.add(line.trim());
                lineReaded++;
                if (lineReaded > cursor.get()) {
                    queue.add(new Request(line));
                }
            }
        } finally {
            if (fileUrlReader != null) {
                IOUtils.closeQuietly(fileUrlReader);
            }
        }
    }

    private void readCursorFile() throws IOException {
        BufferedReader fileCursorReader = null;
        try {
            fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
            String line;
            // read the last number
            while ((line = fileCursorReader.readLine()) != null) {
                cursor = new AtomicInteger(NumberUtils.toInt(line));
            }
        } finally {
            if (fileCursorReader != null) {
                IOUtils.closeQuietly(fileCursorReader);
            }
        }
    }

    public void close() throws IOException {
        flushThreadPool.shutdown();
        fileUrlWriter.close();
        fileCursorWriter.close();
    }

    private String getFileName(String filename) {
        return filePath + task.getUUID() + filename;
    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        queue.add(request);
        fileUrlWriter.println(request.getUrl());
    }

    @Override
    public synchronized Request poll(Task task) {
        if (!inited.get()) {
            init(task);
        }
        fileCursorWriter.println(cursor.incrementAndGet());
        return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }

    public String getRegx() {
        return regx;
    }

    /**
     * Set the regex for URLs that should always be re-crawled.
     * @param regx
     */
    public void setRegx(String regx) {
        this.regx = regx;
    }

}
Then, when setting up the crawler, just plug in this custom FileCacheQueueScheduler:
spider.addRequest(requests);
SpikeFileCacheQueueScheduler file = new SpikeFileCacheQueueScheduler(filePath);
file.setRegx(regx); // http://www.cndzys.com/yundong/(index)?[0-9]*(.html)?
spider.setScheduler(file);
With that, incremental crawling works.
An idea for further optimization: on most sites the list pages show the newest content first. The approach above does give us incremental crawling, but it still re-crawls many "useless" list pages.
Could we stop as soon as we reach the URL that was the "newest" one in the previous run, and skip crawling the remaining, redundant list pages?
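One possible direction, sketched purely under my own assumptions (this NewestUrlMarker helper is hypothetical and not part of webmagic): persist the newest article URL seen on each run, and once the current run encounters that URL again, stop treating the remaining list pages as worth re-crawling.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical helper: remembers the newest article URL of the previous run in a
 * small marker file. When the current run meets that URL again, everything that
 * follows on the list pages is old content, so further list pages need no re-crawl.
 */
public class NewestUrlMarker {

    private final Path markerFile;
    private final String previousNewest;   // newest URL recorded by the last run, if any
    private String currentNewest;          // first article URL seen in this run
    private boolean reachedOldContent;

    public NewestUrlMarker(String markerFilePath) throws IOException {
        this.markerFile = Paths.get(markerFilePath);
        this.previousNewest = Files.exists(markerFile)
                ? new String(Files.readAllBytes(markerFile), StandardCharsets.UTF_8).trim()
                : null;
    }

    /** Call for every article URL extracted from a list page, in page order. */
    public void onArticleUrl(String url) {
        if (currentNewest == null) {
            currentNewest = url;            // the very first article of this run becomes the new "newest"
        }
        if (url.equals(previousNewest)) {
            reachedOldContent = true;       // everything after this point was already crawled last time
        }
    }

    /** When true, the scheduler could stop treating further list pages as re-crawlable. */
    public boolean reachedOldContent() {
        return reachedOldContent;
    }

    /** Persist the new marker at the end of a successful run. */
    public void save() throws IOException {
        if (currentNewest != null) {
            Files.write(markerFile, currentNewest.getBytes(StandardCharsets.UTF_8));
        }
    }
}

This would still need to be wired into the PageProcessor (calling onArticleUrl for each extracted article link) and combined with the scheduler's regex check, so treat it only as a starting point.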