一,分布式爬蟲介紹
1.scrapy框架為何不能實現分布式?
其一:因為多台機器上部署的scrapy會各自擁有各自的調度器,這樣就使得多台機器無法分配start_urls列表中的url。(多台機器無法共享同一個調度器)
其二:多台機器爬取到的數據無法通過同一個管道對數據進行統一的數據持久出存儲。(多台機器無法共享同一個管道)
2.scrapy_redis實現原理
原來scrapy的Scheduler維護的是本機的任務隊列(存放Request對象及其回調函數等信息)+本機的去重隊列(存放訪問過的url地址)
所以實現分布式爬取的關鍵就是,找一台專門的主機上運行一個共享的隊列比如Redis,
然后重寫Scrapy的Scheduler,讓新的Scheduler到共享隊列存取Request,並且去除重復的Request請求,所以總結下來,實現分布式的關鍵就是三點:
#1、共享隊列 #2、重寫Scheduler,讓其無論是去重還是任務都去訪問共享隊列 #3、為Scheduler定制去重規則(利用redis的集合類型)
以上三點便是scrapy-redis組件的核心功能
安裝: pip3 install scrapy-redis #源碼: D:\python3.6\Lib\site-packages\scrapy_redis
使用
1.安裝scrapy-redis組件:
- pip install scrapy-redis
- scrapy-redis是基於scrapy框架開發出的一套組件,其作用就是可以讓scrapy實現分布式爬蟲。
2.編寫爬蟲文件:
- 同之前scrapy中基於Spider或者CrawlSpider的編寫方式一致。
3.編寫管道文件:
- 在scrapy-redis組件中已經幫助我們封裝好了一個專門用於連接存儲redis數據庫的管道(RedisPipeline),因此我們直接使用即可,無需自己編寫管道文件。
4.編寫配置文件:
- 在settings.py中開啟管道,且指定使用scrapy-redis中封裝好的管道。
ITEM_PIPELINES = { 'scrapy_redis.pipelines.RedisPipeline': 400 }
- 該管道默認會連接且將數據存儲到本機的redis服務中,如果想要連接存儲到其他redis服務中需要在settings.py中進行如下配置:
REDIS_HOST = 'redis服務的ip地址' REDIS_PORT = 6379 REDIS_ENCODING = ‘utf-8’ REDIS_PARAMS = {‘password’:’123456’}
注意:redis中的配置需要修改
.對redis配置文件進行配置: - 注釋該行:bind 127.0.0.1,表示可以讓其他ip訪問redis - 將yes該為no:protected-mode no,表示可以讓其他ip操作redis
還需要做的是要在settings中設置
使用scrapy-redis組件中封裝好的調度器,將所有的url存儲到該指定的調度器中,從而實現了多台機器的調度器共享。
# 使用scrapy-redis組件的去重隊列 DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 使用scrapy-redis組件自己的調度器 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 是否允許暫停 SCHEDULER_PERSIST = True
redis實現分布式基本流程
# -1.將redis數據庫的配置文件進行改動: protected-mode no #bind 127.0.0.1 # 0.下載scrapy-redis # 1.創建工程 # 2.創建基於scrawlSpider的爬蟲文件 # 3.導入RedisCrawlSpider類 # 4.將start_urls更換成redis_key屬性 # 5.在現有代碼的基礎上進行連接提取和解析操作 # 6.將解析的數據值封裝到item中,然后將item對象提交到scrapy-redis組件中的管道里('scrapy_redis.pipelines.RedisPipeline': 400,) # 7.管道會將數據值寫入到指定的redis數據庫中(在配置文件中進行指定redis數據庫ip的編寫) # 8.在當前工程中使用scrapy-redis封裝好的調度器(在配置文件中進行配置) # 9.將起始url扔到調度器隊列(redis_key)中 # 10.啟動redis服務器:redis-server redis.windows.conf # 11.啟動redis-cli # 12.執行當前爬蟲文件:scrapy runspider 爬蟲文件 # 13.向隊列中扔一個起始url:在redis-cli執行扔的操作(lpush redis_key的value值 起始url)
spider.py文件:
# -*- coding: utf-8 -*- import scrapy from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from scrapy_redis.spiders import RedisCrawlSpider from pa02.items import Pa02Item class FbsSpider(RedisCrawlSpider): name = 'fbs' # allowed_domains = ['www.xxx.com'] # start_urls = ['http://www.xxx.com/'] redis_key = "fbs" rules = ( Rule(LinkExtractor(allow=r'/type/6-\d+.html'), callback='parse_item', follow=True), ) def parse_item(self, response): item = Pa02Item() #item['domain_id'] = response.xpath('//input[@id="sid"]/@value').get() #item['name'] = response.xpath('//div[@id="name"]').get() #item['description'] = response.xpath('//div[@id="description"]').get() item["title"] = response.xpath('/html/body/div[5]/div[1]/div[3]/ul/li/a/span[2]/p[1]/text()').extract_first() item["actor"] = response.xpath('/html/body/div[5]/div[1]/div[3]/ul/li/a/span[2]/p[2]/text()').extract_first() return item
items.py文件
import scrapy class Pa02Item(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() title = scrapy.Field() actor = scrapy.Field()
pipelines.py文件
# -*- coding: utf-8 -*- # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html class Pa02Pipeline(object): r = None def open_spider(self, spider): import redis self.r = redis.Redis() print("開始分布式爬蟲") def process_item(self, item, spider): dic = dict(item) self.r.lpush('F8_data', dic) return item def close_spider(self, spider): print("爬蟲任務完成")
- 進入到 py文件所在的路徑 通過 代碼 scrapy runspider xxx.py 執行分布式程序,
- 啟動reids服務端(注意conf文件配置),啟動客戶端,最后向調度器隊列中仍入一個起始url: lpush redis_key "http://www.xxx.com/"
分布式爬蟲實例:http://baijiahao.baidu.com/s?id=1585117266335386391&wfr=spider&for=pc
二、增量式爬蟲
概念:通過爬蟲程序監測某網站數據更新的情況,以便可以爬取到該網站更新出的新數據。 如何進行增量式的爬取工作: 1.在發送請求之前判斷這個URL是不是之前爬取過 2.在解析內容后判斷這部分內容是不是之前爬取過 3.寫入存儲介質時判斷內容是不是已經在介質中存在 分析 不難發現,其實增量爬取的核心是去重, 至於去重的操作在哪個步驟起作用,只能說各有利弊。 在我看來,前兩種思路需要根據實際情況取一個(也可能都用)。 第一種思路適合不斷有新頁面出現的網站,比如說小說的新章節,每天的最新新聞等等; 第二種思路則適合頁面內容會更新的網站。 第三個思路是相當於是最后的一道防線。這樣做可以最大程度上達到去重的目的。 去重方法 1,將爬取過程中產生的url進行存儲,存儲在redis的set中。當下次進行數據爬取時,首先對即將要發起的請求對應的url在存儲的url的set中做判斷,如果存在則不進行請求,否則才進行請求。 2,對爬取到的網頁內容進行唯一標識的制定,然后將該唯一表示存儲至redis的set中。當下次爬取到網頁數據的時候,在進行持久化存儲之前,首先可以先判斷該數據的唯一標識在redis的set中是否存在,在決定是否進行持久化存儲。
案例一:爬取dilidili資源區的所有帖子 (在請求發出之前判斷內容有沒有爬過)
spider.py
import scrapy import hashlib import redis from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from pa03.items import Pa03Item class AddpaSpider(CrawlSpider): name = 'addpa' # allowed_domains = ['www.xxx.com'] start_urls = ['http://bbs.005.tv/forum.php?mod=forumdisplay&fid=709&page=1'] # 爬取dilidili資源區 文件 rules = ( Rule(LinkExtractor(allow=r'forum.php?mod=forumdisplay&fid=709&page=/d+'), callback='parse_item', follow=True), ) def parse_item(self, response): conn = redis.Redis(host="127.0.0.1",port=6379) item = Pa03Item() item["title"] = response.xpath('//table/tbody[starts-with(@id, "normalthread_")]/tr/th/a[@class="s xst"]/text()').extract_first() href = response.xpath('//table/tbody[starts-with(@id, "normalthread_")]/tr/th/a[@class="s xst"]/@href').extract_first() item["url"] = "http://bbs.005.tv/" + href item["resnum"] = response.xpath('//table/tbody[starts-with(@id, "normalthread_")]/tr/td[@class="num"]/a/text()').extract_first() #item['domain_id'] = response.path('//input[@id="sid"]/@value').get() #item['name'] = response.xpath('//div[@id="name"]').get() #item['description'] = response.xpath('//div[@id="description"]').get() source = item["title"] + item["url"] + item["resnum"] hash_value = hashlib.sha3_256(source.encode()).hexdigist() result = conn.sadd("dilidili_hash",hash_value) if result == 1: yield item else: print("數據沒有更新")
items.py
# -*- coding: utf-8 -*- # Define here the models for your scraped items # # See documentation in: # https://doc.scrapy.org/en/latest/topics/items.html import scrapy class Pa03Item(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() title = scrapy.Field() url = scrapy.Field() resnum = scrapy.Field()
pipelines.py
# -*- coding: utf-8 -*- # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html class Pa03Pipeline(object): f = None def open_spider(self,spider): self.f = open("dilidili.txt","w",encoding="utf8") def process_item(self, item, spider): import json dic = dict(item) self.f.write(json.dumps(dic,ensure_ascii=False)) return item def close_spider(self,spider): self.f.close()
案例二: 爬取4567tv網站中喜劇片的所有電影的標題和上映年份 (在請求之前判斷url有沒有被爬過)
1. 爬蟲文件 # -*- coding: utf-8 -*- import scrapy from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from redis import Redis from moviePro.items import MovieproItem class MovieSpider(CrawlSpider): name = 'movie' # allowed_domains = ['www.xxx.com'] start_urls = ['https://www.4567tv.tv/index.php/vod/show/id/6/page/23.html'] rules = ( Rule(LinkExtractor(allow=r'/index.php/vod/show/id/6/page/\d+.html'), callback='parse_item', follow=True), ) # 創建redis鏈接對象 conn = Redis(host='127.0.0.1', port=6379) def parse_item(self, response): li_list = response.xpath('//li[@class="col-md-6 col-sm-4 col-xs-3"]') for li in li_list: # 獲取詳情頁的url detail_url = 'http://www.4567tv.tv' + li.xpath('./div/a/@href').extract_first() # 將詳情頁的url存入redis的set中 ex = self.conn.sadd('urls', detail_url) # 設置redis的key-value成功時,會返回1,否則返回0 if ex == 1: print('該url沒有被爬取過,可以進行數據的爬取') yield scrapy.Request(url=detail_url, callback=self.parst_detail) else: print('數據還沒有更新,暫無新數據可爬取!') # 解析詳情頁中的電影名稱和類型,進行持久化存儲 def parst_detail(self, response): item = MovieproItem() item['title'] = response.xpath('//div[@class="stui-content__detail"]/h3[@class="title"]/text()').extract_first() item['year'] = response.xpath('//div[@class="stui-content__detail"]/p[1]/a[2]/@href').extract_first() yield item 2. items.py import scrapy class MovieproItem(scrapy.Item): # define the fields for your item here like: title = scrapy.Field() year = scrapy.Field() 3. pipelines.py from redis import Redis class MovieproPipeline(object): conn = None def open_spider(self, spider): self.conn = Redis(host='127.0.0.1', port=6379) def process_item(self, item, spider): dic = { 'title': item['title'], 'year': item['year'] } print(dic) self.conn.lpush('movieData', dic) return item
這里可以觀摩一下大佬的項目案例
https://blog.csdn.net/seven_2016/article/details/72802961?tdsourcetag=s_pcqq_aiomsg