webmagic源碼學習(一)


  最近工作主要是一些爬蟲相關的東西,由於公司需要構建自己的爬蟲框架,在調研過程中參考了許多優秀的開源作品,包括webmagic,webcollector,Spiderman等,通過學習這些優秀的源碼獲益良多。

     webmagic是一個簡單靈活的爬蟲框架。基於WebMagic,你可以快速開發出一個高效、易維護的爬蟲。(官網地址:http://webmagic.io/)

     本篇是webmagic源碼閱讀第一篇,主要探討webmagic的核心機制,即一個BFS的爬蟲是如何構建出來的。

    webmagic分為以下四大組件,Downloader(頁面下載器),Scheduler(下載調度器),PageProcessor(頁面解析器),Pipeline(管道組件,通常做將抓取結果入庫寫文件等操作)

                          (圖片來自官網)

 以上四個組件由Spider組件組裝起來,爬取數據時協同工作。我們先研究webmagic的核心類Spider。

在Spider中的run()方法中可以清晰的看到典型的BFS代碼,通過一個循環不斷地從scheduler中的內存隊列中取一個抓取任務(Request)並進行相應處理(processRequest),如果抓取成功則回調監聽器中的onSuccess()方法,失敗則調用onError()方法,最后將已抓取頁面的數量自增。如果隊列中沒有任何抓取任務了,爬蟲會在這里停一會防止有新的任務

加入(waitNewURL()),當然,這里的暫停時間是由你自己決定的。

    最后,如果等待一段時間后隊列中仍沒有請求,退出循環,將爬蟲的狀態改為停止並釋放資源。

     /**
     * 爬蟲的核心方法,廣度優先遍歷
     */
    @Override
    public void run() {
//檢查爬蟲狀態:初始化,抓取中,停止 checkRunningStat();
//初始化爬蟲組件 initComponent(); logger.info("Spider " + getUUID() + " started!"); //注意,這里的stat狀態是一個CAS變量,保證了多線程訪問的安全性 //這里是一個BFS算法 while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added //隊列為空時等待一會以防有新URL加入 waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { //處理遍歷到的request processRequest(request); //成功時回調我們注冊的所有SpiderListener中的onSuccess()方法 onSuccess(request); } catch (Exception e) { //失敗時回調我們注冊的所有SpiderListener中的onError()方法 onError(request); logger.error("process request " + request + " error", e); } finally { //抓取總數自增,這里同樣是一個CAS操作 pageCount.incrementAndGet(); signalNewUrl(); } } }); } } stat.set(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } }

  需要注意的是,這里無論是爬蟲的狀態變量檢查還是最后的自增變量(pageCount)都是CAS操作,因為我們的大多數情況下都會為爬蟲開多個線程(當然,你要確保你的

爬蟲不會被網站封禁,而且最好也不要開過多線程,避免給對方服務器造成太大壓力)。

    這里的另一個核心方法是processRequest(見下圖),對於從scheduler中取到的每個抓取請求,都會做如下操作:

           1.頁面下載:首先使用Downloader進行網頁下載,獲取網頁對象Page,如果抓取內容為空,說明抓取出現錯誤,回調Listener中的onError方法並退出。

           2.頁面解析:接下來Spider會回調我們自己寫的pageProcessor中的process方法,由於每個網頁都有自己的特點,所以需要我們自己進行處理。

           3.新URL抽取:如果事先定義了爬蟲需要循環抓取(needCycleRetry)則從當前頁面中抽取新的鏈接並放入調度隊列中

           4.數據入庫/寫文件:Spider回調我們注冊的所有pipline,在pipline中我們通常會將結果諸如入庫,寫文件或簡單輸出到控制台(webmagic默認支持)。

    /**
     * 處理隊列中的某個請求
     * @param request
     */
    protected void processRequest(Request request) {
        Page page = downloader.download(request, this);
        if (page == null) {
            sleep(site.getSleepTime());
            onError(request);
            return;
        }
        // for cycle retry
        if (page.isNeedCycleRetry()) {
            extractAndAddRequests(page, true);
            sleep(site.getRetrySleepTime());
            return;
        }
        //注意,在這里回調了我們自己寫的process方法
        pageProcessor.process(page);
        //提取鏈接並放入調度隊列中
        extractAndAddRequests(page, spawnUrl);
        //順序調用我們注冊的pipline,在pipline通常將結果入庫,寫文件
        if (!page.getResultItems().isSkip()) {
            for (Pipeline pipeline : pipelines) {
                pipeline.process(page.getResultItems(), this);
            }
        }
        sleep(site.getSleepTime());
    }

        接下來,我們探討一下爬蟲的另一個核心組件Scheduler(任務調度器),以下代碼是webmagic中調度器的接口,我們可以看到,它僅僅需要支持兩個操作,插入待抓取

鏈接(push)和取鏈接(poll)

 1 public interface Scheduler {
 2 
 3     /**
 4      * add a url to fetch
 5      *
 6      * @param request request
 7      * @param task task
 8      */
 9     public void push(Request request, Task task);
10 
11     /**
12      * get an url to crawl
13      *
14      * @param task the task of spider
15      * @return the url to crawl
16      */
17     public Request poll(Task task);
18 
19 }

       下面的代碼是webmagic默認提供的任務調度器,由於內存中的任務需要進行性排重,我們可以看到webmagic默認使用了HashSet排重,有可能你會說使用單機內存進

行排重會OOM,事實上在webmagic-extension(webmagic的擴展包)里支持其他幾種排重方式,包括Redis排重,布隆過濾器排重(如果不了解的話可以維基一下)。當然,

如果使用布隆過濾器的話會有一定的誤差。

public abstract class DuplicateRemovedScheduler implements Scheduler {

    protected Logger logger = LoggerFactory.getLogger(getClass());
    //可以看到,webmagic默認使用HashSet進行鏈接去重
    private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();

    public DuplicateRemover getDuplicateRemover() {
        return duplicatedRemover;
    }

    public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) {
        this.duplicatedRemover = duplicatedRemover;
        return this;
    }

    @Override
    public void push(Request request, Task task) {
        logger.trace("get a candidate url {}", request.getUrl());
        if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) {
            logger.debug("push to queue {}", request.getUrl());
            pushWhenNoDuplicate(request, task);
        }
    }

    protected boolean shouldReserved(Request request) {
        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }
    /**
     * 判斷是否需要去重,如果是一個POST請求則不進行去重
     */
    protected boolean noNeedToRemoveDuplicate(Request request) {
        return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());
    }

    protected void pushWhenNoDuplicate(Request request, Task task) {

    }
}

  在上圖中,我們可以看到,在webmagic中默認不對POST請求進行排重(或許是POST參數的原因),在實際工作中,你也可以對這里進行修改,比如對POST請求的URL+Request Body做一個MD5操作,再將其放入隊列中,這樣會浪費一些計算時間,但可以對POST請求進行排重,也可以節省一些內存開銷。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM