The settings file
Why a project needs a configuration file
The configuration file holds shared variables so they are easy to change later in the project; note that these variable names are conventionally ALL CAPS.
How to use variables from the configuration file
Import them and use them directly, as sketched below.
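For example, a plain constant defined in settings.py can simply be imported, while components that are handed a crawler (middlewares, pipelines) should read settings through the settings API so that per-run overrides are respected. A minimal sketch, reusing the ArticleSpider project from later sections; ExampleMiddleware is a hypothetical name used only for illustration:

# Plain constants defined in settings.py can simply be imported:
from ArticleSpider.settings import SQL_DATETIME_FORMAT

# Components that receive a crawler object should read settings through it,
# so that per-run overrides (scrapy crawl ... -s KEY=VALUE) are respected.
class ExampleMiddleware(object):  # hypothetical middleware, for illustration only
    def __init__(self, ua_type):
        self.ua_type = ua_type

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get('RANDOM_UA_TYPE', 'random'))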
Key fields in settings.py and what they mean (a minimal settings.py sketch follows the list)
USER_AGENT sets the User-Agent
ROBOTSTXT_OBEY whether to obey the robots.txt protocol; the default is to obey it
CONCURRENT_REQUESTS number of concurrent requests; the default is 16
DOWNLOAD_DELAY download delay; no delay by default
COOKIES_ENABLED whether cookies are enabled, i.e. each request carries the cookies received earlier; enabled by default
DEFAULT_REQUEST_HEADERS sets the default request headers
SPIDER_MIDDLEWARES spider middlewares, configured the same way as pipelines
DOWNLOADER_MIDDLEWARES downloader middlewares
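A minimal sketch of what these fields look like in settings.py; the values simply mirror the defaults described above, and the commented-out middleware paths are placeholders:

# settings.py (sketch)
USER_AGENT = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
ROBOTSTXT_OBEY = True        # obey robots.txt (the default)
CONCURRENT_REQUESTS = 16     # default number of concurrent requests
DOWNLOAD_DELAY = 0           # no download delay by default
COOKIES_ENABLED = True       # carry cookies between requests (the default)

DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
}

SPIDER_MIDDLEWARES = {
    # 'ArticleSpider.middlewares.SomeSpiderMiddleware': 543,
}
DOWNLOADER_MIDDLEWARES = {
    # 'ArticleSpider.middlewares.SomeDownloaderMiddleware': 543,
}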
Randomly rotating the user-agent
Switch to a different user-agent on every url request
pip install fake-useragent
settings
DOWNLOADER_MIDDLEWARES = {
    # 'ArticleSpider.middlewares.MyCustomDownloaderMiddleware': 543,
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 400,
}
middlewares
from fake_useragent import UserAgent


class RandomUserAgentMiddleware(object):
    def __init__(self, crawler):
        super(RandomUserAgentMiddleware, self).__init__()
        self.ua = UserAgent()
        # Read RANDOM_UA_TYPE from settings, defaulting to 'random' if it is not set.
        # Accepted values: random, ie, chrome, firefox, safari, opera, msie
        self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def process_request(self, request, spider):
        def get_ua():
            """Pick a User-Agent for this request according to RANDOM_UA_TYPE"""
            return getattr(self.ua, self.ua_type)

        request.headers.setdefault('User-Agent', get_ua())
IP proxies
Option 1: free proxies
A custom function fetches free proxy IPs from the web
settings
DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomProxyMiddleware': 400,
}
middlewares
class RandomProxyMiddleware(object):
    # Set a proxy dynamically for every request
    def process_request(self, request, spider):
        # get_random_ip() is a custom helper that returns a random proxy as "http://ip:port"
        request.meta["proxy"] = get_random_ip()
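get_random_ip() is not defined in these notes; a minimal sketch, assuming the free proxies have already been collected into a local text file (proxy_list.txt is a hypothetical name) with one ip:port per line:

import random

def get_random_ip(proxy_file="proxy_list.txt"):
    # Each line of the file is assumed to look like "123.45.67.89:8080"
    with open(proxy_file) as f:
        proxies = [line.strip() for line in f if line.strip()]
    # Scrapy expects a full proxy URL in request.meta["proxy"]
    return "http://{0}".format(random.choice(proxies))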
Option 2: paid services
e.g. scrapy-proxies on GitHub, among others
Online CAPTCHA solving
Coding it yourself: CAPTCHAs are hard to recognize and change often, so writing your own recognition code is not recommended
Online solving services: call the API of an existing online CAPTCHA-recognition service; recognition rates are above 90% and it is fast (recommended)
Manual solving: recognition rate close to 100%, but expensive (used for the hard cases)
Cookies
1. Some sites track cookies. For sites that do not require login, you can disable cookies to lower the chance of being banned; Scrapy enables cookies by default
COOKIES_ENABLED = False
2. Use cookies together with another tool: for example, log in with selenium, save the resulting cookies locally, and have Scrapy read the local cookies before sending its requests

import scrapy
import re


class RenrenSpider(scrapy.Spider):
    name = 'renren'
    allowed_domains = ['renren.com']
    start_urls = ['http://www.renren.com/941954027/profile']

    def start_requests(self):
        cookie_str = "cookie_str"  # paste the cookie string copied from a logged-in browser here
        cookie_dict = {i.split("=")[0]: i.split("=")[1] for i in cookie_str.split("; ")}
        yield scrapy.Request(
            self.start_urls[0],
            callback=self.parse,
            cookies=cookie_dict,
            # headers={"Cookie": cookie_str}
        )

    def parse(self, response):
        ret = re.findall("新用户287", response.text)
        print(ret)
        yield scrapy.Request(
            "http://www.renren.com/941954027/profile?v=info_timeline",
            callback=self.parse_detail
        )

    def parse_detail(self, response):
        ret = re.findall("新用户287", response.text)
        print(ret)
Auto throttling (AutoThrottle)
Adjust the relevant parameters, for example
AUTOTHROTTLE_ENABLED = True
DOWNLOAD_DELAY = 3
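Beyond these two lines, the AutoThrottle extension exposes a few more settings; a slightly fuller sketch:

AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 5           # initial download delay
AUTOTHROTTLE_MAX_DELAY = 60            # maximum delay when latency is high
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # average number of parallel requests per remote server
AUTOTHROTTLE_DEBUG = False             # set True to log throttling stats for every response
DOWNLOAD_DELAY = 3                     # lower bound that AutoThrottle will not go below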
selenium
Official docs: http://selenium-python-docs-zh.readthedocs.io/zh_CN/latest/
Purpose: drive and automate a browser
Install selenium
pip install selenium
Download the driver for the corresponding browser
http://selenium-python.readthedocs.io/installation.html
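A quick smoke test to confirm selenium and the driver are installed correctly; the driver path follows the Windows path used throughout these notes and is only an example:

from selenium import webdriver

browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
browser.get("https://www.zhihu.com/")
print(browser.title)              # the page title shows the browser actually loaded the page
print(browser.page_source[:200])  # first part of the rendered HTML
browser.quit()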
Third-party (Weibo) login to Zhihu
import time

from selenium import webdriver
from scrapy.selector import Selector

browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
browser.get("https://www.zhihu.com/#signin")
time.sleep(2)  # wait for the page to finish loading
browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
browser.find_element_by_css_selector(".signup-social-buttons").click()
browser.find_element_by_css_selector(".js-bindweibo").click()
# browser.switch_to.window(browser.window_handles[-1])
browser.find_element_by_css_selector(".WB_iptxt").send_keys("xxx")
browser.find_element_by_css_selector("input[node-type='passwd']").send_keys("xxx")
browser.find_element_by_css_selector("a[node-type='submit']").click()
time.sleep(2)  # wait for the page to finish loading
browser.find_element_by_css_selector("a[node-type='submit']").click()
Third-party (QQ) login to Zhihu
Code maintained at https://github.com/ecithy/selenium-QQ
# -*- coding: utf-8 -*-
__author__ = 'hy'
import time

from selenium import webdriver
from scrapy.selector import Selector

browser = webdriver.Firefox(executable_path="D:/Package/geckodriver.exe")
browser.get("https://www.zhihu.com/#signin")
time.sleep(2)

# Click through to the QQ login option
browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
browser.find_element_by_css_selector(".signup-social-buttons").click()
time.sleep(2)
browser.find_element_by_css_selector(".js-bindqq").click()
time.sleep(5)

browser.switch_to.window(browser.window_handles[-1])
browser.switch_to.frame("ptlogin_iframe")  # iframes must be entered level by level

# Username and password login
# Hide the initial QR-code interface
browser.execute_script('document.getElementById("qlogin").style="display: none;"')
browser.execute_script('document.getElementsByClassName("authLogin")[0].style="display: none;"')
# Show the username/password form
browser.execute_script('document.getElementById("web_qr_login").style="display: block;"')
# browser.evaluate_script('document.getElementById("batch_quto").contentEditable = true')
time.sleep(5)

# Enter the username and password, then submit
elem_user = browser.find_element_by_name("u").send_keys("xxx")
elem_pwd = browser.find_element_by_name("p").send_keys("xxx")
elem_but = browser.find_element_by_id("login_button").click()
time.sleep(5)
Integrating selenium into Scrapy
Why integrate selenium
Selenium stands in for the downloader; operations that are hard to code by hand are handed off to selenium
Advantage: makes the crawler much harder for anti-crawling measures to block
Disadvantage: selenium is synchronous and therefore slow; it would have to be combined with Twisted to become asynchronous
The middleware approach
Approach 1
settings
DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.JSPageMiddleware': 1,
}
middlewares
import time

from selenium import webdriver
from scrapy.http import HtmlResponse


class JSPageMiddleware(object):
    def __init__(self):
        # Keep the browser on self so only one browser is opened and shared by all spiders
        self.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        super(JSPageMiddleware, self).__init__()

    # Fetch dynamic pages through Chrome
    def process_request(self, request, spider):
        if spider.name == "jobbole":
            # self.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
            self.browser.get(request.url)
            time.sleep(1)
            print("Visiting: {0}".format(request.url))
            # browser.quit()
            return HtmlResponse(url=self.browser.current_url, body=self.browser.page_source,
                                encoding="utf-8", request=request)
Approach 2
middlewares
import time

from scrapy.http import HtmlResponse


class JSPageMiddleware(object):
    # Fetch dynamic pages through the spider's own Chrome instance
    def process_request(self, request, spider):
        if spider.name == "jobbole":
            # self.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
            spider.browser.get(request.url)
            time.sleep(1)
            print("Visiting: {0}".format(request.url))
            # browser.quit()
            return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source,
                                encoding="utf-8", request=request)
spider
import scrapy
from scrapy import signals
from scrapy.xlib.pydispatch import dispatcher
from selenium import webdriver


class JobboleSpider(scrapy.Spider):
    name = 'jobbole'
    allowed_domains = ['blog.jobbole.com']
    start_urls = ['http://blog.jobbole.com/all-posts/']

    def __init__(self):
        # Keep the browser on self so each spider uses a single browser instance
        self.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        super(JobboleSpider, self).__init__()
        dispatcher.connect(self.spider_closed, signals.spider_closed)

    # Called when the spider closes
    def spider_closed(self, spider):
        self.browser.quit()
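Note that scrapy.xlib.pydispatch has been removed in newer Scrapy releases; a sketch of the same idea using the signals API exposed through from_crawler (only the signal wiring changes, the rest of the spider stays the same):

import scrapy
from scrapy import signals
from selenium import webdriver


class JobboleSpider(scrapy.Spider):
    name = 'jobbole'
    allowed_domains = ['blog.jobbole.com']
    start_urls = ['http://blog.jobbole.com/all-posts/']

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super(JobboleSpider, cls).from_crawler(crawler, *args, **kwargs)
        spider.browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        # Connect the spider_closed handler through the crawler instead of pydispatch
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        return spider

    def spider_closed(self, spider):
        self.browser.quit()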
Simulated login with Scrapy + selenium/PhantomJS
Why not replace the native downloader with selenium entirely?
Selenium works synchronously: if every page went through selenium the crawler would be extremely slow, and there is currently no solution that combines selenium with Scrapy's Twisted-based asynchrony. Selenium is therefore not recommended as a replacement for the native downloader.
What is integrating selenium into Scrapy good for?
Simulated login is very hard to solve purely in code, so selenium handles it; every other page keeps using the native downloader's asynchronous downloads.
# -*- coding: utf-8 -*-
import re
import datetime
try:
    import urlparse as parse
except:
    from urllib import parse

import scrapy
from selenium import webdriver
import time


class ZhihuSpider(scrapy.Spider):
    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ['https://www.zhihu.com/']

    login_cookies = []

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
    }

    # Log in with selenium and save the cookies
    def get_cookies(self):
        browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        browser.get("https://www.zhihu.com/#signin")
        time.sleep(2)  # wait for the page to finish loading
        browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
        browser.find_element_by_css_selector(".signup-social-buttons").click()
        browser.find_element_by_css_selector(".js-bindweibo").click()
        # browser.switch_to.window(browser.window_handles[-1])
        browser.find_element_by_css_selector(".WB_iptxt").send_keys("xxx")
        browser.find_element_by_css_selector("input[node-type='passwd']").send_keys("xxx")
        browser.find_element_by_css_selector("a[node-type='submit']").click()
        time.sleep(2)  # wait for the page to finish loading
        browser.find_element_by_css_selector("a[node-type='submit']").click()
        self.login_cookies = browser.get_cookies()  # store on the spider, not in a local variable
        browser.close()

    # Step 1: runs before parse and handles the login logic. The cookies attached here
    # are carried automatically on all subsequent requests.
    def start_requests(self):
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers,
                               cookies=self.login_cookies, callback=self.parse)]

    # Step 2: handle the post-login logic
    def parse(self, response):
        # This is the personal settings page, reachable only when logged in
        my_url = 'https://www.zhihu.com/people/edit'
        yield scrapy.Request(my_url, headers=self.headers)
Crawling Zhihu articles and Q&A
Debugging with scrapy shell
scrapy shell -s USER_AGENT="Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0" https://www.zhihu.com/question/56320032
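Once the shell opens, response is available for trying selectors interactively; for example (these CSS selectors are the ones the spider below uses for the new Zhihu layout, and may have changed on the live site):

response.css("h1.QuestionHeader-title::text").extract_first()  # question title
response.css(".QuestionHeader-detail").extract_first()         # question body (HTML)
response.css(".NumberBoard-value::text").extract()             # follower / view counts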
Page analysis
Install the JSONView extension in Chrome
Inspect the JSON returned by the XHR requests; pulling the data from there is much easier
Table design
To cope with fields that may fail to parse or rows that would otherwise fail to insert, give the columns default values
settings
ITEM_PIPELINES = {
    'ArticleSpider.pipelines.MysqlTwistedPipline': 1,
}

MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
SQL_DATE_FORMAT = "%Y-%m-%d"
ES_HOST = "127.0.0.1"
items
import datetime
import re

import scrapy
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Join
from w3lib.html import remove_tags

from .utils.common import extract_num
from .settings import SQL_DATETIME_FORMAT, SQL_DATE_FORMAT


class ZhihuQuestionItem(scrapy.Item):
    # Item for a Zhihu question
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        # SQL statement for inserting into the zhihu_question table
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num),
              comments_num=VALUES(comments_num), watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """
        # Approach 2: the item loader leaves every field as a list, so flatten the values here
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        if len(self["watch_user_num"]) == 2:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)
        return insert_sql, params


class ZhihuAnswerItem(scrapy.Item):
    # Item for a Zhihu answer
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        # SQL statement for inserting into the zhihu_answer table
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
              create_time, update_time, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num),
              parise_num=VALUES(parise_num), update_time=VALUES(update_time)
        """
        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (
            self["zhihu_id"], self["url"], self["question_id"], self["author_id"], self["content"],
            self["parise_num"], self["comments_num"], create_time, update_time,
            self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
        )
        return insert_sql, params
pipelines
import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # Run the MySQL insert asynchronously through twisted's connection pool
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # handle exceptions
        return item

    def handle_error(self, failure, item, spider):
        # Handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):
        # Perform the actual insert: each item builds its own SQL statement
        insert_sql, params = item.get_insert_sql()
        print(insert_sql, params)
        cursor.execute(insert_sql, params)
spider
# -*- coding: utf-8 -*-
import re
import json
import datetime
try:
    import urlparse as parse
except:
    from urllib import parse

import scrapy
from scrapy.loader import ItemLoader
from selenium import webdriver
import time

from ArticleSpider.items import ZhihuQuestionItem, ZhihuAnswerItem


class ZhihuSpider(scrapy.Spider):
    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ['https://www.zhihu.com/']

    # Request url for the first page of answers to a question
    start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"

    login_cookies = []

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
    }

    # Log in with selenium and save the cookies
    def get_cookies(self):
        browser = webdriver.Chrome(executable_path="D:/Package/chromedriver.exe")
        browser.get("https://www.zhihu.com/#signin")
        time.sleep(2)  # wait for the page to finish loading
        browser.find_element_by_css_selector(".qrcode-signin-cut-button").click()
        browser.find_element_by_css_selector(".signup-social-buttons").click()
        browser.find_element_by_css_selector(".js-bindweibo").click()
        # browser.switch_to.window(browser.window_handles[-1])
        browser.find_element_by_css_selector(".WB_iptxt").send_keys("liyao198705@sina.com")
        browser.find_element_by_css_selector("input[node-type='passwd']").send_keys("da_ge_da")
        browser.find_element_by_css_selector("a[node-type='submit']").click()
        time.sleep(2)  # wait for the page to finish loading
        browser.find_element_by_css_selector("a[node-type='submit']").click()
        self.login_cookies = browser.get_cookies()  # store on the spider, not in a local variable
        browser.close()

    # Step 1: runs before parse and handles the login logic
    def start_requests(self):
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers,
                               cookies=self.login_cookies, callback=self.parse)]

    # Step 2: handle the post-login logic
    def parse(self, response):
        """
        Crawl strategy: depth first. Extract all urls from the logged-in html page and follow them.
        If an extracted url has the form /question/xxx, download it and hand it straight to the question parser.
        """
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]
        all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
        for url in all_urls:
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # A question page: download it and pass it to the extraction function
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            else:
                # Not a question page: keep following links
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)

    def parse_question(self, response):
        # Handle a question page and extract the question item from it
        if "QuestionHeader-title" in response.text:
            # New page layout
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", "h1.QuestionHeader-title::text")
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", ".List-headerText span::text")
            item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")

            question_item = item_loader.load_item()
        else:
            # Item extraction for the old page layout
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            # A plain ItemLoader is enough when no field needs extra processing
            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            # item_loader.add_css("title", ".zh-question-title h2 a::text")
            item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
            item_loader.add_css("content", "#zh-question-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", "#zh-question-answer-num::text")
            item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
            # item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
            item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
            item_loader.add_css("topics", ".zm-tag-editor-labels a::text")

            question_item = item_loader.load_item()

        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers,
                             callback=self.parse_answer)
        yield question_item

    def parse_answer(self, response):
        # Handle the answers of a question (json API response)
        ans_json = json.loads(response.text)
        is_end = ans_json["paging"]["is_end"]
        next_url = ans_json["paging"]["next"]

        # Extract the concrete fields of each answer
        for answer in ans_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
            answer_item["content"] = answer["content"] if "content" in answer else None
            answer_item["parise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]
            answer_item["update_time"] = answer["updated_time"]
            answer_item["crawl_time"] = datetime.datetime.now()

            yield answer_item

        if not is_end:
            yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)