Incremental crawling with webmagic


webmagic is a good and very simple crawler framework; its tutorial is at http://my.oschina.net/flashsword/blog/180623

webmagic borrows scrapy's module division and is split into Spider (the scheduling framework for the whole crawl), Downloader (page downloading), PageProcessor (link extraction and page analysis), Scheduler (URL management), and Pipeline (offline analysis and persistence). The difference is that scrapy extends itself through middleware, while webmagic defines these parts as interfaces and injects different implementations of them into the main framework class Spider.
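To make that division concrete, here is a minimal sketch of how the parts are wired into Spider. The DemoProcessor class, the regex, and the XPath are placeholders invented for illustration, not from the original article:

    import us.codecraft.webmagic.Page;
    import us.codecraft.webmagic.Site;
    import us.codecraft.webmagic.Spider;
    import us.codecraft.webmagic.pipeline.ConsolePipeline;
    import us.codecraft.webmagic.processor.PageProcessor;
    import us.codecraft.webmagic.scheduler.QueueScheduler;

    public class DemoProcessor implements PageProcessor {

        private Site site = Site.me().setRetryTimes(3).setSleepTime(1000);

        @Override
        public void process(Page page) {
            // link extraction and page analysis happen here
            page.addTargetRequests(page.getHtml().links().regex(".*cndzys\\.com/yundong.*").all());
            page.putField("title", page.getHtml().xpath("//title/text()").toString());
        }

        @Override
        public Site getSite() {
            return site;
        }

        public static void main(String[] args) {
            Spider.create(new DemoProcessor())           // PageProcessor
                    .addUrl("http://www.cndzys.com/yundong/")
                    .setScheduler(new QueueScheduler())  // Scheduler (in-memory queue)
                    .addPipeline(new ConsolePipeline())  // Pipeline
                    .thread(2)
                    .run();
        }
    }

Each component is swapped by calling the corresponding setter on Spider, which is exactly the extension point used later in this post for the Scheduler.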

The most basic job of the Scheduler (URL management) is to mark the URLs that have already been crawled.
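For reference, the Scheduler contract in webmagic is, as far as I recall from its source, roughly this small interface (simplified here):

    import us.codecraft.webmagic.Request;
    import us.codecraft.webmagic.Task;

    public interface Scheduler {

        // add a URL to be crawled; implementations usually de-duplicate here
        void push(Request request, Task task);

        // hand out the next URL to crawl, or null when the queue is empty
        Request poll(Task task);
    }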

Currently the scheduler has three implementations:

  1) in-memory queue

  2) file queue

  3) redis queue

The file queue persists URLs to disk, so after an interruption the crawl can be resumed, which makes incremental crawling possible.
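For example (a sketch reusing the hypothetical DemoProcessor from above; the directory path is a placeholder), switching to the file-backed queue is just a matter of injecting FileCacheQueueScheduler. The other two built-in options are QueueScheduler (in-memory) and, in webmagic-extension, RedisScheduler:

    import us.codecraft.webmagic.Spider;
    import us.codecraft.webmagic.scheduler.FileCacheQueueScheduler;

    public class ResumeDemo {
        public static void main(String[] args) {
            Spider.create(new DemoProcessor())
                    .addUrl("http://www.cndzys.com/yundong/")
                    // writes <uuid>.urls.txt and <uuid>.cursor.txt under this directory,
                    // so a restart picks up where the last run stopped
                    .setScheduler(new FileCacheQueueScheduler("/tmp/webmagic/"))
                    .run();
        }
    }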

Suppose we only have a single entry URL, for example http://www.cndzys.com/yundong/. If we use webmagic's FileCacheQueueScheduler directly, we will find that the second start crawls essentially nothing, because FileCacheQueueScheduler has already recorded http://www.cndzys.com/yundong/ and will not crawl it again. Even though this second run is meant to be incremental, certain URLs still need to be re-crawled for the result to be what we want. We can override the duplicate check inside FileCacheQueueScheduler.

 

package com.fortunedr.crawler.expertadvice;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
import us.codecraft.webmagic.scheduler.MonitorableScheduler;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;

/**
 * Store urls and cursor in files so that a Spider can resume the status when shutdown.<br>
 * Adds a second de-duplication check: URLs matching a regular expression are allowed to be re-crawled.
 *
 * @author code4crafter@gmail.com <br>
 * @since 0.2.0
 */
public class SpikeFileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {

    private String filePath = System.getProperty("java.io.tmpdir");

    private String fileUrlAllName = ".urls.txt";

    private Task task;

    private String fileCursor = ".cursor.txt";

    private PrintWriter fileUrlWriter;

    private PrintWriter fileCursorWriter;

    private AtomicInteger cursor = new AtomicInteger();

    private AtomicBoolean inited = new AtomicBoolean(false);

    private BlockingQueue<Request> queue;

    private Set<String> urls;

    private ScheduledExecutorService flushThreadPool;

    private String regx;

    public SpikeFileCacheQueueScheduler(String filePath) {
        if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
            filePath += "/";
        }
        this.filePath = filePath;
        initDuplicateRemover();
    }

    private void flush() {
        fileUrlWriter.flush();
        fileCursorWriter.flush();
    }

    private void init(Task task) {
        this.task = task;
        File file = new File(filePath);
        if (!file.exists()) {
            file.mkdirs();
        }
        readFile();
        initWriter();
        initFlushThread();
        inited.set(true);
        logger.info("init cache scheduler success");
    }

    private void initDuplicateRemover() {
        setDuplicateRemover(new DuplicateRemover() {
            @Override
            public boolean isDuplicate(Request request, Task task) {
                if (!inited.get()) {
                    init(task);
                }
                boolean temp = false;
                String url = request.getUrl();
                temp = !urls.add(url); // original check: has this URL been seen before?
                // second check: URLs matching the regex should be re-crawled,
                // so report "not duplicate" (regx may be null if setRegx() was never called)
                if (regx != null && url.matches(regx)) {
                    temp = false;
                }
                return temp;
            }

            @Override
            public void resetDuplicateCheck(Task task) {
                urls.clear();
            }

            @Override
            public int getTotalRequestsCount(Task task) {
                return urls.size();
            }
        });
    }

    private void initFlushThread() {
        flushThreadPool = Executors.newScheduledThreadPool(1);
        flushThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                flush();
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private void initWriter() {
        try {
            fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
            fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
        } catch (IOException e) {
            throw new RuntimeException("init cache scheduler error", e);
        }
    }

    private void readFile() {
        try {
            queue = new LinkedBlockingQueue<Request>();
            urls = new LinkedHashSet<String>();
            readCursorFile();
            readUrlFile();
            // initDuplicateRemover();
        } catch (FileNotFoundException e) {
            // init
            logger.info("init cache file " + getFileName(fileUrlAllName));
        } catch (IOException e) {
            logger.error("init file error", e);
        }
    }

    private void readUrlFile() throws IOException {
        String line;
        BufferedReader fileUrlReader = null;
        try {
            fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
            int lineReaded = 0;
            while ((line = fileUrlReader.readLine()) != null) {
                urls.add(line.trim());
                lineReaded++;
                if (lineReaded > cursor.get()) {
                    queue.add(new Request(line));
                }
            }
        } finally {
            if (fileUrlReader != null) {
                IOUtils.closeQuietly(fileUrlReader);
            }
        }
    }

    private void readCursorFile() throws IOException {
        BufferedReader fileCursorReader = null;
        try {
            fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
            String line;
            // read the last number
            while ((line = fileCursorReader.readLine()) != null) {
                cursor = new AtomicInteger(NumberUtils.toInt(line));
            }
        } finally {
            if (fileCursorReader != null) {
                IOUtils.closeQuietly(fileCursorReader);
            }
        }
    }

    public void close() throws IOException {
        flushThreadPool.shutdown();
        fileUrlWriter.close();
        fileCursorWriter.close();
    }

    private String getFileName(String filename) {
        return filePath + task.getUUID() + filename;
    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        queue.add(request);
        fileUrlWriter.println(request.getUrl());
    }

    @Override
    public synchronized Request poll(Task task) {
        if (!inited.get()) {
            init(task);
        }
        fileCursorWriter.println(cursor.incrementAndGet());
        return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }

    public String getRegx() {
        return regx;
    }

    /**
     * Set the regular expression for URLs that should be re-crawled on every run.
     *
     * @param regx
     */
    public void setRegx(String regx) {
        this.regx = regx;
    }
}

Then, when building the crawler, all we need to do is plug in our own FileCacheQueueScheduler variant:

spider.addRequest(requests);
SpikeFileCacheQueueScheduler file = new SpikeFileCacheQueueScheduler(filePath);
file.setRegx(regx); // e.g. http://www.cndzys.com/yundong/(index)?[0-9]*(.html)?
spider.setScheduler(file);

This gives us incremental crawling.

An idea for further optimization: on most sites the newest content sits at the front of the listing pages. The approach above does achieve incremental crawling, but it still has to crawl many "useless" listing pages.

Could we stop crawling as soon as we reach the previous run's "newest" URL, so that the remaining, redundant listing pages are skipped entirely? A possible sketch follows.
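One possible direction (not from the original post, just a sketch under my own assumptions): persist the newest article URL seen on the previous run, and in the PageProcessor stop following further listing pages once that URL reappears. A hypothetical helper could look like this:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;

    /**
     * Hypothetical helper: remembers the newest article URL of the last run in a marker file.
     * A PageProcessor could call trimToLastRun() on the article links of each listing page and
     * stop adding target requests (including the next listing page) once the remembered URL shows up.
     */
    public class LastRunMarker {

        private final Path markerFile;

        public LastRunMarker(String dir) {
            this.markerFile = Paths.get(dir, "last-top-url.txt");
        }

        /** Returns the newest URL recorded by the previous run, or null on the first run. */
        public String load() throws IOException {
            return Files.exists(markerFile)
                    ? new String(Files.readAllBytes(markerFile), StandardCharsets.UTF_8).trim()
                    : null;
        }

        /** Records the newest URL of the current run (the first article link on the home page). */
        public void save(String newestUrl) throws IOException {
            Files.write(markerFile, newestUrl.getBytes(StandardCharsets.UTF_8));
        }

        /** Keeps only the links that are newer than the last run's marker (links ordered newest first). */
        public List<String> trimToLastRun(List<String> linksNewestFirst, String lastRunUrl) {
            if (lastRunUrl == null) {
                return linksNewestFirst; // first run: crawl everything
            }
            int idx = linksNewestFirst.indexOf(lastRunUrl);
            return idx < 0 ? linksNewestFirst : linksNewestFirst.subList(0, idx);
        }
    }

On each run the processor would load() the marker before crawling, call trimToLastRun() on every listing page, stop scheduling the next listing page as soon as the list gets trimmed, and finally save() the home page's newest link as the marker for the next run.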

 

