Distributed crawlers
Set up a distributed cluster whose nodes jointly crawl a shared set of resources, so as to improve crawling efficiency.
How to implement distribution
1. Can the scrapy framework implement distribution on its own?
No!
Reason 1: each machine that scrapy is deployed on has its own scheduler, so the machines cannot divide the URLs in the start_urls list among themselves (multiple machines cannot share one scheduler).
Reason 2: the data crawled on different machines cannot be persisted in a unified way through a single pipeline (multiple machines cannot share one pipeline).
2. Distributed crawling based on the scrapy-redis component
- scrapy-redis ships a scheduler and a pipeline that can be shared by multiple machines; we can use them directly to implement distributed crawling.
- Implementation options:
1. the component's RedisSpider class
2. the component's RedisCrawlSpider class
3. Distributed implementation workflow: the workflow is the same for both classes above
- 3.1 Install the scrapy-redis component: pip install scrapy-redis
- 3.2 Configure the Redis configuration file:
  - Linux or macOS: redis.conf; Windows: redis.windows.conf. Make the following changes (sketched below):
    - comment out the line bind 127.0.0.1, so that other IPs can access Redis
    - change protected-mode from yes to no (protected-mode no), so that other IPs can issue commands to Redis
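The two edits to the Redis configuration file would look roughly like this (the exact line positions differ between Redis versions):

```
# redis.conf (redis.windows.conf on Windows)

# 1) comment out the bind line so hosts other than localhost can connect:
# bind 127.0.0.1

# 2) turn protected mode off so other hosts may issue commands:
protected-mode no
```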
3.3 Modify the relevant code in the spider file:
- Change the spider class's parent class to RedisSpider or RedisCrawlSpider.
  Note: if the original spider was based on Spider, change its parent to RedisSpider; if it was based on CrawlSpider, change its parent to RedisCrawlSpider.
- Comment out or delete the start_urls list and add a redis_key attribute, whose value is the name of the scheduler queue in the scrapy-redis component (a minimal sketch follows this step).
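A minimal sketch of what step 3.3 amounts to; the spider name, domain and redis_key below are invented for illustration:

```python
from scrapy_redis.spiders import RedisSpider  # use RedisCrawlSpider if the spider was CrawlSpider-based


class ExampleSpider(RedisSpider):             # parent changed from scrapy.Spider to RedisSpider
    name = 'example'
    # start_urls is removed; start URLs now come from the scheduler queue in Redis
    # start_urls = ['https://www.example.com/']
    redis_key = 'example_queue'               # name of the scrapy-redis scheduler queue

    def parse(self, response):
        yield {'url': response.url}
```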
3.4 In the settings file, enable the pipeline shipped with scrapy-redis:

```python
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 400,
}
```
3.5 In the settings file, enable the scheduler shipped with scrapy-redis:

```python
# Use a Redis set to store request fingerprints, so request deduplication is persisted
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Use the scheduler provided by the scrapy-redis component
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Whether the scheduler state is persisted: if True, the request queue and the dedup
# fingerprint set in Redis are kept when the crawl ends; if False, they are cleared
SCHEDULER_PERSIST = True
```
3.6 In the settings file, configure the crawler's connection to Redis:

```python
REDIS_HOST = 'IP address of the Redis server'
REDIS_PORT = 6379
REDIS_ENCODING = 'utf-8'
REDIS_PARAMS = {'password': 'xx'}
```
3.7 Start the Redis server: redis-server <configuration file>
3.8 Start a Redis client: redis-cli (-h ip -p 6379)
3.9 Run the spider file: scrapy runspider SpiderFile(x.py)
3.10 Push a start URL into the scheduler queue (from the Redis client): lpush <value of redis_key> <start url>
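For the NetEase example below, whose redis_key is 'wangyiurl' and whose start URL is the NetEase news homepage, step 3.10 would look like this in redis-cli:

```
lpush wangyiurl https://news.163.com/
```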
Distributed crawler example
Requirement: crawl the domestic, international, military and other sections of NetEase (網易) News
A distributed crawler implemented with RedisCrawlSpider
How selenium is used inside scrapy:
a) import the webdriver class in the spider file
b) instantiate the browser in the constructor of the spider class
c) close the browser in the spider class's closed method
d) perform the browser automation in the downloader middleware's process_response method
Spider file

```python
# -*- coding: utf-8 -*-
import scrapy
from selenium import webdriver
from scrapy_redis.spiders import RedisCrawlSpider
from wangSpider.items import WangspiderItem


class WangSpider(RedisCrawlSpider):
    name = 'wang'
    # allowed_domains = ['www.xxx.com']
    # start_urls = ['https://news.163.com/']
    # name of the scheduler queue in Redis
    redis_key = 'wangyiurl'

    def __init__(self, *args, **kwargs):
        # forward args so the scrapy-redis base class initialises correctly
        super().__init__(*args, **kwargs)
        options = webdriver.ChromeOptions()
        options.add_argument('--window-position=0,0')   # initial window position
        options.add_argument('--window-size=1080,800')  # initial window size
        self.bro = webdriver.Chrome(
            executable_path='C://xx/PycharmProjects/djnago_study/spider/wangSpider/chromedriver.exe',
            chrome_options=options)

    def parse(self, response):
        lis = response.xpath('//div[@class="ns_area list"]/ul/li')
        li_list = []
        # keep the domestic, international, military and aviation sections
        indexs = [3, 4, 6, 7]
        for index in indexs:
            li_list.append(lis[index])
        # get the title text and url of the four sections
        for li in li_list:
            url = li.xpath('./a/@href').extract_first()
            title = li.xpath('./a/text()').extract_first()
            # print(url + ":" + title)
            # request each section url to get its page data
            # (title, thumbnail, keywords, publish time, url)
            yield scrapy.Request(url=url, callback=self.parseSecond, meta={'title': title})

    def parseSecond(self, response):
        div_list = response.xpath('//div[@class="data_row news_article clearfix "]')
        # print(len(div_list))
        for div in div_list:
            head = div.xpath('.//div[@class="news_title"]/h3/a/text()').extract_first()
            url = div.xpath('.//div[@class="news_title"]/h3/a/@href').extract_first()
            imgUrl = div.xpath('./a/img/@src').extract_first()
            tag = div.xpath('.//div[@class="news_tag"]//text()').extract()
            tags = []
            for t in tag:
                t = t.strip(' \n \t')
                tags.append(t)
            tag = "".join(tags)
            # title passed along via meta
            title = response.meta['title']
            # store the parsed values in an item object
            item = WangspiderItem()
            item['head'] = head
            item['url'] = url
            item['imgUrl'] = imgUrl
            item['tag'] = tag
            item['title'] = title
            # request the article url to get the news content stored on that page
            yield scrapy.Request(url=url, callback=self.getContent, meta={'item': item})
            # print(head + ":" + url + ":" + imgUrl + ":" + tag)

    def getContent(self, response):
        # item passed along via meta
        item = response.meta['item']
        # parse the news content on the current page
        content_list = response.xpath('//div[@class="post_text"]/p/text()').extract()
        content = "".join(content_list)
        item['content'] = content
        yield item

    def closed(self, spider):
        self.bro.quit()
```
Middleware file

```python
from scrapy import signals
from scrapy.http import HtmlResponse
import time
import random
# UserAgentMiddleware now lives in scrapy.downloadermiddlewares (scrapy.contrib was removed)
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware


# UA pool (a dedicated downloader-middleware class for the UA pool)
# 1. import the UserAgentMiddleware class
class RandomUserAgent(UserAgentMiddleware):
    def process_request(self, request, spider):
        # pick a random UA value from the list
        ua = random.choice(user_agent_list)
        # write it into the intercepted request's headers
        request.headers.setdefault('User-Agent', ua)


# swap the proxy IP for intercepted requests in bulk
class Proxy(object):
    def process_request(self, request, spider):
        # check the scheme of the intercepted request's url (http or https)
        # request.url looks like: http://www.xxx.com
        h = request.url.split(':')[0]  # scheme of the request
        if h == 'https':
            ip = random.choice(PROXY_https)
            request.meta['proxy'] = 'https://' + ip
        else:
            ip = random.choice(PROXY_http)
            request.meta['proxy'] = 'http://' + ip


class WangspiderDownloaderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader middleware.
        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        return None

    # intercept the response object handed from the downloader to the Spider
    # request:  the request that produced this response
    # response: the intercepted response
    # spider:   the spider instance defined in the spider file
    def process_response(self, request, response, spider):
        # tamper with the page data stored in the response object
        if request.url in ['http://news.163.com/domestic/', 'http://news.163.com/world/',
                           'http://news.163.com/air/', 'http://war.163.com/']:
            spider.bro.get(url=request.url)
            js = 'window.scrollTo(0,document.body.scrollHeight)'
            spider.bro.execute_script(js)
            time.sleep(3)  # give the browser some time to load the data
            # the page source now contains the dynamically loaded news data
            page_text = spider.bro.page_source
            # replace the original response
            return HtmlResponse(url=spider.bro.current_url, body=page_text,
                                encoding='utf-8', request=request)
        else:
            return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.
        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)


PROXY_http = [
    '153.180.102.104:80',
    '195.208.131.189:56055',
]
PROXY_https = [
    '120.83.49.90:9000',
    '95.189.112.214:35508',
]
user_agent_list = [
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
    "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
    "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
    "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
    "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
]
```
Pipeline file
Because network download speed and database I/O speed are not the same,
downloads may arrive faster than they can be written to the database, blocking the thread;
therefore items are written to the database asynchronously here.
```python
# Approach 2: write to MySQL asynchronously
import pymysql.cursors
from twisted.enterprise import adbapi


class MysqlTwistedPipeline(object):
    """Write items into MySQL through Twisted's asynchronous DB API."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    # from_settings is called before the pipeline instance is created,
    # so __init__ receives the connection pool built here
    @classmethod  # the method name is fixed
    def from_settings(cls, settings):
        # pull the MySQL connection parameters from the settings into a dict
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            password=settings["MYSQL_PASSWORD"],
            charset='utf8mb4',
            # cursor class
            cursorclass=pymysql.cursors.DictCursor,
            # return results as unicode
            use_unicode=True,
        )
        # adbapi.ConnectionPool only needs the name of the DB module ("pymysql");
        # it imports the module itself, so there is no need to import it directly here
        dbpool = adbapi.ConnectionPool("pymysql", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # insert the item into the database asynchronously through Twisted
        query = self.dbpool.runInteraction(self.do_insert, item)
        # addErrback forwards item and spider to handle_error
        query.addErrback(self.handle_error, item, spider)
        # pass the item on to the next pipeline (RedisPipeline)
        return item

    def do_insert(self, cursor, item):
        # run the actual INSERT; no commit is needed, Twisted handles it
        insert_sql = """
            insert into wangyi_content(head, url, imgUrl, tag, title, content)
            values (%s, %s, %s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (item["head"], item["url"], item["imgUrl"],
                                    item["tag"], item["title"], item["content"]))

    def handle_error(self, failure, item, spider):
        # report asynchronous insert failures
        if failure:
            print(failure)
```
Item file

```python
import scrapy


class WangspiderItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    head = scrapy.Field()
    url = scrapy.Field()
    imgUrl = scrapy.Field()
    tag = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
```
Settings file

```python
BOT_NAME = 'wangSpider'

SPIDER_MODULES = ['wangSpider.spiders']
NEWSPIDER_MODULE = 'wangSpider.spiders'

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = 'firstBlood (+http://www.yourdomain.com)'
# disguise the request carrier identity
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'

# Obey robots.txt rules
# ROBOTSTXT_OBEY = True
ROBOTSTXT_OBEY = False  # ignore / do not obey the robots protocol

# only show log messages of the given level
LOG_LEVEL = 'ERROR'

# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://doc.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
#DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

DOWNLOADER_MIDDLEWARES = {
    'wangSpider.middlewares.WangspiderDownloaderMiddleware': 543,
    'wangSpider.middlewares.RandomUserAgent': 542,
    # 'wangSpider.middlewares.Proxy': 541,
}

ITEM_PIPELINES = {
    'wangSpider.pipelines.MysqlTwistedPipeline': 300,
    'scrapy_redis.pipelines.RedisPipeline': 400,
}

REDIS_HOST = '192.168.31.125'
REDIS_PORT = 6379
REDIS_ENCODING = 'utf-8'

MYSQL_HOST = 'xx'
MYSQL_DBNAME = 'spider_db'
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'xx'

# Use a Redis set to store request fingerprints, so request deduplication is persisted
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Use the scheduler provided by the scrapy-redis component
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Whether the scheduler state is persisted: if True, the request queue and the dedup
# fingerprint set in Redis are kept when the crawl ends; if False, they are cleared
SCHEDULER_PERSIST = True
```
Run
scrapy runspider wang.py
Redis must be emptied before each fresh crawl. After the crawler stops, only the dupefilter and items keys remain in the Redis database; dupefilter stores the fingerprints (hashes) of the URLs that have already been crawled. If it is not cleared, the crawler will filter out those URLs and skip them, so to re-crawl from scratch, clear Redis first, as shown below.
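A sketch of that cleanup, assuming the default scrapy-redis key names for a spider named 'wang' ('wang:dupefilter' and 'wang:items') and the Redis host from the settings above:

```sh
# delete only this spider's keys
redis-cli -h 192.168.31.125 del wang:dupefilter wang:items
# or wipe the whole current Redis database (careful if it holds other data)
redis-cli -h 192.168.31.125 flushdb
```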
Result