1. The scheduler
Configuration
SCHEDULER = 'scrapy.core.scheduler.Scheduler'  # the Scheduler class in scrapy/core/scheduler.py
# A custom scheduler can be plugged in by pointing the SCHEDULER setting at your own class, as sketched below.
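As a rough sketch of what such a custom class needs to provide (the module path and class name below are placeholders, and this toy version ignores priorities and deduplication), the interface mirrors the built-in Scheduler shown next:

# Minimal in-memory scheduler sketch; it would be enabled with
# SCHEDULER = 'myproject.scheduler.MyScheduler' (path is a placeholder).
class MyScheduler:

    @classmethod
    def from_crawler(cls, crawler):
        return cls()

    def open(self, spider):             # called when the spider opens
        self.spider = spider
        self.queue = []

    def close(self, reason):            # called when the spider closes
        pass

    def has_pending_requests(self):
        return len(self) > 0

    def enqueue_request(self, request):
        self.queue.append(request)      # returning False would drop the request
        return True

    def next_request(self):
        return self.queue.pop(0) if self.queue else None

    def __len__(self):
        return len(self.queue)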
Source code
import os
import json
import logging
import warnings
from os.path import join, exists

from queuelib import PriorityQueue

from scrapy.utils.misc import load_object, create_instance
from scrapy.utils.job import job_dir
from scrapy.utils.deprecate import ScrapyDeprecationWarning

logger = logging.getLogger(__name__)


class Scheduler:
    """
    Scrapy Scheduler. It allows to enqueue requests and then get
    a next request to download. Scheduler is also handling duplication
    filtering, via dupefilter.

    Prioritization and queueing is not performed by the Scheduler.
    User sets ``priority`` field for each Request, and a PriorityQueue
    (defined by :setting:`SCHEDULER_PRIORITY_QUEUE`) uses these priorities
    to dequeue requests in a desired order.

    Scheduler uses two PriorityQueue instances, configured to work in-memory
    and on-disk (optional). When on-disk queue is present, it is used by
    default, and an in-memory queue is used as a fallback for cases where
    a disk queue can't handle a request (can't serialize it).

    :setting:`SCHEDULER_MEMORY_QUEUE` and :setting:`SCHEDULER_DISK_QUEUE`
    allow to specify lower-level queue classes which PriorityQueue instances
    would be instantiated with, to keep requests on disk and in memory
    respectively.

    Overall, Scheduler is an object which holds several PriorityQueue
    instances (in-memory and on-disk) and implements fallback logic for them.
    Also, it handles dupefilters.
    """

    def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
                 logunser=False, stats=None, pqclass=None, crawler=None):
        self.df = dupefilter
        self.dqdir = self._dqdir(jobdir)
        self.pqclass = pqclass  # priority queue class
        self.dqclass = dqclass  # disk queue class
        self.mqclass = mqclass  # memory queue class
        self.logunser = logunser
        self.stats = stats
        self.crawler = crawler
    # The Scheduler is created here, via Scheduler.from_crawler
    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
        dupefilter = create_instance(dupefilter_cls, settings, crawler)
        pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
        if pqclass is PriorityQueue:
            warnings.warn("SCHEDULER_PRIORITY_QUEUE='queuelib.PriorityQueue'"
                          " is no longer supported because of API changes; "
                          "please use 'scrapy.pqueues.ScrapyPriorityQueue'",
                          ScrapyDeprecationWarning)
            from scrapy.pqueues import ScrapyPriorityQueue
            pqclass = ScrapyPriorityQueue

        dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
        mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
        logunser = settings.getbool('SCHEDULER_DEBUG')
        return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
                   mqclass=mqclass, crawler=crawler)
# dupefilter is DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'; it computes a fingerprint for each request and uses it to filter out duplicates
# There are also three queue classes: pqclass is the priority queue, dqclass the disk queue, and mqclass the in-memory queue
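As a quick illustration (not part of the scheduler source), the dupefilter fingerprints the canonicalized request, so the same URL with its query parameters in a different order counts as a duplicate:

# Illustration of RFPDupeFilter behaviour; bool() is used because request_seen
# returns a falsy value for requests it has not seen yet.
from scrapy import Request
from scrapy.dupefilters import RFPDupeFilter

df = RFPDupeFilter()
r1 = Request('http://example.com/page?a=1&b=2')
r2 = Request('http://example.com/page?b=2&a=1')   # same URL, query params reordered

print(bool(df.request_seen(r1)))  # False - first time this fingerprint is seen
print(bool(df.request_seen(r2)))  # True  - canonicalization yields the same fingerprint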
# The two methods to focus on are enqueue_request and next_request: one puts a request into the queue, the other takes the next request out for processing
    def has_pending_requests(self):
        return len(self) > 0

    def open(self, spider):
        self.spider = spider
        self.mqs = self._mq()
        self.dqs = self._dq() if self.dqdir else None
        return self.df.open()

    def close(self, reason):
        if self.dqs:
            state = self.dqs.close()
            self._write_dqs_state(self.dqdir, state)
        return self.df.close(reason)
    # enqueue_request tries the disk queue first; only when there is no disk
    # queue or the push fails does it fall back to the memory queue
    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True
    # next_request pops from the memory queue first, falling back to the disk queue
    def next_request(self):
        request = self.mqs.pop()
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request

    def __len__(self):
        return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)

    def _dqpush(self, request):
        if self.dqs is None:
            return
        try:
            self.dqs.push(request)
        except ValueError as e:  # non serializable request
            if self.logunser:
                msg = ("Unable to serialize request: %(request)s - reason:"
                       " %(reason)s - no more unserializable requests will be"
                       " logged (stats being collected)")
                logger.warning(msg, {'request': request, 'reason': e},
                               exc_info=True, extra={'spider': self.spider})
                self.logunser = False
            self.stats.inc_value('scheduler/unserializable',
                                 spider=self.spider)
            return
        else:
            return True

    def _mqpush(self, request):
        self.mqs.push(request)

    def _dqpop(self):
        if self.dqs:
            return self.dqs.pop()

    def _mq(self):
        """ Create a new priority queue instance, with in-memory storage """
        return create_instance(self.pqclass,
                               settings=None,
                               crawler=self.crawler,
                               downstream_queue_cls=self.mqclass,
                               key='')

    def _dq(self):
        """ Create a new priority queue instance, with disk storage """
        state = self._read_dqs_state(self.dqdir)
        q = create_instance(self.pqclass,
                            settings=None,
                            crawler=self.crawler,
                            downstream_queue_cls=self.dqclass,
                            key=self.dqdir,
                            startprios=state)
        if q:
            logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                        {'queuesize': len(q)}, extra={'spider': self.spider})
        return q

    def _dqdir(self, jobdir):
        """ Return a folder name to keep disk queue state at """
        if jobdir:
            dqdir = join(jobdir, 'requests.queue')
            if not exists(dqdir):
                os.makedirs(dqdir)
            return dqdir
# One more point: when a crawl is stopped and later restarted with the command scrapy crawl somespider -s JOBDIR=crawls/somespider-1, the unfinished in-memory requests are saved to disk so the crawl can resume where it left off
    def _read_dqs_state(self, dqdir):
        path = join(dqdir, 'active.json')
        if not exists(path):
            return ()
        with open(path) as f:
            return json.load(f)

    def _write_dqs_state(self, dqdir, state):
        with open(join(dqdir, 'active.json'), 'w') as f:
            json.dump(state, f)
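To make the state round-trip concrete (the path and the priority list below are made-up examples, not values Scrapy guarantees), the persisted disk-queue state is just a small JSON file under JOBDIR:

# Illustration only: dqdir and the state value are made-up examples.
import json
import os
from os.path import join

dqdir = 'crawls/somespider-1/requests.queue'   # created under the JOBDIR passed on the command line
os.makedirs(dqdir, exist_ok=True)

state = [0, 10]                                # e.g. the active priorities returned by dqs.close()
with open(join(dqdir, 'active.json'), 'w') as f:
    json.dump(state, f)                        # what _write_dqs_state does

with open(join(dqdir, 'active.json')) as f:
    print(json.load(f))                        # what _read_dqs_state returns on resume: [0, 10]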
2. Scheduler middleware
Early Scrapy (version 0.9) shipped a scrapy.contrib.schedulermiddleware.SchedulerMiddleware class that could be used for this; I could not find it in later versions.
3. Custom scheduler
Configuration in settings:
'''Once scrapy-redis is configured as below, the scheduler in use is the one shipped inside scrapy-redis'''
########## Connection settings ##########
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
# REDIS_PARAMS = {'password': 'xxxx'}  # extra Redis connection parameters; the defaults are
#     {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
REDIS_ENCODING = "utf-8"
# REDIS_URL = 'redis://user:pass@hostname:6379'  # connection URL (takes precedence over the settings above)
########## Scheduler ##########
# from scrapy_pro1.scheduler_test import Self_Scheduler
# SCHEDULER = 'scrapy_pro1.scheduler_test.Self_Scheduler'  # a scheduler you wrote yourself can be plugged in here
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'  # the scheduler shipped with scrapy-redis

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # redis key under which the scheduler keeps pending requests; every spider gets its own queue
'''
{
    # one key per spider, e.g.
    chouti:requests  ->  serialized Request objects (each wrapping a url and a callback)
    # redis cannot store Request objects directly, so each request is serialized
    # (pickle.dumps) into a string before being stored under the chouti:requests key;
    # reading the data back goes through the same key, e.g. conn.smembers('chouti:requests')
}
'''
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # serializer for the data stored in redis (pickle by default); the Request object is serialized before being saved
SCHEDULER_PERSIST = False  # whether to keep the scheduler queue and dedup records on close: True = keep, False = flush
SCHEDULER_FLUSH_ON_START = True  # whether to flush the scheduler queue and dedup records when the spider starts: True = flush, False = keep
# SCHEDULER_IDLE_BEFORE_CLOSE = 10  # when the queue is empty, the maximum number of seconds to wait for data before giving up
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # redis key for each spider's dedup records, e.g. chouti:dupefilter
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # class implementing the dedup rules
START_URLS_KEY = '%(name)s:start_urls'  # redis key holding the start URLs
REDIS_START_URLS_AS_SET = False  # store the start URLs in a list (False) or a set (True)
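To show how these settings are consumed on the spider side (the spider below is a placeholder sketch, not code from this post), a crawler typically inherits from scrapy-redis's RedisSpider and reads its seed URLs from the start-URLs key:

# A minimal sketch; the spider name, parse logic and example URL are placeholders.
from scrapy_redis.spiders import RedisSpider

class ChoutiSpider(RedisSpider):
    name = 'chouti'
    redis_key = 'chouti:start_urls'   # matches the '%(name)s:start_urls' pattern above

    def parse(self, response):
        # scheduled requests and dedup records end up under chouti:requests / chouti:dupefilter
        yield {'url': response.url, 'title': response.css('title::text').get()}

With REDIS_START_URLS_AS_SET = False the start-URLs key is a plain redis list, so seeds are pushed with something like: redis-cli lpush chouti:start_urls https://dig.chouti.com/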
scrapy-redis scheduler source code:
# from scrapy_redis.scheduler import Scheduler
import importlib
import six  # type checks, e.g. six.string_types

from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):  # settings is the configuration passed in
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        # walk the optional setting names above and keep any value the user configured in settings
        for name, setting_name in optional.items():
            val = settings.get(setting_name)  # the value configured in settings, if any
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)  # build the redis connection from the REDIS_* settings you configured
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)  # instantiate the scheduler object here (cls is this class) and the crawl proceeds

    @classmethod
    def from_crawler(cls, crawler):
        # when the scrapy-redis scheduler runs, the settings come in via crawler.settings
        instance = cls.from_settings(crawler.settings)
        # crawler.settings is a Settings object, e.g. <scrapy.settings.Settings object at 0x00000265B2E41940>;
        # individual values can be read with crawler.settings.get("host")
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    # From here on the scheduler actually drives the crawl; everything above only read configuration
    def enqueue_request(self, request):
        # dont_filter bypasses the dupefilter; request_seen checks whether this request was already scheduled
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False  # already visited, no need to fetch it again
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        # not seen before: push the request onto the redis-backed queue so the downloader can pick it up
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0
Inside the scrapy-redis Scheduler:
The scheduler object is instantiated when a crawl is started, for example by running scrapy crawl baidu --nolog.
from_crawler runs first:
@classmethod
def from_crawler(cls, crawler):
    # when the scrapy-redis scheduler runs, the settings are passed in via crawler.settings
    instance = cls.from_settings(crawler.settings)
    # crawler.settings is a Settings object, e.g. <scrapy.settings.Settings object at 0x00000265B2E41940>
    '''individual values can be read from it, e.g. crawler.settings.get("host")'''
    # FIXME: for now, stats are only supported from this constructor
    instance.stats = crawler.stats
    return instance  # from_settings was executed with the settings object as its argument
from_settings then executes (the settings object is passed in as its argument):
Its job is to read the configuration.
@classmethod
def from_settings(cls, settings):  # settings is the configuration passed in
    kwargs = {
        'persist': settings.getbool('SCHEDULER_PERSIST'),
        'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
        'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
    }

    # If these values are missing, it means we want to use the defaults.
    optional = {
        # TODO: Use custom prefixes for this settings to note that are
        # specific to scrapy-redis.
        'queue_key': 'SCHEDULER_QUEUE_KEY',
        'queue_cls': 'SCHEDULER_QUEUE_CLASS',
        'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
        # We use the default setting name to keep compatibility.
        'dupefilter_cls': 'DUPEFILTER_CLASS',
        'serializer': 'SCHEDULER_SERIALIZER',
    }
    # walk the optional setting names above and keep any value the user configured in settings
    for name, setting_name in optional.items():
        val = settings.get(setting_name)  # the value configured in settings, if any
        if val:
            kwargs[name] = val
    '''
    val = settings.get(setting_name) fetches the matching value from the settings file;
    the setting names come from looping over `optional`, whose values are the keys used in settings.
    kwargs[name] = val  # stores the value
    '''
    # Support serializer as a path to a module.
    # serialization step: requests are serialized before being stored in redis (see the serializer sketch after this method)
    if isinstance(kwargs.get('serializer'), six.string_types):
        kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
    # read the connection-related values from settings and connect to redis; the relevant settings are:
    '''
    REDIS_HOST = '127.0.0.1'
    REDIS_PORT = 6379
    # REDIS_PARAMS = {'password': 'xxxx'}  # extra connection parameters; the defaults are
    #     {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
    REDIS_ENCODING = "utf-8"
    # REDIS_URL = 'redis://user:pass@hostname:6379'  # connection URL (takes precedence over the settings above)
    '''
    server = connection.from_settings(settings)  # read the connection settings you configured and connect to redis
    # Ensure the connection is working.
    server.ping()  # verifies that the connection succeeded
    return cls(server=server, **kwargs)  # instantiate the scheduler object (cls is this class) and the crawl proceeds
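For reference, SCHEDULER_SERIALIZER only needs to point at an importable module exposing loads/dumps; a simplified sketch of roughly what scrapy_redis.picklecompat provides (not the library source verbatim):

# Simplified sketch of a serializer module usable as SCHEDULER_SERIALIZER.
import pickle

def loads(s):
    return pickle.loads(s)

def dumps(obj):
    # highest available pickle protocol, as picklecompat does
    return pickle.dumps(obj, protocol=-1)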
Connecting to redis: connection.from_settings
from_settings = get_redis_from_settings
def get_redis_from_settings(settings):
    params = defaults.REDIS_PARAMS.copy()
    # start from the default connection parameters:
    '''
    REDIS_PARAMS = {
        'socket_timeout': 30,
        'socket_connect_timeout': 30,
        'retry_on_timeout': True,
        'encoding': REDIS_ENCODING,
    }
    '''
    params.update(settings.getdict('REDIS_PARAMS'))  # merge in whatever REDIS_PARAMS you configured in settings
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)  # source is a setting name such as REDIS_HOST; fetch its value from settings
        '''
        this loop takes each key in SETTINGS_PARAMS_MAP and looks up the
        corresponding value in settings, if one was configured
        '''
        if val:
            params[dest] = val
    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)
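The final get_redis(**params) call is not shown here; roughly (a simplified sketch, not the library source verbatim) it prefers a URL mapped in from REDIS_URL over host/port:

# Simplified sketch of how the connection object is built: if a 'url' was
# mapped in from REDIS_URL it wins, otherwise host/port and the other params are used.
import redis

def get_redis_sketch(**params):
    url = params.pop('url', None)
    if url:
        return redis.StrictRedis.from_url(url, **params)
    return redis.StrictRedis(**params)

server = get_redis_sketch(host='127.0.0.1', port=6379, socket_timeout=30)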
The getdict method:
def getdict(self, name, default=None):
    value = self.get(name, default or {})
    if isinstance(value, six.string_types):
        value = json.loads(value)
    return dict(value)
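A quick usage note: getdict accepts either a real dict or a JSON string, which is why REDIS_PARAMS can be written either way in settings (a small standalone illustration):

from scrapy.settings import Settings

s = Settings({'REDIS_PARAMS': '{"socket_timeout": 30}'})   # JSON-string form
print(s.getdict('REDIS_PARAMS'))                           # {'socket_timeout': 30}

s = Settings({'REDIS_PARAMS': {'password': 'xxxx'}})       # plain dict form
print(s.getdict('REDIS_PARAMS'))                           # {'password': 'xxxx'}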
Once the scheduler object has been instantiated, the crawl itself starts:
# from here on the scheduler actually drives the crawl; everything above only read configuration
def enqueue_request(self, request):
    # dont_filter on the request bypasses the dupefilter; request_seen checks whether this request was already scheduled
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False  # already visited, no need to fetch it again
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    # not seen before: push the request into the queue (FIFO, LIFO or priority queue,
    # depending on the SCHEDULER_QUEUE_CLASS configured in settings) for the downloader to fetch
    self.queue.push(request)
    return True  # the URL had not been visited yet; it is now scheduled
The downloader side then pulls requests back out of the queue via next_request:
def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)  # each pop returns the next request object taken out of redis
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request
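As an aside (a rough sketch that assumes the FIFO queue class is configured; the default priority queue stores requests in a sorted set instead), the pop above maps onto a blocking Redis pop when block_pop_timeout is non-zero, which is what the SCHEDULER_IDLE_BEFORE_CLOSE wait is built on:

# Rough illustration only; the key name mirrors SCHEDULER_QUEUE_KEY above, and
# the raw value would be a serialized request dict, not something human-readable.
import redis

r = redis.StrictRedis(host='127.0.0.1', port=6379)
key = 'chouti:requests'            # '%(spider)s:requests' for a spider named chouti
item = r.brpop(key, timeout=10)    # blocks up to SCHEDULER_IDLE_BEFORE_CLOSE seconds
if item:
    _, raw = item                  # raw: the pickled request pushed by enqueue_request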
Reference:
https://cuiqingcai.com/6058.html
