Duplicate Elimination in Scrapy(轉)


之前介紹 Scrapy 的時候提過 Spider Trap ,實際上,就算是正常的網絡拓撲,也是很復雜的相互鏈接,雖然我當時給的那個例子對於我感興趣的內容是可以有一個線性順序依次爬下來的,但是這樣的情況在真正的網絡結構中通常是少之又少,一但鏈接網絡出現環路,就無法進行拓撲排序而得出一個依次遍歷的順序了,所以 duplicate elimination 可以說是每一個 non-trivial 的必備組件之一,這樣就算在遍歷的過程中遇到環路也不用怕,排重組件會檢測到已經訪問過的地址,從而避免在環路上無限地循環下去。最簡單的辦法也就是每次抓取頁面的時候記錄下 URL ,然后每次抓取新的 URL 之前先檢測一下是否已經有記錄了。不過,通常我們並不直接按字符比較 URL ,因為那樣通常會漏掉許多本來確實是重復的 URL ,特別是現在動態頁面盛行的情況,例如在 cc98 (ZJU 的一個校內論壇)上下面幾個 URL 路徑是等價的:

 

  1. /dispbbs.asp?boardID=60&ID=541996&star=5704
  2. /dispbbs.asp?ID=541996&boardID=60&star=5704
  3. /dispbbs.asp?ID=541996&boardID=60&star=5704&page=1

1 和 2 是參數位置交換,這個問題幾乎存在於所有動態頁面上,因為通常的 CGI (姑且統稱為 CGI 吧)並不在意參數出現的順序,而 3 則是 cc98 自己的問題,實際上 page 這個參數對於現實一個帖子沒有什么用處,寫成多少都無所謂,它是帖子標題列表那個頁面的頁數,但是 cc98 有時確實會在現實帖子的時候把那個參數也附上。所以,判重組件要做到火眼金睛還是相當困難的,事實上,Internet 上的 URL 和它對應的內容是多對多的關系,即使同一個 URL 在不同時間訪問也有可能得到不同的結果(例如一個 Google 的搜索結果頁面),所以,判重組件錯判和漏判都是有可能的,雖然如此,我們可以利用一些經驗知識來做到盡量完善,另外,和上次說的一樣,如果問題被限制在一個已知的領域(比如,某個特定的網站而不是混亂的 Internet ),問題又會變得簡單許多了。

扯了半天,再回到 Scrapy 。因為自己之前做的一些小實驗發現如果給他重復的 URL 的話,它是會義無反顧的地再抓一遍的,而在它的 Tutorial 里也只字未提相關的東西,所以我一直以為它沒有提供現成的東西,雖然一個號稱已經在實際中使用了的爬蟲框架沒有判重組件多少是一件有點讓人難以置信的事。不過事實證明它其實是有判重組件的,從它的結構圖(見上一篇介紹 Scrapy 的 blog )中可以看到,判重組件如果要自己寫的話,應該是一個 Scheduler Middleware ,本來想看一下 Scheduler Middleware 的接口是怎樣的,打開文檔一看,才發現已經有了一個現成的 DuplicatesFilterMiddleware 了。

如果要添加自己的 Scheduler Middleware ,應該在 settings.py 里定義 SCHEDULER_MIDDLEWARES 變量,這是一個 dict 對象,key 是中間件的完整類名,value 則是 priority 。不過在系統級別的 SCHEDULER_MIDDLEWARES_BASE 里已經有了這個中間件了:

SCHEDULER_MIDDLEWARES_BASE = {
    'scrapy.contrib.schedulermiddleware.duplicatesfilter.DuplicatesFilterMiddleware': 500,
}

再經過各種跟蹤(之間還不會用 Python 調試器,都是直接打開庫的源代碼插入 print 語句 -,-bb),發現中間件確實被啟動起來了,而且判重的方法也被調用了,並且也檢測到了重復,不過,問題出在這里:

def enqueue_request(self, domain, request):
    seen = self.dupefilter.request_seen(domain, request)
    if seen and not request.dont_filter:
        raise IgnoreRequest('Skipped (request already seen)')

那個 dont_filter 屬性在作怪,由於 spider 對象的 make_requests_from_url 方法把 Request 的 dont_filter 屬性設成了 True ,因此導致判重組件失效了:

def make_requests_from_url(self, url):
    return Request(url, callback=self.parse, dont_filter=True)

可以看到這個方法其實非常簡單,也可以自己手工構建 Request 對象,指定 callback ,並且 dont_filter 默認是 False 的,這樣就能得到想要的效果了。

其實 Scrapy 提供的 duplicate filter 是相當靈活的,它把中間件和判重算法分離開來,預置了兩種判重的實現,一個是 NullDupeFilter ,什么都不管,只會返回“不重復”,另一個是 RequestFingerprintDupeFilter (也是默認裝配的那個),使用一個 Request 的 fingerprint 來進行比對。fingerprint 主要是通過 url 取 hash 計算出來的,當然為了能處理簡單的參數位置變換的情況,減少漏判,具體可以參見 utils/request.py 的 request_fingerprint 方法。

要實現自己的 Duplicate Filter 有兩種方法,一種是以算法的形式,在 settings.py 里將 DUPEFILTER_CLASS 指定為自己定義的類,這樣會用自己的算法替換掉系統的算法;另一種方法是不影響系統默認的 filter ,另外再實現一個 filter middleware 添加到 SCHEDULER_MIDDLEWARES 里,寫法大同小異,只是接口有稍許不同,下面介紹第二種寫法。新建一個文件 scheduler_middleware.py (其實名字可以隨便取),在里面實現我們的判重中間件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from scrapy.core.exceptions import IgnoreRequest
from scrapy.extension import extensions
 
from crawl.cc98_util import extract_url, DOMAIN
 
class DuplicatesFilterMiddleware(object):
    def open_domain(self, domain):
        if domain == DOMAIN:
            self.init_fingerprints()
 
    def close_domain(self, domain):
        if domain == DOMAIN:
            self.fingerprints = None
 
    def enqueue_request(self, domain, request):
        if domain != DOMAIN or request.dont_filter:
            return
        fp = self.make_fingerprint(extract_url(request.url))
        if fp in self.fingerprints:
            raise IgnoreRequest('Skipped (request already seen)')
        self.fingerprints.add(fp)
 
    def make_fingerprint(self, dic):
        return '%s,%s,%s' % (dic['board_id'],dic['thread_id'],dic['page_num'])
 
    def init_fingerprints(self):
        self.fingerprints = set()

主要是要實現三個方法:open_domain, close_domain 和 enqueue_request ,如果發現 Request 對象應該丟棄的話,直接拋出 IgnoreRequest 異常即可。這里我用 extract_url 方法(就是正則匹配,就不細說了)提取出 board_id, thread_id 和 page_num 三個參數,將他們的值排列起來做成一個 fingerprint ,用在 cc98 這里是正好的。然后在 settings.py 里加入:

SCHEDULER_MIDDLEWARES = {
    'crawl.scheduler_middlewares.DuplicatesFilterMiddleware': 500
}

就可以用上我們自己的判重過濾了。 :) 到此為止本來關於本文標題的東西可以說已經講完了,不過這個 crawler 要完整還需要一些額外的東西,我就順便多說一下吧。

首先是抓取結果的處理,這次我並不是直接存儲 raw 的 HTML 頁面,而是將內容解析之后按照帖子結構存儲在數據庫里。在最近更新過之后發現原來的 ScrapedItem 在將來的版本里將會由 Item 來替代了,現在可以用類似於 ORM 的方式來定義 Item ,也許以后會做得像 Django 的 Model 那樣方便地用於數據庫上吧:

from scrapy.item import Item, Field
 
class CrawlItem(Item):
    board_id = Field()
    thread_id = Field()
    page_num = Field()
    raw = Field()
 
    def __str__(self):
        return '<CrawlItem %s,%s,%s>' % (self['board_id'],
                                         self['thread_id'],
                                         self['page_num'])
 
class PostBundleItem(Item):
    posts = Field()
 
    def __str__(self):
        return '<PostBundleItem %d>' % len(self['posts'])

一次下載的一個頁面會得到一個 CrawlItem 對象,這是論壇里一頁的內容,一頁內通常有多個 post ,所以我再添加了一個 pipeline 來將一個頁面解析成多個 post ,存儲在一個 PostBundleItem 對象中。pipeline 就不細說了,上次介紹過,只要定義 process_item 方法即可,這樣在 settings.py 里我就依次有兩個 pipeline :

ITEM_PIPELINES = ['crawl.pipelines.PostParsePipeline', 'crawl.pipelines.PostStorePipeline']

代碼也不多帖了,畫一個圖直觀一點(畫這個圖里的字體實在是太丑了,但是手邊沒有好用的工具,也只能暫時將就了 -,-bb):

cc98_crawl

不過,如果你有注意到,不管是 Scrapy 內置的判重組件還是我上面的組件,所用的數據結構都是直接放在內存里的,所以說如果你一次 crawl 結束(包括正常結束,或者斷電、斷網、程序出錯等異常結束)之后,如果再重新啟動 crawler ,判重組件會從零開始,於是許多的頁面又要重新下載一次。這當然不是我們說希望的,因此我要在 crawler 啟動的時候從數據庫里提取出已經抓取了的頁面來初始化 duplicates filter ;另外,為了達到增量抓取的目的,我希望每次 crawler 啟動的時候從上一次結束的地方開始抓取,而不是每次都使用同一個固定的 seed url ,這也需要用到數據庫里已經存在的數據。

由於各個組件都要訪問數據庫,因此我做一個 Scrapy Extension 來管理數據庫連接。在 Scrapy 中做一個 Extension 也是一件很容易的事情,隨意寫一個類就可以作為 Extension ,沒有任何限制或規定,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import sqlite3
from os import path
 
from scrapy.conf import settings
from scrapy.core import signals
from scrapy.xlib.pydispatch import dispatcher
from scrapy.core.exceptions import NotConfigured
 
class SqliteManager(object):
    def __init__(self):
        if settings.get('SQLITE_DB_FILE') is None:
            raise NotConfigured
 
        self.conn = None
        self.initialize()
        dispatcher.connect(self.finalize, signals.engine_stopped)
 
    def initialize(self):
        filename = settings['SQLITE_DB_FILE']
        if path.exists(filename):
            self.conn = sqlite3.connect(filename)
        else:
            self.conn = self.create_table(filename)
 
    def finalize(self):
        if self.conn is not None:
            self.conn.commit()
            self.conn.close()
            self.conn = None
 
    def create_table(self, filename):
        # ... snipped ...

然后在 settings.py 里指定加載該 Extension 即可:

EXTENSIONS = {
    'crawl.extensions.SqliteManager': 500
}

同 middleware 一樣,后面那個 500 表示優先級。另外,上面的代碼中如果發現沒有定義 SQLITE_DB_FILE 變量(也是在 settings.py 中)的話會拋出 NotConfigured 異常,這個異常並不會導致 crawler 啟動出錯,此時 Scrapy 只是會簡單地選擇不啟用該 Extension 。其實我這里的 SqliteManager 是一個相當核心的組件,如果不啟用的話整個系統就沒法正常工作了,所以這樣的行為似乎應該修改一下。 :)

Extension 定義好之后在程序中引用也很方便,把 scrapy.extension 里的 extensions 對象 import 進來,然后用 extensions.enabled['SqliteManager'] 就可以引用到系統為你初始化好的那個 Extension 對象了,以這種引用方式看來,Extension 的類名似乎得是 unique 的才行。

有一點要注意的地方就是各個組件之間的依賴關系,特別是在初始化的時候,例如,我這里 DuplicatesFilterMiddleware 和 spider 在初始化的時候都會用到 SqliteManager 的數據庫連接,因此 SqliteManager 需要在對象構造的時候就建立好連接(或者惰性按需建立也可以),而不是像上一篇文章中那樣在 signals.engine_started 的時候再建立連接。而且,由於 Scrapy 建立在 Twisted 這個看起來非常魔幻的異步網絡庫的基礎上,程序出錯之后想要輕松地調試幾乎是不可能的,得到的錯誤信息和 trackback 通常都是風馬牛不相及,這個時候似乎只有反復檢查代碼是最終有效的“調試”方式了。 :D

這樣,我們將前面定義的 init_fingerprints 方法稍作修改,不再是只建立一個空的 set ,而是從數據庫里做一些初始化工作:

26
27
28
29
30
31
32
def init_fingerprints(self):
    self.fingerprints = set()
    mgr = extensions.enabled['SqliteManager']
    cursor = mgr.conn.execute('select distinct board_id, thread_id, page_num from posts')
    for board_id, thread_id, page_num in cursor:
        fp = self.make_fingerprint({'board_id':board_id,'thread_id':thread_id,'page_num':page_num})
        self.fingerprints.add(fp)

在上一篇文章的介紹中,spider 使用 start_urls 屬性作為 seed url ,其實實際使用的是一個 start_requests 方法,不過 BaseSpider 提供了一個默認實現,就是從 start_urls 構建初始 Requests ,我們為了實現增量 crawler ,只要重新定義 spider 的該方法即可:

15
16
17
18
19
20
21
22
23
24
25
26
def start_requests(self):
    mgr = extensions.enabled['SqliteManager']
    val = mgr.conn.execute('select max(page_num) from posts').fetchone()[0]
    if val is None:
        page_num = 1
    else:
        page_num = val
 
    # the last page may be incomplete, so we set dont_filter to be True to
    # force re-crawling it
    return [Request(make_url(board_id=self.board_id, thread_id=self.thread_id,
                             page_num=page_num), callback=self.parse, dont_filter=True)]

雖然跑題已經跑得有點遠了,不過這樣一來,我們就得到了一個比先前更加完善的爬蟲了。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM