開發環境
基礎語法那章的內容我是在Docker容器中玩的,但是真正做項目的時候,沒有IDE的強大輔助功能來協助的話是很累人的一件事。因此本文中,我選擇使用Jetbrain
的Pycharm
這個IDE來開發、調試代碼。IDE的好處多多,比如:
- 強大的智能提示
- 強大的斷點調試
- 性能追蹤
- 方便好用的各種插件
- 各種自定義配置
需求
為了實踐Python,最先想到的就是要完成一個爬蟲程序,大概需求如下:
實施
可配置化我本身是計划通過DI(Dependency Injection
)這個技術來完成,不過查了下資料,由於Python和其他語言不太一樣,Python是可以多父類繼承,並且遵循Duck Typing
原則,因此DI在Python中並不實用(Python也是沒有Interface概念的)。但可以通過如下方式實現類似的邏輯:
# 假設a-class-name這個類包含在xxx.py文件中,首先引入這個文件中的內容
from xxx import *
# 然后執行以下這行代碼,這將初始化一個a-class-name類的實例
(lambda x: globals()[x])('a-class-name')
入口程序文件main.py
main.py
主要有幾個功能:
- 通過交互讓用戶輸入:項目名稱、網站首頁、線程數三個初始化變量
- 初始化數據庫訪問對象
- 初始化爬蟲對象
- 初始化線程池
- 執行程序
核心代碼如下:
from db_queue import *
...
def execute():
...
(lambda x: globals()[x])(project_settings.DB_CLASS_NAME)(home_page, project_name + '_pages')
Spider(project_name, home_page, DomainHelpers.get_domain_name(home_page), project_settings.HTML_RESOLVER_NAME)
worker = Worker(thread_count, project_name)
worker.create_threads()
worker.crawl()
execute()
邏輯解釋:
(lambda x: globals()[x])(project_settings.DB_CLASS_NAME)(home_page, project_name + '_pages')
,本例中DB_CLASS_NAME = 'MongoDbQueue'
,因此Python將在當前頁面的應用中查找名為MongoDbQueue
的類來執行初始化並傳入構造函數的參數:home_page
和project_name + '_pages'
- 初始化
Spider
類,以便在線程中執行爬取頁面 - 初始化指定數量的現成作為線程池以備后續使用,main.py執行完畢,線程將被自動回收
- 開始執行爬蟲程序
線程創建類worker.py文件
from db_queue import *
class Worker:
...
def __init__(self, thread_count, project_name):
Worker.DB = (lambda x: globals()[x])(project_settings.DB_CLASS_NAME)
...
def create_threads(self):
for _ in range(self.thread_count):
t = threading.Thread(target=self.__run_thread)
t.daemon = True
t.start()
def __run_thread(self):
while True:
url = self.queue.get()
Spider.crawl_page(threading.current_thread().name, url)
self.queue.task_done()
def __create_jobs(self):
for link in Worker.DB.get_pending_queue():
self.queue.put(link)
self.queue.join()
self.crawl()
def crawl(self):
urls = Worker.DB.get_pending_queue()
if len(urls) > 0:
self.__create_jobs()
邏輯解釋:
__init__
中將數據庫連接類保存到全局變量DB中create_threads
將初始化指定數量的線程數,設置為datmon=true
以便線程被創建之后一直存在,隨時可以被調用crawl
將獲取待爬列表之后,將其放入Spider所需的待爬隊列中self.queue.join()
是用來阻塞隊列,這樣隊列中的每一項都將只被調用一次__run_thread
和__create_jobs
這兩個方法是Worker內部調用的方法,不需要公開給其他人,因此加上前綴__
(兩個下划線)
數據庫操作基礎類
由於需要將數據庫操作做成可替換,因此必須實現數據庫操作的接口,而Python沒有Interface,但是可以使用abc(Abstract Based Class)來實現類似於Interface所需的功能。
代碼如下:
from abc import ABCMeta, abstractmethod
class DbBase(metaclass=ABCMeta):
@abstractmethod
def __init__(self, file_name):
pass
@staticmethod
@abstractmethod
def get_pending_queue():
pass
@staticmethod
@abstractmethod
def is_page_in_queue():
pass
@staticmethod
@abstractmethod
def save_pending_queue():
pass
@staticmethod
@abstractmethod
def set_page_crawled():
pass
邏輯解釋:
class DbBase(metaclass=ABCMeta)
表示DbBase
類的元類為ABCMeta
@abstractmethod
則表明該方法在繼承了DbBase
的類中必須被實現,如果沒有被實現,執行時將會報錯:TypeError: Can't instantiate abstract class XXXX with abstract methods xxxx
數據庫存儲操作db_queue.py文件
from pymongo import *
from abc_base.db_base import DbBase
...
class MongoDbQueue(DbBase):
def __init__(self, home_page, tbl_name='pages'):
...
MongoDbQueue.db = MongoClient(project_settings.DB_CONNECTION_STRING)[project_settings.DB_REPOSITORY_NAME]
...
# create unique index
MongoDbQueue.db[MongoDbQueue.tbl_name].create_index('url', unique=True)
@staticmethod
def get_pending_queue():
...
@staticmethod
def is_page_in_queue(url):
...
@staticmethod
def save_pending_queue(urls):
...
@staticmethod
def set_page_crawled(url):
...
邏輯解釋:
class MongoDbQueue(DbBase):
表示該類繼承了DbBase
,因此必須實現DbBase
中定義的幾個方法__init__
、get_pending_queue
、is_page_in_queue
、save_pending_queue
及set_page_crawled
- 為了確保相同的url絕對不會重復,在數據庫層也增加一個
Unique Index
以便從數據庫層面也做好驗證 get_pending_queue
將所有未被爬過的頁面列表返回is_page_in_queue
判斷是否頁面在待爬列表中save_pending_queue
,這個方法是在爬取某個頁面,抓取了該頁面上所有新的代碼鏈接之后,將數據庫中不存在的連接保存為待爬頁面set_page_crawled
,這個方法將數據庫中已存在,且狀態為未爬過的頁面,設置為已爬,該方法將在爬蟲爬好某個頁面之后被調用
爬蟲文件spider.py文件
...
class Spider:
...
def __init__(self, base_url, domain_name, html_resolver):
...
Spider.crawl_page('First spider', Spider.BASE_URL)
@staticmethod
def crawl_page(thread_name, page_url):
if Spider.DB.is_page_in_queue(page_url):
...
urls = Spider.add_links_to_queue(Spider.gather_links(page_url))
Spider.DB.save_pending_queue(urls)
Spider.DB.set_page_crawled(page_url)
@staticmethod
def gather_links(page_url):
html_string = ''
...
# to make self-signed ssl works, pass variable 'context' to function 'urlopen'
context = ssl._create_unverified_context()
response = urlopen(page_url, context=context)
...
finder = (lambda x: globals()[x])(Spider.HTML_RESOLVER)(Spider.BASE_URL, page_url)
return finder.page_links()
@staticmethod
def add_links_to_queue(urls):
...
for url in urls:
if Spider.DOMAIN_NAME != DomainHelpers.get_domain_name(url):
continue
...
邏輯解釋:
Spider.DB = (lambda x: globals()[x])(project_settings.DB_CLASS_NAME)
這一行依然是動態初始化數據庫操作類context = ssl._create_unverified_context()
,有時候有些自簽名ssl證書,執行urlopen
方法時會報錯,需要創建這個context變量來避免這個錯誤產生finder = (lambda x: globals()[x])(Spider.HTML_RESOLVER)(Spider.BASE_URL, page_url)
這行也是通過動態初始化的方式,按照配置文件中指定的解析類來解析html內容,如果想自定義解析內容,只要重新實現一個解析類即可add_links_to_queue
這個方法是確保只會將當前域名相關的頁面保存起來以便后續繼續爬,如果不加這個判斷,一旦頁面上有一個www.weibo.com
這樣的鏈接的話,那爬蟲估計會把整個互聯網上的內容都爬一遍。。。
html解析html_resolver.py文件
class HtmlResolver(HTMLParser):
...
def handle_starttag(self, tag, attrs):
if tag == 'a':
for (attribute, value) in attrs:
if attribute == 'href':
url = parse.urljoin(self.base_url, value)
self.links.add(url)
...
這個類決定了我們爬取頁面的邏輯,這里我們只抓去鏈接(也就是a標簽)中的href
屬性中的內容。
執行過程動圖
附錄
本Demo完整代碼已經放到Github上: https://github.com/fisherdan/crawler。
本文在博客園和我的個人博客www.fujiabin.com上同步發布。轉載請注明來源。