settings文件
為什么項目中需要配置文件
在配置文件中存放一些公共變量,方便在後續的項目中統一修改;注意其中的變量名一般全部大寫
配置文件中的變量使用方法
導入即可使用
settings.py中的重點字段和含義
USER_AGENT 設置ua
ROBOTSTXT_OBEY 是否遵守robots協議,默認是遵守
CONCURRENT_REQUESTS 設置並發請求的數量,默認是16個
DOWNLOAD_DELAY 下載延遲,默認無延遲
COOKIES_ENABLED 是否開啟cookie,即每次請求帶上前一次的cookie,默認是開啟的
DEFAULT_REQUEST_HEADERS 設置默認請求頭
SPIDER_MIDDLEWARES 爬蟲中間件,設置過程和管道相同
DOWNLOADER_MIDDLEWARES 下載中間件
隨機更換user-agent
每次url請求更換一次user-agent
pip install fake-useragent
settings
# Enable the random User-Agent downloader middleware.
# The integer is the middleware's order value within the downloader chain.
DOWNLOADER_MIDDLEWARES = {
    # 'ArticleSpider.middlewares.MyCustomDownloaderMiddleware': 543,
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 400,
}
middlewares
from fake_useragent import UserAgent


class RandomUserAgentMiddleware(object):
    """Downloader middleware that puts a generated User-Agent on each request.

    The kind of User-Agent is selected by the ``RANDOM_UA_TYPE`` setting,
    which names an attribute of ``fake_useragent.UserAgent`` (for example
    ``random``, ``ie``, ``chrome``, ``firefox``, ``safari``, ``opera`` or
    ``msie``). When the setting is absent, ``random`` is used.
    """

    def __init__(self, crawler):
        """Create the User-Agent source and read ``RANDOM_UA_TYPE`` from settings."""
        super(RandomUserAgentMiddleware, self).__init__()
        self.ua = UserAgent()
        self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from the running crawler."""
        return cls(crawler)

    def process_request(self, request, spider):
        """Set the ``User-Agent`` header unless the request already has one."""
        # Look the attribute up exactly once per request; the previous
        # version generated a value, discarded it, and generated another.
        user_agent = getattr(self.ua, self.ua_type)
        request.headers.setdefault('User-Agent', user_agent)
ip代理
方案一:免費版
自定義函數獲取網上的一些免費代理ip
settings
# Enable the random proxy downloader middleware.
DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomProxyMiddleware': 400,
}
middlewares
class RandomProxyMiddleware(object):
    """Downloader middleware that assigns a proxy to every outgoing request."""

    def process_request(self, request, spider):
        """Store a freshly chosen proxy address in ``request.meta["proxy"]``."""
        # get_random_ip() is a user-defined helper (not shown here) that
        # returns one random proxy as "ip:port".
        proxy_address = get_random_ip()
        request.meta["proxy"] = proxy_address
方案二:收費版
github上scrapy-proxies等等
在線打碼
編碼識別:由於驗證碼識別難度大,而且易更新,所以編碼識別驗證碼(不推薦)
在線打碼:調用已經開發好的在線驗證碼識別軟件接口識別驗證碼。識別率在90%以上,並且效率高(推薦)
人工打碼:識別率近100%,但是成本高(用於復雜的)
cookie
1.一些網站會跟蹤cookie,如果不需要登陸的網站,可禁用cookie,降低被ban概率,scrapy默認開啟cookie
# Turn cookie handling off for sites that do not require a login.
COOKIES_ENABLED = False
2.配合其他程序使用,比如先使用selenium登錄,把登錄之後的cookie保存到本地,scrapy發送請求之前先讀取本地cookie

import scrapy
import re


class RenrenSpider(scrapy.Spider):
    """Fetch Renren profile pages using cookies copied from a logged-in session."""

    name = 'renren'
    allowed_domains = ['renren.com']
    start_urls = ['http://www.renren.com/941954027/profile']

    def start_requests(self):
        """Send the first request with the session cookies attached."""
        # Replace with the raw ``Cookie`` header value of a logged-in session.
        cookie_str = "cookie_str"

        # Split on the FIRST "=" only: cookie values may themselves contain
        # "=", and fragments without "=" are skipped instead of raising.
        cookie_dict = {}
        for fragment in cookie_str.split("; "):
            key, separator, value = fragment.partition("=")
            if separator:
                cookie_dict[key] = value

        yield scrapy.Request(
            self.start_urls[0],
            callback=self.parse,
            cookies=cookie_dict,
            # Alternative: headers={"Cookie": cookie_str}
        )

    def parse(self, response):
        """Check the profile page for the user name, then request the detail page."""
        ret = re.findall("新用戶287", response.text)
        print(ret)
        yield scrapy.Request(
            "http://www.renren.com/941954027/profile?v=info_timeline",
            callback=self.parse_detail
        )

    def parse_detail(self, response):
        """Check the detail page for the user name."""
        ret = re.findall("新用戶287", response.text)
        print(ret)
自動限速
調整某些參數,如
# Turn on automatic throttling.
AUTOTHROTTLE_ENABLED = True
# Download delay between requests.
DOWNLOAD_DELAY = 3
selenium
官方文檔 http://selenium-python-docs-zh.readthedocs.io/zh_CN/latest/
作用:瀏覽器操控
安裝selenium
pip install selenium
下載對應瀏覽器的驅動
http://selenium-python.readthedocs.io/installation.html
第三方(微博)登錄知乎
# Log in to Zhihu through the third-party (Weibo) login flow with Selenium.
import time
from selenium import webdriver
from scrapy.selector import Selector

browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
# Delay intended to let the page finish loading.
# NOTE(review): this sleep runs before browser.get(); confirm whether it
# was meant to come after the page is requested.
time.sleep(2)
browser.get("https://www.zhihu.com/#signin")

# Switch from QR-code sign-in to the social login buttons, then pick Weibo.
browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
browser.find_element_by_css_selector(".signup-social-buttons").click()
browser.find_element_by_css_selector(".js-bindweibo").click()
#browser.switch_to.window(browser.window_handles[-1])

# Fill in the Weibo account and password, then submit.
browser.find_element_by_css_selector(".WB_iptxt").send_keys("xxx")
browser.find_element_by_css_selector("input[node-type='passwd']").send_keys("xxx")
browser.find_element_by_css_selector("a[node-type='submit']").click()
time.sleep(2)  # delay so the page can finish loading
# The submit link is clicked a second time after the wait.
browser.find_element_by_css_selector("a[node-type='submit']").click()
第三方(QQ)登錄知乎
代碼維護 https://github.com/ecithy/selenium-QQ
# -*- coding: utf-8 -*-
# Log in to Zhihu through the third-party (QQ) login flow with Selenium.
__author__ = 'hy'

import time
from selenium import webdriver
from scrapy.selector import Selector

browser = webdriver.Firefox(executable_path="D:/Package/geckodriver.exe")
browser.get("https://www.zhihu.com/#signin")
time.sleep(2)

# Choose the QQ social login button.
browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
browser.find_element_by_css_selector(".signup-social-buttons").click()
time.sleep(2)
browser.find_element_by_css_selector(".js-bindqq").click()
time.sleep(5)

# The QQ login opens in a new window and lives inside an iframe;
# iframes must be entered level by level.
browser.switch_to.window(browser.window_handles[-1])
browser.switch_to.frame("ptlogin_iframe")

# Hide the initial (quick login) panel.
browser.execute_script('document.getElementById("qlogin").style="display: none;"')
# getElementsByClassName returns a collection, so the style has to be set
# on each element; assigning it to the collection itself hides nothing.
browser.execute_script(
    'Array.prototype.forEach.call('
    'document.getElementsByClassName("authLogin"), '
    'function (el) { el.style = "display: none;"; })'
)
# Show the account / password form.
browser.execute_script('document.getElementById("web_qr_login").style="display: block;"')
time.sleep(5)

# Enter the account and password, then submit.
# send_keys() and click() return nothing, so their results are not kept.
browser.find_element_by_name("u").send_keys("xxx")
browser.find_element_by_name("p").send_keys("xxx")
browser.find_element_by_id("login_button").click()
time.sleep(5)
scrapy集成selenium
為什么集成selenium
selenium取代下載器,編碼難度大的操作交給selenium
優點:反爬蟲難度大
缺點:同步selenium效率低,需要結合Twisted成異步
middleware方式
方式一
settings
# Enable the Selenium-backed page middleware with order value 1.
DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.JSPageMiddleware': 1,
}
middlewares
from selenium import webdriver
from scrapy.http import HtmlResponse
import time


class JSPageMiddleware(object):
    """Downloader middleware that loads pages of the "jobbole" spider in Chrome."""

    def __init__(self):
        # The browser is stored on the middleware instance, so it is opened
        # once and reused for every request.
        # NOTE(review): nothing in this class quits the browser.
        self.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        super(JSPageMiddleware, self).__init__()

    def process_request(self, request, spider):
        """Fetch ``request.url`` with Chrome and return the rendered page.

        Returns ``None`` for any spider other than "jobbole".
        """
        if spider.name != "jobbole":
            return None

        driver = self.browser
        driver.get(request.url)
        time.sleep(1)
        print("訪問:{0}".format(request.url))
        # Returning a response here makes it the result for this request.
        return HtmlResponse(
            url=driver.current_url,
            body=driver.page_source,
            encoding="utf-8",
            request=request,
        )
方式二
middlewares
from scrapy.http import HtmlResponse
import time


class JSPageMiddleware(object):
    """Downloader middleware that loads pages in the browser owned by the spider."""

    def process_request(self, request, spider):
        """Fetch ``request.url`` with ``spider.browser`` and return the rendered page.

        Returns ``None`` for any spider other than "jobbole".
        """
        if spider.name != "jobbole":
            return None

        driver = spider.browser
        driver.get(request.url)
        time.sleep(1)
        print("訪問:{0}".format(request.url))
        return HtmlResponse(
            url=driver.current_url,
            body=driver.page_source,
            encoding="utf-8",
            request=request,
        )
spider
import scrapy
from selenium import webdriver
# NOTE(review): scrapy.xlib.pydispatch is believed to be deprecated in later
# Scrapy releases; confirm against the installed version.
from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals


class JobboleSpider(scrapy.Spider):
    """Spider that owns one Chrome browser and quits it when the spider closes."""

    name = 'jobbole'
    allowed_domains = ['blog.jobbole.com']
    start_urls = ['http://blog.jobbole.com/all-posts/']

    def __init__(self, *args, **kwargs):
        """Open the browser and register the shutdown handler.

        Positional and keyword arguments are forwarded to ``scrapy.Spider``
        so that spider arguments keep working.
        """
        # The browser is stored on the spider, so each spider uses its own.
        self.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        super(JobboleSpider, self).__init__(*args, **kwargs)
        dispatcher.connect(self.spider_closed, signals.spider_closed)

    def spider_closed(self, spider):
        """Quit the browser once the spider has closed."""
        self.browser.quit()
scrapy集成selenium/phantomjs模擬登錄
為什么不直接用selenium替代原生下載器?
selenium是同步的方式,如果每個頁面采用selenium則導致爬蟲效率極低,目前並沒有scrapy中的Twisted結合selenium的異步方案,因此selenium不推薦替代原生下載器
scrapy集成selenium能做什么?
由於模擬登錄是編碼很難解決的問題 ,因此采用selenium解決;其它頁面繼續用原生下載器的異步下載方案
# -*- coding: utf-8 -*-
import re
import datetime

try:
    import urlparse as parse
except ImportError:
    from urllib import parse

import scrapy
from selenium import webdriver
import time


class ZhihuSpider(scrapy.Spider):
    """Log in to Zhihu with Selenium, then crawl with the native downloader."""

    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ['https://www.zhihu.com/']
    # Cookies captured by get_cookies(); empty until a login has run.
    login_cookies = []
    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
    }

    def get_cookies(self):
        """Log in through Weibo with Selenium and store the cookies on the spider."""
        browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        try:
            time.sleep(2)  # delay intended to let the page finish loading
            browser.get("https://www.zhihu.com/#signin")
            browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
            browser.find_element_by_css_selector(".signup-social-buttons").click()
            browser.find_element_by_css_selector(".js-bindweibo").click()
            # browser.switch_to.window(browser.window_handles[-1])
            browser.find_element_by_css_selector(".WB_iptxt").send_keys("xxx")
            browser.find_element_by_css_selector("input[node-type='passwd']").send_keys("xxx")
            browser.find_element_by_css_selector("a[node-type='submit']").click()
            time.sleep(2)  # delay so the page can finish loading
            browser.find_element_by_css_selector("a[node-type='submit']").click()
            # Store on the instance: a plain local would be discarded and
            # start_requests() would always send an empty cookie list.
            self.login_cookies = browser.get_cookies()
        finally:
            # Close the browser even when one of the steps above fails.
            browser.close()

    def start_requests(self):
        """Step 1: runs before parse() and handles the login.

        The login is performed on first use so the request carries the
        session cookies.
        """
        if not self.login_cookies:
            self.get_cookies()
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers,
                               cookies=self.login_cookies, callback=self.parse)]

    def parse(self, response):
        """Step 2: post-login logic; request a page that requires a login."""
        # The profile edit page can only be visited after logging in.
        my_url = 'https://www.zhihu.com/people/edit'
        yield scrapy.Request(my_url, headers=self.headers)
爬取知乎文章和問答
scrapy shell調試
scrapy shell -s USER_AGENT="Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0" https://www.zhihu.com/question/56320032
頁面分析
chrome安裝jsonview插件
xhr頁面查看json數據,這樣獲取數據更輕松
表設計
為了避免可能解析不到的字段或無法插入的情況,需要給字段設置默認值

# Send scraped items through the Twisted-based MySQL pipeline.
ITEM_PIPELINES = {
    'ArticleSpider.pipelines.MysqlTwistedPipline': 1,
}

# MySQL connection settings.
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

# strftime formats for SQL datetime / date values.
SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
SQL_DATE_FORMAT = "%Y-%m-%d"

# NOTE(review): presumably the Elasticsearch host; not used by the code shown here.
ES_HOST = "127.0.0.1"

import datetime
import re

import scrapy
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Join

from .utils.common import extract_num
from .settings import SQL_DATETIME_FORMAT, SQL_DATE_FORMAT

from w3lib.html import remove_tags


class ZhihuQuestionItem(scrapy.Item):
    """Item holding one Zhihu question."""

    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        """Return ``(sql, params)`` for the upsert into ``zhihu_question``."""
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """

        # The loaded fields arrive as lists, so they are flattened here.
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        # watch_user_num holds the follower count first and, when two
        # values were extracted, the click count second.
        counters = self["watch_user_num"]
        has_click_count = len(counters) == 2
        watch_user_num = int(counters[0])
        click_num = int(counters[1]) if has_click_count else 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)
        return insert_sql, params


class ZhihuAnswerItem(scrapy.Item):
    """Item holding one answer to a Zhihu question."""

    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        """Return ``(sql, params)`` for the upsert into ``zhihu_answer``."""
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
              create_time, update_time, crawl_time
              ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
              ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
              update_time=VALUES(update_time)
        """

        # create_time / update_time are timestamps; convert them to SQL datetime text.
        created = datetime.datetime.fromtimestamp(self["create_time"])
        create_time = created.strftime(SQL_DATETIME_FORMAT)
        updated = datetime.datetime.fromtimestamp(self["update_time"])
        update_time = updated.strftime(SQL_DATETIME_FORMAT)

        params = (
            self["zhihu_id"],
            self["url"],
            self["question_id"],
            self["author_id"],
            self["content"],
            self["parise_num"],
            self["comments_num"],
            create_time,
            update_time,
            self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
        )
        return insert_sql, params

class MysqlTwistedPipline(object):
    """Item pipeline that writes items to MySQL through a Twisted connection pool.

    NOTE: ``from_settings`` references ``MySQLdb`` and ``adbapi``; both must
    be imported at module level (the imports are not part of this snippet).
    """

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the ``MYSQL_*`` project settings."""
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        """Schedule the insert on the connection pool and pass the item on.

        Returns the item so that any later pipeline still receives it.
        """
        # Use Twisted to run the MySQL insert asynchronously.
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # handle failures
        return item

    def handle_error(self, failure, item, spider):
        """Report a failure raised by the asynchronous insert."""
        print(failure)

    def do_insert(self, cursor, item):
        """Run the insert; each item type supplies its own SQL and parameters."""
        insert_sql, params = item.get_insert_sql()
        print(insert_sql, params)
        cursor.execute(insert_sql, params)

# -*- coding: utf-8 -*-
import os
import re
import json
import datetime

try:
    import urlparse as parse
except ImportError:
    from urllib import parse

import scrapy
from scrapy.loader import ItemLoader
from ArticleSpider.items import ZhihuQuestionItem, ZhihuAnswerItem
from selenium import webdriver
import time


class ZhihuSpider(scrapy.Spider):
    """Crawl Zhihu questions and their answers after a Selenium login."""

    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ['https://www.zhihu.com/']

    # Request URL for the first page of answers of a question:
    # {0} = question id, {1} = page size, {2} = offset.
    start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"

    # Cookies captured by get_cookies(); empty until a login has run.
    login_cookies = []

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
    }

    # Group 1 is the canonical question URL, group 2 the numeric question id.
    _QUESTION_URL_RE = re.compile(r"(.*zhihu.com/question/(\d+))(/|$).*")

    def get_cookies(self):
        """Log in through Weibo with Selenium and store the cookies on the spider.

        The account is read from the ``WEIBO_USERNAME`` / ``WEIBO_PASSWORD``
        environment variables rather than being hard-coded in the source.
        """
        username = os.environ.get("WEIBO_USERNAME", "")
        password = os.environ.get("WEIBO_PASSWORD", "")

        browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        try:
            time.sleep(2)  # delay intended to let the page finish loading
            browser.get("https://www.zhihu.com/#signin")
            browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
            browser.find_element_by_css_selector(".signup-social-buttons").click()
            browser.find_element_by_css_selector(".js-bindweibo").click()
            # browser.switch_to.window(browser.window_handles[-1])
            browser.find_element_by_css_selector(".WB_iptxt").send_keys(username)
            browser.find_element_by_css_selector("input[node-type='passwd']").send_keys(password)
            browser.find_element_by_css_selector("a[node-type='submit']").click()
            time.sleep(2)  # delay so the page can finish loading
            browser.find_element_by_css_selector("a[node-type='submit']").click()
            # Store on the instance: a plain local would be discarded and
            # start_requests() would always send an empty cookie list.
            self.login_cookies = browser.get_cookies()
        finally:
            # Close the browser even when one of the steps above fails.
            browser.close()

    def start_requests(self):
        """Step 1: runs before parse() and handles the login."""
        if not self.login_cookies:
            self.get_cookies()
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers,
                               cookies=self.login_cookies, callback=self.parse)]

    def parse(self, response):
        """Step 2: post-login logic.

        Crawl strategy: depth first. Every URL found in the page is
        followed; URLs of the form ``/question/xxx`` are downloaded and
        handed straight to ``parse_question``.
        """
        for href in response.css("a::attr(href)").extract():
            url = parse.urljoin(response.url, href)
            if not url.startswith("https"):
                continue

            match_obj = self._QUESTION_URL_RE.match(url)
            if match_obj:
                # Question page: download it and hand it to the extractor.
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers,
                                     callback=self.parse_question)
            else:
                # Not a question page: keep following links from it.
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)

    def parse_question(self, response):
        """Extract the question item from a question page and request its answers."""
        match_obj = self._QUESTION_URL_RE.match(response.url)
        if not match_obj:
            # The response URL is not a question URL, so there is no id to
            # build the item or the answers request from.
            return
        question_id = int(match_obj.group(2))

        item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
        if "QuestionHeader-title" in response.text:
            # New page layout.
            item_loader.add_css("title", "h1.QuestionHeader-title::text")
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_css("answer_num", ".List-headerText span::text")
            item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
        else:
            # Old page layout.
            # item_loader.add_css("title", ".zh-question-title h2 a::text")
            item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
            item_loader.add_css("content", "#zh-question-detail")
            item_loader.add_css("answer_num", "#zh-question-answer-num::text")
            item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
            # item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
            item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
            item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
        # These two fields are filled the same way for both layouts.
        item_loader.add_value("url", response.url)
        item_loader.add_value("zhihu_id", question_id)
        question_item = item_loader.load_item()

        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0),
                             headers=self.headers, callback=self.parse_answer)
        yield question_item

    def parse_answer(self, reponse):
        """Turn one page of the answers API into items and follow the next page."""
        ans_json = json.loads(reponse.text)
        is_end = ans_json["paging"]["is_end"]
        next_url = ans_json["paging"]["next"]

        # Extract the individual answer fields.
        for answer in ans_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            # Author id and content may be absent; store None in that case.
            answer_item["author_id"] = answer["author"].get("id")
            answer_item["content"] = answer.get("content")
            answer_item["parise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]
            answer_item["update_time"] = answer["updated_time"]
            answer_item["crawl_time"] = datetime.datetime.now()
            yield answer_item

        if not is_end:
            yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)