pyspider源碼解讀--調度器scheduler.py
scheduler.py
首先從pyspider的根目錄下找到/pyspider/scheduler/scheduler.py
其中定義了四個類:
class Project(object)
class Scheduler(object)
class OneScheduler(Scheduler)
class ThreadBaseScheduler(Scheduler)
這四個類的作用分別如下:
Project
單個項目的Paused狀態切換即是由這個類實例化的對象來完成,其中的方法有—paused(),update(project_info)
Scheduler
整體的調度過程,包括從入庫,讀庫,對task的操作(主要是各個隊列之間的get和put,以及task執行包的封裝),狀態的切換
OneScheduler
debug中用到的類,其繼承自Scheduler,不同的是,它不會將需要抓取的task丟入到一個消費者隊列中,而是會直接調用一個process立刻去執行fetch(其實現在send_task這一函數)
ThreadBaseScheduler
這個類用到的地方很少,在pyspider/libs/bench.py中用到過,作壓力測試
主要看Scheduler類
Scheduler(object)
先看所有函數的名字以及其中的注釋,首先看到的是run以及run_once
關於變量,在具體看某一個函數的時候,可以再去查詢這個變量第一次出現的位置,結合函數的作用就能大致明白這個變量的功能了。
def run_once(self):
'''comsume queues and feed tasks to fetcher, once'''
self._update_projects()
self._check_task_done()
self._check_request()
while self._check_cronjob():
pass
self._check_select()
self._check_delete()
self._try_dump_cnt()
self._update_projects()
更新project的狀態,並且回顯到webui上 (這個操作經由幾步調用,其會去讀所有項目的庫,並且load到各自的task_queue中)
self._check_task_done()
去不斷取出scheduler的status_queue優先級最高的任務,並檢測其狀態
self._check_request()
其會優先去取_postpone_request中的延遲task,因為這些極有可能是上次版本運行改變遺留下的一些認為有(比如某個任務正在跑,我們手動stop了它,然后修改了腳本,再重新啟動它),將其發送給on_request
然后去獲取newtask_queue中的任務,並且不斷將這些任務交給on_request, on_request會作一些判斷,然后將其put到各自project的task_queue中
總結如上三個方法
上面的操作無非都是向各個項目的 task_queue中填充內容,從數據庫讀取 / 從新產生 的任務中生成
self._check_select()
這個函數會優先從緩沖隊列中讀取任務(當任務太多的時候,會將多余任務放到緩沖隊列中),然后正常遍歷所有的project,並且將其中的task_queue取出(在此之前,每個project首先會更新各自的task_queue的優先級隊列:兩個,一個time,一個process),並且使用自定義方法get()取得優先級最高的task(這個獲取是從proicess優先級隊列中取出的)
time優先級 和 process優先級 隊列 的關系是:process是真正被取出去執行的任務,time是生成process的一個條件,也就是從time中取時間優先級最高的不斷添加到process中
更新完了之后,將獲取到的taskid按順序保存,傳遞給_load_put_task()函數,經由其從數據庫中(根據taskid)讀信息,再傳遞給on_select_task()方法
on_select_task()這個函數是給每個taskid填充完整的信息,最后添加到所在project的active隊列中,表明其正在被調度
最后由send_task()方法傳遞到 out_queue隊列中(如果out_queue滿了,會放到緩沖deque中)
其余兩個方法
self._check_delete()
self._try_dump_cnt()
前者是檢測是否有到達約定時間需要刪除的項目,后者是定時回顯webui頁面上的任務數目狀態(60s一次)
run_once
這個函數的功能就是將需要被調度的taskid,經過一系列中間轉換,帶上了一些優先級,最后全部丟入到調度器類本身的一個out_queue隊列中,這個隊列作為一個生產者,供給fetch消費
詳細流程,隊列間傳遞消息
調度情況如上圖,其中的queue指的是作者在其中自己封裝實現的一些優先級隊列(其實現在/pyspider/scheduler/task_queue.py),deque一般都是被用作緩沖隊列。
--------------------------
1.隊列統計:方便查看爬蟲狀態,優化爬蟲爬取速度新增的狀態統計。
每個組件之間的數字就是對應不同隊列的排隊數量,通常就是0或個位數,如果達到了幾十甚至一百說明下游組件出現了瓶頸或錯誤,需要分析處理
2.組名:新建project后一般是不能修改project名字,如果需要特殊標記,可以通過更改group名字。
注:組名改為delete后如果狀態是stop狀態,24小時后會被系統自動刪除
3.運行狀態:五個
TODO : 新建項目后的默認狀態
STOP : 停止
CHECKING :只要修改了代碼,自動變成檢查狀態
DEBUG :在這個模式下運行,遇到錯誤信息會停止繼續運行
RUNNING : 這里運行時遇到錯誤會嘗試,如果還是錯誤會跳過這個任務繼續運行
4.速度控制:rate是每秒爬取頁面數 , burst是並發數 ,如1/3是三個並發,每秒爬取一個頁面。
5.簡單統計:5m是五分鍾內任務執行的情況 , 1h是一小時內任務統計 , 1d是一天內運行任務統計 , all是所有任務統計
6.任務列表:顯示最新任務列表,方便查看狀態,查看錯誤等
7.結果查看:查看項目爬取的結果(默認是保存到sqlite3 db中,支持以json和csv格式下載)