Configuration in settings.py:
'''Once scrapy-redis is configured as below, the scheduler in use is the one shipped with scrapy-redis.'''
########## Connection settings ##########
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
# REDIS_PARAMS = {'password':'xxxx'}  # Redis connection parameters; defaults: REDIS_PARAMS = {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
REDIS_ENCODING = "utf-8"
# REDIS_URL = 'redis://user:pass@hostname:6379'  # connection URL (takes precedence over the settings above)
########### Scheduler ##########
# from scrapy_pro1.scheduler_test import Self_Scheduler
#SCHEDULER = 'scrapy_pro1.scheduler_test.Self_Scheduler'  ## a custom scheduler could be plugged in here
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'  # the scheduler shipped with scrapy-redis
## i.e. the scheduler in use is the one inside scrapy_redis (scheduler >> scrapy-redis scheduler)
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # redis key under which the scheduler stores pending requests
# every spider keeps its own queue / history
'''
Conceptually, redis holds one entry per spider (each spider has its own records), e.g.:
    chouti:requests -> serialized Request objects (each one wraps a url and a callback)
Redis cannot store a Request object directly, so the request is serialized (pickled) into a
string before it is written, e.g. pickle.dumps(...) on a dict built from the Request.
Reading the data back also goes through the same connection; the exact redis command depends
on how the key is stored (list, set or sorted set), e.g. conn.smembers('chouti:dupefilter')
for the dedup set.
'''
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # serializer for data written to redis; pickle by default
## Request objects are serialized with it before being stored
SCHEDULER_PERSIST = False  # whether to keep the scheduler queue and the dedup records on close; True = keep, False = clear
## i.e. keep or drop the data when the spider closes
SCHEDULER_FLUSH_ON_START = True  # whether to flush the scheduler queue and the dedup records before starting; True = flush, False = keep
## flush (or not) when the spider starts
# SCHEDULER_IDLE_BEFORE_CLOSE = 10  # how long to block when popping from an empty scheduler queue before giving up
# i.e. the maximum wait when there is no data
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # redis key for the dedup records, e.g. chouti:dupefilter
## the per-spider dedup key
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # class that implements the dedup rule
REDIS_START_URLS_KEY = '%(name)s:start_urls'
## redis key the spider reads its start URLs from
REDIS_START_URLS_AS_SET = False  # start URLs are kept in a redis list rather than a set
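To see how these settings come together, here is a minimal sketch of a spider that uses them. It is only an illustration: the spider name chouti and the parse body are made-up placeholders; the real pieces are scrapy_redis.spiders.RedisSpider and the start-URLs key convention above.

# spiders/chouti.py -- minimal sketch (spider name and parse logic are hypothetical)
from scrapy_redis.spiders import RedisSpider

class ChoutiSpider(RedisSpider):
    name = 'chouti'                  # requests go to chouti:requests, dedup records to chouti:dupefilter
    redis_key = 'chouti:start_urls'  # matches REDIS_START_URLS_KEY = '%(name)s:start_urls'

    def parse(self, response):
        # placeholder extraction; real parsing logic goes here
        yield {'url': response.url, 'title': response.css('title::text').get()}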
Inside the scrapy-redis Scheduler:
The scheduler object is instantiated when a crawl starts, e.g.: scrapy crawl baidu --nolog
from_crawler is executed first:
@classmethod
def from_crawler(cls, crawler):  ## when Scrapy runs the scrapy-redis scheduler it passes the crawler in; the configuration lives on crawler.settings
instance = cls.from_settings(crawler.settings)  ## crawler.settings is a Settings object, e.g. <scrapy.settings.Settings object at 0x00000265B2E41940>
'''its values can be read with e.g. crawler.settings.get("REDIS_HOST")'''
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance  ## from_settings has run with crawler.settings as its argument
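Roughly, what the Scrapy engine does with the SCHEDULER setting looks like the sketch below. This is a simplification, not the literal engine code (which differs between Scrapy versions); crawler and spider are assumed to already exist.

from scrapy.utils.misc import load_object

# simplified sketch of how the engine wires up the scheduler
scheduler_cls = load_object(crawler.settings['SCHEDULER'])  # e.g. scrapy_redis.scheduler.Scheduler
scheduler = scheduler_cls.from_crawler(crawler)             # -> from_crawler -> from_settings (below)
scheduler.open(spider)                                      # later builds self.queue and self.df (dupefilter)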
Next, from_settings is executed (it receives settings, i.e. the configuration):
Purpose: read the configuration
@classmethod
def from_settings(cls, settings):  ## settings is the configuration passed in
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
##read the optional settings above: look each name up in settings and take the value that was configured
for name, setting_name in optional.items():
val = settings.get(setting_name)  ## fetch the matching value from settings (the one you configured)
if val:
kwargs[name] = val
'''
val = settings.get(setting_name) pulls the matching value out of the settings file; the setting names come from looping over optional, i.e. the values of optional are the keys used in settings.
kwargs[name] = val  # store it
'''
# Support serializer as a path to a module.
##serializer handling: if it is configured as a module path string, the module is imported
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
##then the connection settings are read from the configuration and a redis connection is made; in settings they are:
'''
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
# REDIS_PARAMS = {'password':'xxxx'}  # Redis connection parameters; defaults: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
REDIS_ENCODING = "utf-8"
# REDIS_URL = 'redis://user:pass@hostname:6379'  # connection URL (takes precedence over the settings above)
'''
server = connection.from_settings(settings)  ## read the connection-related settings from the configuration and connect to redis
# Ensure the connection is working.
server.ping()  ## verifies the connection is working
return cls(server=server, **kwargs)
##instantiates the Scheduler (cls is the current class); from here the crawl can run
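The serializer configured above (scrapy_redis.picklecompat) is just a module exposing dumps/loads. A small round-trip sketch; the dict stands in for the data the scheduler actually serializes (a dict built from the Request object):

from scrapy_redis import picklecompat

request_dict = {'url': 'https://dig.chouti.com/', 'callback': 'parse'}  # illustrative only
data = picklecompat.dumps(request_dict)  # a byte string, safe to store in redis
print(picklecompat.loads(data))          # -> {'url': 'https://dig.chouti.com/', 'callback': 'parse'}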
The redis connection step: connection.from_settings
from_settings = get_redis_from_settings
def get_redis_from_settings(settings):
params = defaults.REDIS_PARAMS.copy()
##start from the default parameters:
'''
REDIS_PARAMS = {
'socket_timeout': 30,
'socket_connect_timeout': 30,
'retry_on_timeout': True,
'encoding': REDIS_ENCODING,
}
'''
params.update(settings.getdict('REDIS_PARAMS'))  ## merge the REDIS_PARAMS dict from settings into the defaults; your values extend/override them
##i.e. the connection info configured in settings is pulled into params
# XXX: Deprecate REDIS_* settings.
for source, dest in SETTINGS_PARAMS_MAP.items():
val = settings.get(source)  ## source is a REDIS_* setting name from SETTINGS_PARAMS_MAP; look up the value configured for it, if any
'''
This loop walks the keys of SETTINGS_PARAMS_MAP and, for each one,
fetches the corresponding value from settings (when it was configured).
'''
if val:
params[dest] = val
# Allow ``redis_cls`` to be a path to a class.
if isinstance(params.get('redis_cls'), six.string_types):
params['redis_cls'] = load_object(params['redis_cls'])
return get_redis(**params)
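For the settings shown at the top of this page (and no REDIS_URL), the net effect of get_redis_from_settings is roughly equivalent to building the client by hand; a sketch:

import redis

# rough hand-built equivalent of connection.from_settings for the settings above
params = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': 'utf-8',   # from REDIS_ENCODING
    'host': '127.0.0.1',   # from REDIS_HOST
    'port': 6379,          # from REDIS_PORT
}
server = redis.StrictRedis(**params)
server.ping()
# if REDIS_URL were configured it would take precedence, roughly:
# server = redis.StrictRedis.from_url('redis://user:pass@hostname:6379', socket_timeout=30)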
The getdict method:
def getdict(self, name, default=None):
value = self.get(name, default or {})
if isinstance(value, six.string_types):
value = json.loads(value)
return dict(value)
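getdict is the reason REDIS_PARAMS can be written either as a dict or as a JSON string in settings; a quick sketch:

from scrapy.settings import Settings

s = Settings()
s.set('REDIS_PARAMS', '{"password": "xxxx"}')  # a JSON string...
print(s.getdict('REDIS_PARAMS'))               # ...comes back as {'password': 'xxxx'}

s.set('REDIS_PARAMS', {'password': 'xxxx'})    # a plain dict works as well
print(s.getdict('REDIS_PARAMS'))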
Once the Scheduler object has been instantiated, the crawl itself begins:
##from here the spider really runs; everything above only read configuration
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
# only filter when the request does not set dont_filter=True
##request_seen checks whether this request has already been recorded in the dupefilter
self.df.log(request, self.spider)
return False
##already seen, so it is not scheduled again; return False
if self.stats:
##stats are enabled: bump the enqueued counter
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
##the request was not seen before (or filtering is disabled): push it into the scheduler queue so the downloader can pick it up
self.queue.push(request)  ## the queue may be FIFO, LIFO or a priority queue, depending on SCHEDULER_QUEUE_CLASS in settings
##the request now sits in the scheduler queue
return True  ## an unseen url has been added to the scheduler
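For reference, the request_seen call used above (scrapy_redis.dupefilter.RFPDupeFilter) boils down to a redis SADD on the dupefilter key; a trimmed sketch of its core:

def request_seen(self, request):
    fp = self.request_fingerprint(request)  # fingerprint of method + url + body
    added = self.server.sadd(self.key, fp)  # self.key is e.g. 'chouti:dupefilter'
    return added == 0                       # 0 means the fingerprint was already in the set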
Pulling the next request from the queue (to hand to the downloader):
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)  ## each pop returns the next Request object, blocking up to block_pop_timeout if the queue is empty
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
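What push/pop actually do depends on the queue class configured via SCHEDULER_QUEUE_CLASS. For a FIFO queue backed by a redis list the commands are roughly LPUSH on one end and (B)RPOP on the other; a trimmed sketch in the spirit of scrapy_redis.queue.FifoQueue:

def push(self, request):
    self.server.lpush(self.key, self._encode_request(request))  # self.key is e.g. 'chouti:requests'

def pop(self, timeout=0):
    if timeout > 0:
        data = self.server.brpop(self.key, timeout)  # block up to `timeout` seconds
        if isinstance(data, tuple):
            data = data[1]
    else:
        data = self.server.rpop(self.key)
    if data:
        return self._decode_request(data)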
Full scrapy-redis scheduler source:
from scrapy_redis.scheduler import Scheduler
import importlib
import six  ## used for type checks, e.g. six.string_types
from scrapy.utils.misc import load_object
from . import connection, defaults
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
"""Redis-based scheduler
Settings
--------
SCHEDULER_PERSIST : bool (default: False)
Whether to persist or clear redis queue.
SCHEDULER_FLUSH_ON_START : bool (default: False)
Whether to flush redis queue on start.
SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
How many seconds to wait before closing if no message is received.
SCHEDULER_QUEUE_KEY : str
Scheduler redis key.
SCHEDULER_QUEUE_CLASS : str
Scheduler queue class.
SCHEDULER_DUPEFILTER_KEY : str
Scheduler dupefilter redis key.
SCHEDULER_DUPEFILTER_CLASS : str
Scheduler dupefilter class.
SCHEDULER_SERIALIZER : str
Scheduler serializer.
"""
def __init__(self, server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None):
"""Initialize scheduler.
Parameters
----------
server : Redis
The redis server instance.
persist : bool
Whether to flush requests when closing. Default is False.
flush_on_start : bool
Whether to flush requests on start. Default is False.
queue_key : str
Requests queue key.
queue_cls : str
Importable path to the queue class.
dupefilter_key : str
Duplicates filter key.
dupefilter_cls : str
Importable path to the dupefilter class.
idle_before_close : int
Timeout before giving up.
"""
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")
self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None
def __len__(self):
return len(self.queue)
@classmethod
def from_settings(cls, settings):  ## settings is the configuration passed in
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
##read the optional settings above: look each name up in settings and take the value that was configured
for name, setting_name in optional.items():
val = settings.get(setting_name)  ## fetch the matching value from settings (the one you configured)
if val:
kwargs[name] = val
# Support serializer as a path to a module.
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)  ## read the connection-related settings from the configuration and connect to redis
# Ensure the connection is working.
server.ping()
return cls(server=server, **kwargs)  ## instantiate the Scheduler (cls is the current class); from here the crawl runs
@classmethod
def from_crawler(cls, crawler):  ## when Scrapy runs the scrapy-redis scheduler it passes the crawler in; the configuration lives on crawler.settings
instance = cls.from_settings(crawler.settings)  ## crawler.settings is a Settings object, e.g. <scrapy.settings.Settings object at 0x00000265B2E41940>
'''its values can be read with e.g. crawler.settings.get("REDIS_HOST")'''
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance
def open(self, spider):
self.spider = spider
try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
try:
self.df = load_object(self.dupefilter_cls)(
server=self.server,
key=self.dupefilter_key % {'spider': spider.name},
debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
)
except TypeError as e:
raise ValueError("Failed to instantiate dupefilter class '%s': %s",
self.dupefilter_cls, e)
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
##from here the spider really runs; everything above only read configuration
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
# only filter when the request does not set dont_filter=True
##request_seen checks whether this request has already been recorded in the dupefilter
self.df.log(request, self.spider)
return False
##already seen, so it is not scheduled again; return False
if self.stats:
##stats are enabled: bump the enqueued counter
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
##the request was not seen before (or filtering is disabled): push it into the scheduler queue for the downloader
self.queue.push(request)
##the request now sits in the scheduler queue
return True  ## an unseen url has been added to the scheduler
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0
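To actually run a distributed crawl with this scheduler, start one or more workers with scrapy crawl chouti and then push a seed URL into redis under the start-URLs key; a sketch (the spider name chouti and the URL are placeholders):

import redis

conn = redis.StrictRedis(host='127.0.0.1', port=6379)
# every idle worker polling 'chouti:start_urls' can pick this up
conn.lpush('chouti:start_urls', 'https://dig.chouti.com/')  # a list, because REDIS_START_URLS_AS_SET = False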