記一次scrapy-redis爬取小說網的分布式搭建過程


scrapy-redis簡介

  scrapy-redis是scrapy框架基於redis數據庫的組件,用於scrapy項目的分布式開發和部署。

  有如下特征:

  • 分布式爬取

    可以啟動多個spider工程,相互之間共享單個redis隊列

  • 分布式數據處理

    爬取到的scrapy的item數據可以推入到redis隊列中,這樣可以根據需求啟動竟可能多的處理程序來共享item的隊列,進行item數據的持久化。

  • scrapy即插即用的組件

    Scheduler調度器 + Duplication 復制過濾器,Item Pipeline 基本的spider

 

scrapy-redis架構

  scrapy-redis整體運行流程如下:

  

  1. 首先Slaver端從Master端拿任務(Request、url)進行數據抓取,Slaver抓取數據的同時,產生新任務的Request便提交給 Master 處理;

  2. Master端只有一個Redis數據庫,負責將未處理的Request去重和任務分配,將處理后的Request加入待爬隊列,並且存儲爬取的數據。

  Scrapy-Redis默認使用的就是這種策略,我們實現起來很簡單,因為任務調度等工作Scrapy-Redis都已經幫我們做好了,我們只需要繼承RedisSpider、指定redis_key就行了。

  缺點是,Scrapy-Redis調度的任務是Request對象,里面信息量比較大(不僅包含url,還有callback函數、headers等信息),

  可能導致的結果就是會降低爬蟲的速度,而且會占用Redis大量的存儲空間。

本次分布式爬取的任務以及相關配置介紹

1、單機的scrapy程序編寫

  首先先在本機編寫單機版的scrapy程序,具體程序見下面會介紹

 

2、將單機版的scrapy程序寫改為scrapy-rdeis程序

  首先是spider爬蟲程序需要將原來的繼承的類改為RedisSpider類

     其次就是之前的start_urls不需要了,而是改為redis_key=“redis數據庫壓入時的鍵的名字”

# -*- coding: utf-8 -*-
import scrapy
from scrapy_redis.spiders import RedisSpider
from datetime import datetime
from quanshu.items import NovelItem,NovelChapterItem



class NovelSpider(RedisSpider):
    name = 'novel'
    allowed_domains = ['www.quanshuwang.com']
    # start_urls = ['http://www.quanshuwang.com/list/5_1.html']

    redis_key = 'novel:start_url'

    # 解析每一頁下所有小說的鏈接
    def parse(self, response):
        # 收集小說的主頁的url地址
        novel_urls = response.xpath('//div[@class="mainnav"]//li/a/@href').extract()
        for novel_url in novel_urls:
            # print(novel_url)
            yield scrapy.Request(url=novel_url,callback=self.second_parse)

        # 在這里找到下一頁直接返回用自己的函數解析就翻頁了

        # 這是找到下一頁的url
        next_page_url = response.xpath('//div[@class="pages"]/div/a[@class="next"]/attribute::href').extract()
        # 判斷一下,如果沒有下一頁鏈接則表明該板塊爬取完畢
        if not next_page_url:
            pass
        # print(next_page_url[0])
        yield scrapy.Request(url=next_page_url[0],callback=self.parse)

    # 解析每一個小說的簡介 並進入下一頁 去獲取沒一個章節的小說鏈接
    def second_parse(self,response):

        item = NovelItem()
        item['title'] = response.xpath('//div[@class="b-info"]//h1/text()').extract_first().strip()
        # print(item['title'])
        item['author'] = response.xpath('//div[@class="bookDetail"]/dl[2]/dd/text()').extract_first().strip()
        item['sort'] = response.xpath('//div[@class="main-index"]/a[last()]/text()').extract_first().strip()
        item['intro'] = response.xpath('//div[@id="waa"]/text()').extract_first().strip()
        item['status'] = response.xpath('//div[@class="bookDetail"]/dl[1]/dd/text()').extract_first()
        item['url'] = response.url
        item['c_time'] = datetime.now()

        # 下面的可以寫到管道類中
        if item['sort']:
            # 其他的都要strip一下
            item['sort'] = item['sort'].strip()
        else:
            item['sort'] = '其他'

        # 查詢章節列表url
        chapters_info_url = response.xpath('//a[@class="reader"]/@href').extract_first()
        yield scrapy.Request(chapters_info_url,callback=self.thrid_parse,meta={'item':item})


    def thrid_parse(self,response):

        # print(resopnse.text)
        novel_item = response.meta['item']
        # 提取小說章節信息
        lias = response.xpath('//div[@class="chapterNum"]/ul//li/a')

        # 獲取小說章節名稱和章節url 鏈接的一個列表里面是元組
        chapter_infos = [(a.xpath('./text()').extract_first(),a.xpath('./@href').extract_first()) for a in lias]
        novel_item['chapter_info'] = chapter_infos
        yield novel_item

        # 查詢數據庫
        sql = "select id,url from chapter where content is null "
        self.cursor.execute(sql)
        for item in self.cursor.fetchall():
            yield scrapy.Request(item[1],callback=self.four_parse,meta={"id":item[0]})
    #
    #
    def four_parse(self,response):
        item = NovelChapterItem()
        item['content'] =''.join(response.xpath('//*[@id="content"]/text()').extract())
        item['chapter_id'] = response.meta['id']
        item['url'] = response.url
        # print(item['content'])
        yield item

 

 

  在DownloaderMiddleware中不需要進行修改,這里只加了一個隨機請求頭的中間件

class RandomUserAgent(object):
    # 如何運行此中間件? settings 直接添加就OK

    def process_request(self, request, spider):
        # # 在請求頭里設置ua

        ua = UserAgent(verify_ssl=False).random
        request.headers["User-Agent"] = ua

  然后是items和pipelines這里都不需要修改代碼如下:

import scrapy



class NovelItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()

    # 小說名稱
    title = scrapy.Field()
    # 小說作者
    author = scrapy.Field()
    # 小說類別
    sort = scrapy.Field()
    # 小說簡介
    intro = scrapy.Field()
    # 小說狀態(如連載和完結)
    status = scrapy.Field()
    # 小說爬取時間
    c_time = scrapy.Field()
    # 小說網址鏈接
    url = scrapy.Field()

    chapter_info = scrapy.Field()


class NovelChapterItem(scrapy.Item):
    content = scrapy.Field()
    chapter_id = scrapy.Field()
    url = scrapy.Field()

 

 

import pymysql
from quanshu.items import NovelItem,NovelChapterItem
import logging
from scrapy.exceptions import DropItem

logger = logging.getLogger(__name__)
class QuanshuPipeline(object):


    def process_item(self, item, spider):
        if isinstance(item, NovelItem):
            sql = "select id from novel where title=%s and author = %s"
            self.cursor.execute(sql,(item['title'],item['author']))
            if self.cursor.fetchone():
                pass
            else:
                try:
                    # 寫入小說信息
                    sql = 'insert into novel (title,author,sort,intro,status,c_time,url) values(%s,%s,%s,%s,%s,%s,%s)'
                    self.cursor.execute(sql,
                                        (item['title'],
                                        item['author'],
                                        item['sort'],
                                        item['intro'],
                                        item['status'],
                                        item['c_time'],
                                        item['url'],
                                        ))
                    self.conn.commit()
                    # 章節信息
                    novel_id  = self.cursor.lastrowid
                    sql = 'insert into chapter (novel_id,title,ord,c_time,url) values '
                    for index ,info in enumerate(item['chapter_info']):
                        title, url = info
                        temp = '(%s,"%s",%s,"%s","%s"),'%(novel_id,title.replace(" ",""),index,item['c_time'],url)
                        sql += temp
                    sql = sql[:-1]
                    # print(sql)
                    try:
                        self.cursor.execute(sql)
                        self.conn.commit()
                    except Exception as e:
                        self.conn.rollback()
                        logger.warning('小說章節信息寫入數據庫錯誤 url=%s %s'%(url,e) )
                except Exception as e:
                    self.conn.rollback()
                    logger.warning('小說信息寫入錯誤 url=%s %s'%(item['url'],e))
        elif isinstance(item,NovelChapterItem):
            sql = 'update chapter set content=%s where  id = %s'
            try:
                self.cursor.execute(sql,(item['content'],item['chapter_id']))
                self.conn.commit()
                # print('小說內容寫入成功')
            except Exception as e:
                # print('失敗')
                self.conn.rollback()
                logger.warning('小說內容寫入數據庫信息錯誤 url=%s %s'%(item['url'],e))
        else:
            raise DropItem


    # 設置爬蟲一開始就鏈接數據庫
    def open_spider(self,spider):
        data_config = spider.settings["DATABASE_CONFIG"]
        # print(data_config)
        if data_config['type'] == 'mysql':
            self.conn = pymysql.connect(**data_config['config'])
            self.cursor = self.conn.cursor()
            spider.conn = self.conn
            spider.cursor = self.cursor


    # 設置爬蟲一結束就關閉數據庫鏈接
    def close_spider(self,spider):
        data_config = spider.settings["DATABASE_CONFIG"]
        if data_config['type'] == 'mysql':
            self.cursor.close()
            self.conn.close()

 

 

 

   下面settings文件的修改,這里是最重要的,先看代碼:

# scrapy-redis 的配置
# 配置調度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 重復過濾
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# redis 配置
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PARAMS = {
'password': '123456',
}

# 數據保存數據庫的配置
DATABASE_CONFIG = {
    'type': 'mysql',
    'config':{
        'host':'127.0.0.1',
        'port':3306,
        'user':'root',
        'password':'root',
        'db':'novel',
        'charset':'utf8',
    }
}

  上面這部分是redis相關的配置,首先要使用scrapy-redis調度器組件和重復過濾組件,然后配置redis數據庫的相關信息,還有數據入庫mysql數據庫相關的配置

DOWNLOADER_MIDDLEWARES = {
   # 'quanshu.middlewares.QuanshuDownloaderMiddleware': 543,
   #  'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    #隨機請求頭中間件
    'quanshu.middlewares.RandomUserAgent': 200,
}

ITEM_PIPELINES = {
   'quanshu.pipelines.QuanshuPipeline': 300,
    # 這里不用scrapy_redis的pipeline我們直接存到mysql中
}

 

  上面的兩個配置第一個是之前寫好的隨機請求頭中間件的啟用,下面的是pipelines的選用,注意這里我們選擇的是scrapy本來的pipelines而沒有選擇scrapy-redis的pipelines,具體原因下面會說。

到這里基本的scrapy-redis程序就准備好了。

部署爬蟲程序到服務器

  本次准備一個Master端加七個Slaver端,下面是本次運行環境:

  • 服務器系統:centos7.x版本
  • python版本:python3,6
  • fake-useragent:0.1.11
  • pymysql :0.9.3
  • Scrapy : 1.6.0
  • Scrapy-rdeis : 0.6.8
  • redis : 3.2.1
  • Twisted : 19.2.0

  需要注意的是在配置安裝服務器環境的時候Twisted最好下載然后本地安裝,好幾台在線安裝都失敗。

  這里由於自己的主節點的服務器的內存過小,所以redis中值存放了request請求隊列和url指紋去重隊列,這也是為什么沒有使用scrapy-redis中的pipelines的原因。

  環境准備好之后就可以再每個從節點上運行scrapy-redis程序了

  啟動scrapy-redis程序之后查看日志,每個scrapy-redis都會停在這里等待主節點發送任務。

  此時在主節點發送需要爬取的網址url,將url壓入redis數據庫即可,代碼如下

import redis   # 導入redis模塊,通過python操作redis 也可以直接在redis主機的服務端操作緩存數據庫
r = redis.Redis(host='***.***.***.***', port=6379, password=123456, decode_responses=True)   # host是redis主機,需要redis服務端和客戶端都啟動 redis默認端口是6379
 for j in range(1,12):
     for i in range(20): 
     r.lpush("novel:start_url","http://www.quanshuwang.com/list/%s_%s.html"%(j,i+1))

 

 這樣,每個子節點拿到請求的url之后就能開始爬去工作了,當然每次爬取過程中子節點還會把需要下次請求的request再次寫入redis數據庫中。

到這里本次的分布式部署就結束了。

說一說此次遇到的問題以及一些解決思路

  1.  由於時間的關系本次只是簡單的部署了一下,下次希望改進成scrapyd進行部署,至於為什么,當你需要監控爬蟲狀態和日志的時候你就知道了。每次一台一台服務器的查看,真的很麻煩。
  2.  由於沒有使用scrapy-redis中的pipelines而是直接在每個子節點直接把數據寫入到數據庫,但是由於爬蟲爬去的速度和下載內容的速度遠大於向數據庫中寫入的速度,也是因為這樣出現了好多問題,比如數據寫着寫着會丟失,還有就是mysql數據庫會莫名的報錯,具體的解決辦法,可以將每台子節點寫入數據庫的程序寫成異步的twisted模塊中好像有這個功能,由於其他是耽誤了所以也沒嘗試,不過網上看了好多資料,好像可以,不過異步寫入數據庫可能數據會亂,還有就是設置延遲將爬蟲爬取的速度降下來,使用bs4解析模塊,不要用xpath解析。因為xpath比bs4速度快。還有就是主節點的服務器配置弄好點,直接把數據先保存在redis數據庫中,然后在通過其他python程序去取數據就好了。
  3. 在爬蟲運行過程種出現了好幾次redis保存問題

    

    

    

    前兩張是從節點scrapy-redis程序報錯,最后是主節點redis日志報錯信息,這是redis在數據持久化寫入文件沒有權限問題,解決辦法將目標文件打開權限即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM