Python分布式爬蟲打造搜索引擎
基於Scrapy、Redis、elasticsearch和django打造一個完整的搜索引擎網站
https://github.com/mtianyan/ArticleSpider
未來是什么時代?是數據時代!數據分析服務、互聯網金融,數據建模、自然語言處理、醫療病例分析……越來越多的工作會基於數據來做,而爬蟲正是快速獲取數據最重要的方式,相比其它語言,Python爬蟲更簡單、高效
一、基礎知識學習:
1. 爬取策略的深度優先和廣度優先
目錄:
- 網站的樹結構
- 深度優先算法和實現
- 廣度優先算法和實現
網站url樹結構分層設計:
- bogbole.com
- blog.bogbole.com
- python.bogbole.com
- python.bogbole.com/123
環路鏈接問題:
從首頁到下面節點。
但是下面的鏈接節點又會有鏈接指向首頁
所以:我們需要對於鏈接進行去重
1. 深度優先
2. 廣度優先
跳過已爬取的鏈接
對於二叉樹的遍歷問題
深度優先(遞歸實現):
順着一條路,走到最深處。然后回頭
廣度優先(隊列實現):
分層遍歷:遍歷完兒子輩。然后遍歷孫子輩
Python實現深度優先過程code:
def depth_tree(tree_node):
if tree_node is not None:
print (tree_node._data)
if tree_node._left is not None:
return depth_tree(tree_node.left)
if tree_node._right is not None:
return depth_tree(tree_node,_right)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
Python實現廣度優先過程code:
def level_queue(root):
#利用隊列實現樹的廣度優先遍歷
if root is None:
return
my_queue = []
node = root
my_queue.append(node)
while my_queue:
node = my_queue.pop(0)
print (node.elem)
if node.lchild is not None:
my_queue.append(node.lchild)
if node.rchild is not None:
my_queue.append(node.rchild)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
2. 爬蟲網址去重策略
- 將訪問過的url保存到數據庫中
- 將url保存到set中。只需要O(1)的代價就可以查詢到url
100000000*2byte*50個字符/1024/1024/1024 = 9G
- url經過md5等方法哈希后保存到set中,將url壓縮到固定長度而且不重復
- 用bitmap方法,將訪問過的url通過hash函數映射到某一位
- bloomfilter方法對bitmap進行改進,多重hash函數降低沖突
scrapy去重使用的是第三種方法:后面分布式scrapy-redis會講解bloomfilter方法。
3. Python字符串編碼問題解決:
**讀取文件,進行操作時轉換為unicode編碼進行處理** **保存文件時,轉換為utf-8編碼。以便於傳輸** 讀文件的庫會將轉換為unicode *python2 默認編碼格式為`ASCII`,Python3 默認編碼為 `utf-8`*
- 計算機只能處理數字,文本轉換為數字才能處理,計算機中8個bit作為一個字節,
所以一個字節能表示的最大數字就是255- 計算機是美國人發明的,所以一個字節就可以標識所有單個字符
,所以ASCII(一個字節)編碼就成為美國人的標准編碼- 但是ASCII處理中文明顯不夠,中文不止255個漢字,所以中國制定了GB2312編碼
,用兩個字節表示一個漢字。GB2312將ASCII也包含進去了。同理,日文,韓文,越來越多的國家為了解決這個問題就都發展了一套編碼,標准越來越多,如果出現多種語言混合顯示就一定會出現亂碼- 於是unicode出現了,它將所有語言包含進去了。
- 看一下ASCII和unicode編碼:
- 字母A用ASCII編碼十進制是65,二進制 0100 0001
- 漢字”中” 已近超出ASCII編碼的范圍,用unicode編碼是20013二進制是01001110 00101101
- A用unicode編碼只需要前面補0二進制是 00000000 0100 0001
- 亂碼問題解決的,但是如果內容全是英文,unicode編碼比ASCII編碼需要多一倍的存儲空間,傳輸也會變慢。
- 所以此時出現了可變長的編碼”utf-8” ,把英文:1字節,漢字3字節,特別生僻的變成4-6字節,如果傳輸大量的英文,utf8作用就很明顯。
#python3
import sys
sys.getdefaultencoding()
s.encoding('utf-8')
- 1
- 2
- 3
- 4
#python2
import sys
sys.getdefaultencoding()
s = "我和你"
su = u"我和你"
~~s.encode("utf-8")#會報錯~~
s.decode("gb2312").encode("utf-8")
su.encode("utf-8")
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
二、伯樂在線爬取所有文章
1. 初始化文件目錄
基礎環境為了便於日后的部署:我們開發使用了虛擬環境。
- python 3.5.1
- JetBrains PyCharm 2016.3.2
- mysql+navicat
pip install virtualenv
pip install virtualenvwrapper-win
安裝虛擬環境管理
mkvirtualenv articlespider3
創建虛擬環境
workon articlespider3
直接進入虛擬環境
deactivate
退出激活狀態
workon
知道有哪些虛擬環境
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
scrapy項目初始化介紹
**命令行創建scrapy項目**自行官網下載py35對應得whl文件進行pip離線安裝
Scrapy 1.3.3
cd desktop
scrapy startproject ArticleSpider
- 1
- 2
- 3
scrapy.cfg:配置文件。setings.py:設置
SPIDER_MODULES = ['ArticleSpider.spiders'] #存放spider的路徑
NEWSPIDER_MODULE = 'ArticleSpider.spiders'
- 1
- 2
middilewares.py:做跟數據存儲相關的東西
__init__.py:自己定義的middlewares 定義方法,處理響應的IO操作
items.py:項目的初始化文件。
**創建我們的spider**定義我們所要爬取的信息的相關屬性。Item對象是種類似於表單,用來保存獲取到的數據
cd ArticleSpider
scrapy genspider jobbole blog.jobbole.com
- 1
- 2
# -*- coding: utf-8 -*-
import scrapy
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
# start_urls是一個帶爬的列表,
#spider會為我們把請求下載網頁做到,直接到parse階段
start_urls = ['http://blog.jobbole.com/']
def parse(self, response):
pass
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
scrapy crawl jobbole
- 1
pip install pypiwin32#解決
- 1
# _*_ coding: utf-8 _*_
__author__ = 'mtianyan'
__date__ = '2017/3/28 12:06'
from scrapy.cmdline import execute
import sys
import os
#將系統當前目錄設置為項目根目錄
#os.path.abspath(__file__)為當前文件所在絕對路徑
#os.path.dirname為文件所在目錄
#H:\CodePath\spider\ArticleSpider\main.py
#H:\CodePath\spider\ArticleSpider
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
#執行命令,相當於在控制台cmd輸入改名了
execute(["scrapy", "crawl" , "jobbole"])
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
def parse(self, response):
pass
- 1
- 2
可以看出scrapy已經為我們做到了將網頁下載下來。而且編碼也進行了轉換.
- body:網頁內容
- _DEFAULT_ENCODING= ‘ascii’
- encoding= ‘utf-8’
2. 提取伯樂在線內容
xpath的使用
xpath讓你可以不懂前端html,不看html的詳細結構,只需要會右鍵查看就能獲取網頁上任何內容。速度遠超beautifulsoup。 目錄:1. xpath簡介
2. xpath術語與語法
3. xpath抓取誤區:javasrcipt生成html與html源文件的區別
4. xpath抓取實例
- 1
- 2
- 3
- 4
- 5
- xpath使用路徑表達式在xml和html中進行導航
- xpath包含有一個標准函數庫
- xpath是一個w3c的標准
- xpath速度要遠遠超beautifulsoup。
- 父節點
*上一層節點* - 子節點
- 兄弟節點
*同胞節點* - 先輩節點
*父節點,爺爺節點* - 后代節點
*兒子,孫子*
xpath語法:
| 表達式 | 說明 |
|---|---|
| article | 選取所有article元素的所有子節點 |
| /article | 選取根元素article |
| article/a | 選取所有屬於article的子元素的a元素 |
| //div | 選取所有div元素(不管出現在文檔里的任何地方) |
| article//div | 選取所有屬於article元素的后代的div元素,不管它出現在article之下的任何位置 |
| //@class | 選取所有名為class的屬性 |
xpath語法-謂語:
| 表達式 | 說明 |
|---|---|
| /article/div[1 | 選取屬於article子元素的第一個div元素 |
| /article/div[last()] | 選取屬於article子元素的最后一個div元素 |
| /article/div[last()-1] | 選取屬於article子元素的倒數第二個div元素 |
| //div[@color] | 選取所有擁有color屬性的div元素 |
| //div[@color=’red’] | 選取所有color屬性值為red的div元素 |
xpath語法:
| 表達式 | 說明 |
|---|---|
| /div/* | 選取屬於div元素的所有子節點 |
| //* | 選取所有元素 |
| //div[@*] | 選取所有帶屬性的div 元素 |
| //div/a 丨//div/p | 選取所有div元素的a和p元素 |
| //span丨//ul | 選取文檔中的span和ul元素 |
| article/div/p丨//span | 選取所有屬於article元素的div元素的p元素以及文檔中所有的 span元素 |
xpath抓取誤區
取某一個網頁上元素的xpath地址
在標題處右鍵使用firebugs查看元素。
然后在<h1>2016 騰訊軟件開發面試題(部分)</h1>右鍵查看xpath
# -*- coding: utf-8 -*-
import scrapy
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/110287/']
def parse(self, response):
re_selector = response.xpath("/html/body/div[3]/div[3]/div[1]/div[1]/h1")
# print(re_selector)
pass
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
調試debug可以看到
re_selector =(selectorlist)[]
- 1
可以看到返回的是一個空列表,
列表是為了如果我們當前的xpath路徑下還有層級目錄時可以進行選取
空說明沒取到值:
我們可以來chorme里觀察一下
chorme取到的值
//*[@id="post-110287"]/div[1]/h1
chormexpath代碼
# -*- coding: utf-8 -*-
import scrapy
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/110287/']
def parse(self, response):
re_selector = response.xpath('//*[@id="post-110287"]/div[1]/h1')
# print(re_selector)
pass
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
可以看出此時可以取到值
分析頁面,可以發現頁面內有一部html是通過JavaScript ajax交互來生成的,因此在f12檢查元素時的頁面結構里有,而xpath不對
xpath是基於html源代碼文件結構來找的
xpath可以有多種多樣的寫法:
re_selector = response.xpath("/html/body/div[1]/div[3]/div[1]/div[1]/h1/text()")
re2_selector = response.xpath('//*[@id="post-110287"]/div[1]/h1/text()')
re3_selector = response.xpath('//div[@class="entry-header]/h1/text()')
- 1
- 2
- 3
推薦使用id型。因為頁面id唯一。
推薦使用class型,因為后期循環爬取可擴展通用性強。
通過了解了這些此時我們已經可以抓取到頁面的標題,此時可以使用xpath利器照貓畫虎抓取任何內容。只需要點擊右鍵查看xpath。
開啟控制台調試
scrapy shell http://blog.jobbole.com/110287/
完整的xpath提取伯樂在線字段代碼
# -*- coding: utf-8 -*-
import scrapy
import re
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/110287/']
def parse(self, response):
#提取文章的具體字段
title = response.xpath('//div[@class="entry-header"]/h1/text()').extract_first("")
create_date = response.xpath("//p[@class='entry-meta-hide-on-mobile']/text()").extract()[0].strip().replace("·","").strip()
praise_nums = response.xpath("//span[contains(@class, 'vote-post-up')]/h10/text()").extract()[0]
fav_nums = response.xpath("//span[contains(@class, 'bookmark-btn')]/text()").extract()[0]
match_re = re.match(".*?(\d+).*", fav_nums)
if match_re:
fav_nums = match_re.group(1)
comment_nums = response.xpath("//a[@href='#article-comment']/span/text()").extract()[0]
match_re = re.match(".*?(\d+).*", comment_nums)
if match_re:
comment_nums = match_re.group(1)
content = response.xpath("//div[@class='entry']").extract()[0]
tag_list = response.xpath("//p[@class='entry-meta-hide-on-mobile']/a/text()").extract()
tag_list = [element for element in tag_list if not element.strip().endswith("評論")]
tags = ",".join(tag_list)
pass
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
css選擇器的使用:
# 通過css選擇器提取字段
# front_image_url = response.meta.get("front_image_url", "") #文章封面圖
title = response.css(".entry-header h1::text").extract_first()
create_date = response.css("p.entry-meta-hide-on-mobile::text").extract()[0].strip().replace("·","").strip()
praise_nums = response.css(".vote-post-up h10::text").extract()[0]
fav_nums = response.css(".bookmark-btn::text").extract()[0]
match_re = re.match(".*?(\d+).*", fav_nums)
if match_re:
fav_nums = int(match_re.group(1))
else:
fav_nums = 0
comment_nums = response.css("a[href='#article-comment'] span::text").extract()[0]
match_re = re.match(".*?(\d+).*", comment_nums)
if match_re:
comment_nums = int(match_re.group(1))
else:
comment_nums = 0
content = response.css("div.entry").extract()[0]
tag_list = response.css("p.entry-meta-hide-on-mobile a::text").extract()
tag_list = [element for element in tag_list if not element.strip().endswith("評論")]
tags = ",".join(tag_list)
pass
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
3. 爬取所有文章
yield關鍵字
#使用request下載詳情頁面,下載完成后回調方法parse_detail()提取文章內容中的字段
yield Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
- 1
- 2
scrapy.http import Request下載網頁
from scrapy.http import Request
Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
- 1
- 2
parse拼接網址應對herf內有可能網址不全
from urllib import parse
url=parse.urljoin(response.url,post_url)
parse.urljoin("http://blog.jobbole.com/all-posts/","http://blog.jobbole.com/111535/")
#結果為http://blog.jobbole.com/111535/
- 1
- 2
- 3
- 4
class層級關系
next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
#如果.next .pagenumber 是指兩個class為層級關系。而不加空格為同一個標簽
- 1
- 2
twist異步機制
Scrapy使用了Twisted作為框架,Twisted有些特殊的地方是它是事件驅動的,並且比較適合異步的代碼。在任何情況下,都不要寫阻塞的代碼。阻塞的代碼包括:
- 訪問文件、數據庫或者Web
- 產生新的進程並需要處理新進程的輸出,如運行shell命令
- 執行系統層次操作的代碼,如等待系統隊列
實現全部文章字段下載的代碼:
def parse(self, response):
""" 1. 獲取文章列表頁中的文章url並交給scrapy下載后並進行解析 2. 獲取下一頁的url並交給scrapy進行下載, 下載完成后交給parse """
# 解析列表頁中的所有文章url並交給scrapy下載后並進行解析
post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()
for post_url in post_urls:
#request下載完成之后,回調parse_detail進行文章詳情頁的解析
# Request(url=post_url,callback=self.parse_detail)
print(response.url)
print(post_url)
yield Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
#遇到href沒有域名的解決方案
#response.url + post_url
print(post_url)
# 提取下一頁並交給scrapy進行下載
next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
if next_url:
yield Request(url=parse.urljoin(response.url, post_url), callback=self.parse)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
全部文章的邏輯流程圖

4. scrapy的items整合字段
數據爬取的任務就是從非結構的數據中提取出結構性的數據。
items 可以讓我們自定義自己的字段(類似於字典,但比字典的功能更齊全)
在當前頁,需要提取多個url
原始寫法,extract之后則生成list列表,無法進行二次篩選:
post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()
- 1
改進寫法:
post_nodes = response.css("#archive .floated-thumb .post-thumb a")
for post_node in post_nodes:
#獲取封面圖的url
image_url = post_node.css("img::attr(src)").extract_first("")
post_url = post_node.css("::attr(href)").extract_first("")
- 1
- 2
- 3
- 4
- 5
在下載網頁的時候把獲取到的封面圖的url傳給parse_detail的response
在下載網頁時將這個封面url獲取到,並通過meta將他發送出去。在callback的回調函數中接收該值
yield Request(url=parse.urljoin(response.url,post_url),meta={"front_image_url":image_url},callback=self.parse_detail)
front_image_url = response.meta.get("front_image_url", "")
- 1
- 2
- 3
urljoin的好處
如果你沒有域名,我就從response里取出來,如果你有域名則我對你起不了作用了
**編寫我們自定義的item並在jobboled.py中填充。
class JobBoleArticleItem(scrapy.Item):
title = scrapy.Field()
create_date = scrapy.Field()
url = scrapy.Field()
url_object_id = scrapy.Field()
front_image_url = scrapy.Field()
front_image_path = scrapy.Field()
praise_nums = scrapy.Field()
comment_nums = scrapy.Field()
fav_nums = scrapy.Field()
content = scrapy.Field()
tags = scrapy.Field()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
import之后實例化,實例化之后填充:
1. from ArticleSpider.items import JobBoleArticleItem
2. article_item = JobBoleArticleItem()
3. article_item["title"] = title
article_item["url"] = response.url
article_item["create_date"] = create_date
article_item["front_image_url"] = [front_image_url]
article_item["praise_nums"] = praise_nums
article_item["comment_nums"] = comment_nums
article_item["fav_nums"] = fav_nums
article_item["tags"] = tags
article_item["content"] = content
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
yield article_item將這個item傳送到pipelines中
pipelines可以接收到傳送過來的item
將setting.py中的pipeline配置取消注釋
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
}
- 1
- 2
- 3
- 4
- 5
當我們的item被傳輸到pipeline我們可以將其進行存儲到數據庫等工作
setting設置下載圖片pipeline
ITEM_PIPELINES={
'scrapy.pipelines.images.ImagesPipeline': 1,
}
- 1
- 2
- 3
H:\CodePath\pyEnvs\articlespider3\Lib\site-packages\scrapy\pipelines
里面有三個scrapy默認提供的pipeline
提供了文件,圖片,媒體。
ITEM_PIPELINES是一個數據管道的登記表,每一項具體的數字代表它的優先級,數字越小,越早進入。
setting設置下載圖片的地址
# IMAGES_MIN_HEIGHT = 100
# IMAGES_MIN_WIDTH = 100
- 1
- 2
設置下載圖片的最小高度,寬度。
新建文件夾images在
IMAGES_URLS_FIELD = "front_image_url"
project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, 'images')
- 1
- 2
- 3
安裝PIL
pip install pillow
定制自己的pipeline使其下載圖片后能保存下它的本地路徑
get_media_requests()接收一個迭代器對象下載圖片
item_completed獲取到圖片的下載地址

繼承並重寫item_completed()
from scrapy.pipelines.images import ImagesPipeline
class ArticleImagePipeline(ImagesPipeline):
#重寫該方法可從result中獲取到圖片的實際下載地址
def item_completed(self, results, item, info):
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
setting中設置使用我們自定義的pipeline,而不是系統自帶的
ITEM_PIPELINES = {
'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
# 'scrapy.pipelines.images.ImagesPipeline': 1,
'ArticleSpider.pipelines.ArticleImagePipeline':1,
}
- 1
- 2
- 3
- 4
- 5

圖片url的md5處理
新建package utils
import hashlib
def get_md5(url):
m = hashlib.md5()
m.update(url)
return m.hexdigest()
if __name__ == "__main__":
print(get_md5("http://jobbole.com".encode("utf-8")))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
不確定用戶傳入的是不是:
def get_md5(url):
#str就是unicode了
if isinstance(url, str):
url = url.encode("utf-8")
m = hashlib.md5()
m.update(url)
return m.hexdigest()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
在jobbole.py中將url的md5保存下來
from ArticleSpider.utils.common import get_md5
article_item["url_object_id"] = get_md5(response.url)
- 1
- 2
5. 數據保存到本地文件以及mysql中
保存到本地json文件
import codecs打開文件避免一些編碼問題,自定義JsonWithEncodingPipeline實現json本地保存
class JsonWithEncodingPipeline(object):
#自定義json文件的導出
def __init__(self):
self.file = codecs.open('article.json', 'w', encoding="utf-8")
def process_item(self, item, spider):
#將item轉換為dict,然后生成json對象,false避免中文出錯
lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
self.file.write(lines)
return item
#當spider關閉的時候
def spider_closed(self, spider):
self.file.close()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
setting.py注冊pipeline
ITEM_PIPELINES = {
'ArticleSpider.pipelines.JsonWithEncodingPipeline': 2,
# 'scrapy.pipelines.images.ImagesPipeline': 1,
'ArticleSpider.pipelines.ArticleImagePipeline':1,
}
- 1
- 2
- 3
- 4
- 5
scrapy exporters JsonItemExporter導出
scrapy自帶的導出:
- 'CsvItemExporter',
- 'XmlItemExporter',
- 'JsonItemExporter'
- 1
- 2
- 3
- 4
from scrapy.exporters import JsonItemExporter
class JsonExporterPipleline(object):
#調用scrapy提供的json export導出json文件
def __init__(self):
self.file = open('articleexport.json', 'wb')
self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
self.exporter.start_exporting()
def close_spider(self, spider):
self.exporter.finish_exporting()
self.file.close()
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
設置setting.py注冊該pipeline
'ArticleSpider.pipelines.JsonExporterPipleline ': 2
- 1
保存到數據庫(mysql)
數據庫設計數據表,表的內容字段是和item一致的。數據庫與item的關系。類似於django中model與form的關系。
日期的轉換,將字符串轉換為datetime
import datetime
try:
create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date()
except Exception as e:
create_date = datetime.datetime.now().date()
- 1
- 2
- 3
- 4
- 5
數據庫表設計

- 三個num字段均設置不能為空,然后默認0.
- content設置為longtext
- 主鍵設置為url_object_id
數據庫驅動安裝
pip install mysqlclient
Linux報錯解決方案:
ubuntu:
sudo apt-get install libmysqlclient-dev
centos:
sudo yum install python-devel mysql-devel
保存到數據庫pipeline(同步)編寫
import MySQLdb
class MysqlPipeline(object):
#采用同步的機制寫入mysql
def __init__(self):
self.conn = MySQLdb.connect('127.0.0.1', 'root', 'password', 'article_spider', charset="utf8", use_unicode=True)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
insert_sql = """ insert into jobbole_article(title, url, create_date, fav_nums) VALUES (%s, %s, %s, %s) """
self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
self.conn.commit()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
保存到數據庫的(異步Twisted)編寫
因為我們的爬取速度可能大於數據庫存儲的速度。異步操作。
設置可配置參數
seeting.py設置
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "123456"
- 1
- 2
- 3
- 4
代碼中獲取到設置的可配置參數
twisted異步:
import MySQLdb.cursors
from twisted.enterprise import adbapi
#連接池ConnectionPool
# def __init__(self, dbapiName, *connargs, **connkw):
class MysqlTwistedPipline(object):
def __init__(self, dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
#**dbparms-->("MySQLdb",host=settings['MYSQL_HOST']
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def process_item(self, item, spider):
#使用twisted將mysql插入變成異步執行
query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error, item, spider) #處理異常
def handle_error(self, failure, item, spider):
#處理異步插入的異常
print (failure)
def do_insert(self, cursor, item):
#執行具體的插入
#根據不同的item 構建不同的sql語句並插入到mysql中
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
可選django.items
https://github.com/scrapy-plugins/scrapy-djangoitem
可以讓我們保存的item直接變成django的models.
scrapy的itemloader來維護提取代碼
itemloadr提供了一個容器,讓我們配置某一個字段該使用哪種規則。
add_css add_value add_xpath
from scrapy.loader import ItemLoader
# 通過item loader加載item
front_image_url = response.meta.get("front_image_url", "") # 文章封面圖
item_loader = ItemLoader(item=JobBoleArticleItem(), response=response)
item_loader.add_css("title", ".entry-header h1::text")
item_loader.add_value("url", response.url)
item_loader.add_value("url_object_id", get_md5(response.url))
item_loader.add_css("create_date", "p.entry-meta-hide-on-mobile::text")
item_loader.add_value("front_image_url", [front_image_url])
item_loader.add_css("praise_nums", ".vote-post-up h10::text")
item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text")
item_loader.add_css("fav_nums", ".bookmark-btn::text")
item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text")
item_loader.add_css("content", "div.entry")
#調用這個方法來對規則進行解析生成item對象
article_item = item_loader.load_item()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17

- 所有值變成了list
- 對於這些值做一些處理函數
item.py中對於item process處理函數
MapCompose可以傳入函數對於該字段進行處理,而且可以傳入多個
from scrapy.loader.processors import MapCompose
def add_mtianyan(value):
return value+"-mtianyan"
title = scrapy.Field(
input_processor=MapCompose(lambda x:x+"mtianyan",add_mtianyan),
)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
注意:此處的自定義方法一定要寫在代碼前面。
create_date = scrapy.Field(
input_processor=MapCompose(date_convert),
output_processor=TakeFirst()
)
- 1
- 2
- 3
- 4
只取list中的第一個值。
自定義itemloader實現默認提取第一個
class ArticleItemLoader(ItemLoader):
#自定義itemloader實現默認提取第一個
default_output_processor = TakeFirst()
- 1
- 2
- 3
list保存原值
def return_value(value):
return value
front_image_url = scrapy.Field(
output_processor=MapCompose(return_value)
)
- 1
- 2
- 3
- 4
- 5
- 6
下載圖片pipeline增加if增強通用性
class ArticleImagePipeline(ImagesPipeline):
#重寫該方法可從result中獲取到圖片的實際下載地址
def item_completed(self, results, item, info):
if "front_image_url" in item:
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
自定義的item帶處理函數的完整代碼
class JobBoleArticleItem(scrapy.Item):
title = scrapy.Field()
create_date = scrapy.Field(
input_processor=MapCompose(date_convert),
)
url = scrapy.Field()
url_object_id = scrapy.Field()
front_image_url = scrapy.Field(
output_processor=MapCompose(return_value)
)
front_image_path = scrapy.Field()
praise_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
comment_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
fav_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
#因為tag本身是list,所以要重寫
tags = scrapy.Field(
input_processor=MapCompose(remove_comment_tags),
output_processor=Join(",")
)
content = scrapy.Field()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
三、知乎網問題和答案爬取
1. 基礎知識
session和cookie機制
cookie:
瀏覽器支持的存儲方式
key-valuehttp無狀態請求,兩次請求沒有聯系

session的工作原理
(1)當一個session第一次被啟用時,一個唯一的標識被存儲於本地的cookie中。
(2)首先使用session_start()函數,從session倉庫中加載已經存儲的session變量。
(3)通過使用session_register()函數注冊session變量。
(4)腳本執行結束時,未被銷毀的session變量會被自動保存在本地一定路徑下的session庫中.
request模擬知乎的登錄
http狀態碼

獲取crsftoken
def get_xsrf():
#獲取xsrf code
response = requests.get("https://www.zhihu.com",headers =header)
# # print(response.text)
# text ='<input type="hidden" name="_xsrf" value="ca70366e5de5d133c3ae09fb16d9b0fa"/>'
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text)
if match_obj:
return (match_obj.group(1))
else:
return ""
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
python模擬知乎登錄代碼:
# _*_ coding: utf-8 _*_
import requests
try:
import cookielib
except:
import http.cookiejar as cookielib
import re
__author__ = 'mtianyan'
__date__ = '2017/5/23 16:42'
import requests
try:
import cookielib
except:
import http.cookiejar as cookielib
import re
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies.txt")
try:
session.cookies.load(ignore_discard=True)
except:
print ("cookie未能加載")
agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36"
header = {
"HOST":"www.zhihu.com",
"Referer": "https://www.zhizhu.com",
'User-Agent': agent
}
def is_login():
#通過個人中心頁面返回狀態碼來判斷是否為登錄狀態
inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
response = session.get(inbox_url, headers=header, allow_redirects=False)
if response.status_code != 200:
return False
else:
return True
def get_xsrf():
#獲取xsrf code
response = session.get("https://www.zhihu.com", headers=header)
response_text = response.text
#reDOTAll 匹配全文
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
xsrf = (match_obj.group(1))
return xsrf
def get_index():
response = session.get("https://www.zhihu.com", headers=header)
with open("index_page.html", "wb") as f:
f.write(response.text.encode("utf-8"))
print ("ok")
def get_captcha():
import time
t = str(int(time.time()*1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
t = session.get(captcha_url, headers=header)
with open("captcha.jpg","wb") as f:
f.write(t.content)
f.close()
from PIL import Image
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
pass
captcha = input("輸入驗證碼\n>")
return captcha
def zhihu_login(account, password):
#知乎登錄
if re.match("^1\d{10}",account):
print ("手機號碼登錄")
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": get_xsrf(),
"phone_num": account,
"password": password,
"captcha":get_captcha()
}
else:
if "@" in account:
#判斷用戶名是否為郵箱
print("郵箱方式登錄")
post_url = "https://www.zhihu.com/login/email"
post_data = {
"_xsrf": get_xsrf(),
"email": account,
"password": password
}
response_text = session.post(post_url, data=post_data, headers=header)
session.cookies.save()
# get_index()
# is_login()
# get_captcha()
zhihu_login("phone", "password")
zhihu_login("shouji", "mima")
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
2. scrapy創建知乎爬蟲登錄
scrapy genspider zhihu www.zhihu.com
- 1
因為知乎我們需要先進行登錄,所以我們重寫它的start_requests
def start_requests(self):
return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]
- 1
- 2
- 3
-
下載首頁然后回調login函數。
-
login函數請求驗證碼並回調login_after_captcha函數.此處通過meta將post_data傳送出去,后面的回調函數來用。
def login(self, response):
response_text = response.text
#獲取xsrf。
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
xsrf = (match_obj.group(1))
if xsrf:
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": xsrf,
"phone_num": "phone",
"password": "password",
"captcha": ""
}
import time
t = str(int(time.time() * 1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
#請求驗證碼並回調login_after_captcha.
yield scrapy.Request(captcha_url, headers=self.headers,
meta={"post_data":post_data}, callback=self.login_after_captcha)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- login_after_captcha函數將驗證碼圖片保存到本地,然后使用PIL庫打開圖片,肉眼識別后在控制台輸入驗證碼值
然后接受步驟一的meta數據,一並提交至登錄接口。回調check_login檢查是否登錄成功。
def login_after_captcha(self, response):
with open("captcha.jpg", "wb") as f:
f.write(response.body)
f.close()
from PIL import Image
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
pass
captcha = input("輸入驗證碼\n>")
post_data = response.meta.get("post_data", {})
post_url = "https://www.zhihu.com/login/phone_num"
post_data["captcha"] = captcha
return [scrapy.FormRequest(
url=post_url,
formdata=post_data,
headers=self.headers,
callback=self.check_login
)]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- check_login函數,驗證服務器的返回數據判斷是否成功
scrapy會對request的URL去重(RFPDupeFilter),加上dont_filter則告訴它這個URL不參與去重.
源碼中的startrequest:
def start_requests(self):
for url in self.start_urls:
yield self.make_requests_from_url(url)
- 1
- 2
- 3
我們將原本的start_request的代碼放在了現在重寫的,回調鏈最后的check_login
def check_login(self, response):
#驗證服務器的返回數據判斷是否成功
text_json = json.loads(response.text)
if "msg" in text_json and text_json["msg"] == "登錄成功":
for url in self.start_urls:
yield scrapy.Request(url, dont_filter=True, headers=self.headers)
- 1
- 2
- 3
- 4
- 5
- 6

3. 知乎數據表設計

上圖為知乎答案版本1

上圖為知乎答案版本2
設置數據表字段
| 問題字段 | 回答字段 |
|---|---|
| zhihu_id | zhihu_id |
| topics | url |
| url | question_id |
| title | author_id |
| content | content |
| answer_num | parise_num |
| comments_num | comments_num |
| watch_user_num | create_time |
| click_num | update_time |
| crawl_time | crawl_time |


知乎url分析
點具體問題下查看更多。
可獲得接口:
重點參數:
offset=43
isend = true
next

href=”/question/25460323”
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
- 1
- 從首頁獲取所有a標簽。如果提取的url中格式為 /question/xxx 就下載之后直接進入解析函數parse_question
如果不是question頁面則直接進一步跟蹤。
def parse(self, response):
""" 提取出html頁面中的所有url 並跟蹤這些url進行一步爬取 如果提取的url中格式為 /question/xxx 就下載之后直接進入解析函數 """
all_urls = response.css("a::attr(href)").extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
#使用lambda函數對於每一個url進行過濾,如果是true放回列表,返回false去除。
all_urls = filter(lambda x:True if x.startswith("https") else False, all_urls)
for url in all_urls:
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
if match_obj:
# 如果提取到question相關的頁面則下載后交由提取函數進行提取
request_url = match_obj.group(1)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
else:
# 如果不是question頁面則直接進一步跟蹤
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 進入parse_question函數處理
**創建我們的item
item要用到的方法ArticleSpider\utils\common.py:
def extract_num(text):
#從字符串中提取出數字
match_re = re.match(".*?(\d+).*", text)
if match_re:
nums = int(match_re.group(1))
else:
nums = 0
return nums
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
setting.py中設置
SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
SQL_DATE_FORMAT = "%Y-%m-%d"
使用:
from ArticleSpider.settings import SQL_DATETIME_FORMAT
- 1
知乎的問題 item
class ZhihuQuestionItem(scrapy.Item):
#知乎的問題 item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
#插入知乎question表的sql語句
insert_sql = """ insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num, watch_user_num, click_num, crawl_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num), watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num) """
zhihu_id = self["zhihu_id"][0]
topics = ",".join(self["topics"])
url = self["url"][0]
title = "".join(self["title"])
content = "".join(self["content"])
answer_num = extract_num("".join(self["answer_num"]))
comments_num = extract_num("".join(self["comments_num"]))
if len(self["watch_user_num"]) == 2:
watch_user_num = int(self["watch_user_num"][0])
click_num = int(self["watch_user_num"][1])
else:
watch_user_num = int(self["watch_user_num"][0])
click_num = 0
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time)
return insert_sql, params
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
知乎問題回答item
class ZhihuAnswerItem(scrapy.Item):
#知乎的問題回答item
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
#插入知乎question表的sql語句
insert_sql = """ insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num, create_time, update_time, crawl_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num), update_time=VALUES(update_time) """
create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
params = (
self["zhihu_id"], self["url"], self["question_id"],
self["author_id"], self["content"], self["parise_num"],
self["comments_num"], create_time, update_time,
self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
有了兩個item之后,我們繼續完善我們的邏輯
def parse_question(self, response):
#處理question頁面, 從頁面中提取出具體的question item
if "QuestionHeader-title" in response.text:
#處理新版本
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css("title", "h1.QuestionHeader-title::text")
item_loader.add_css("content", ".QuestionHeader-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", ".List-headerText span::text")
item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
question_item = item_loader.load_item()
else:
#處理老版本頁面的item提取
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
# item_loader.add_css("title", ".zh-question-title h2 a::text")
item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
item_loader.add_css("content", "#zh-question-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", "#zh-question-answer-num::text")
item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
# item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
yield question_item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
處理問題回答提取出需要的字段
def parse_answer(self, reponse):
#處理question的answer
ans_json = json.loads(reponse.text)
is_end = ans_json["paging"]["is_end"]
next_url = ans_json["paging"]["next"]
#提取answer的具體字段
for answer in ans_json["data"]:
answer_item = ZhihuAnswerItem()
answer_item["zhihu_id"] = answer["id"]
answer_item["url"] = answer["url"]
answer_item["question_id"] = answer["question"]["id"]
answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
answer_item["content"] = answer["content"] if "content" in answer else None
answer_item["parise_num"] = answer["voteup_count"]
answer_item["comments_num"] = answer["comment_count"]
answer_item["create_time"] = answer["created_time"]
answer_item["update_time"] = answer["updated_time"]
answer_item["crawl_time"] = datetime.datetime.now()
yield answer_item
if not is_end:
yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
知乎提取字段流程圖:

深度優先:
1. 提取出頁面所有的url,並過濾掉不需要的url
2. 如果是questionurl就進入question的解析
3. 把該問題的爬取完了然后就返回初始解析
將item寫入數據庫
pipelines.py錯誤處理
插入時錯誤可通過該方法監控
def handle_error(self, failure, item, spider):
#處理異步插入的異常
print (failure)
- 1
- 2
- 3
改造pipeline使其變得更通用
原本具體硬編碼的pipeline
def do_insert(self, cursor, item):
#執行具體的插入
insert_sql = """ insert into jobbole_article(title, url, create_date, fav_nums) VALUES (%s, %s, %s, %s) """
cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
改寫后的:
def do_insert(self, cursor, item):
#根據不同的item 構建不同的sql語句並插入到mysql中
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
- 1
- 2
- 3
- 4
可選方法一:
if item.__class__.__name__ == "JobBoleArticleItem":
#執行具體的插入
insert_sql = """ insert into jobbole_article(title, url, create_date, fav_nums) VALUES (%s, %s, %s, %s) """
cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
推薦方法:
把sql語句等放到item里面:
jobboleitem類內部方法
def get_insert_sql(self):
insert_sql = """ insert into jobbole_article(title, url, create_date, fav_nums) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(fav_nums) """
params = (self["title"], self["url"], self["create_date"], self["fav_nums"])
return insert_sql, params
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
知乎問題:
def get_insert_sql(self):
#插入知乎question表的sql語句
insert_sql = """ insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num, watch_user_num, click_num, crawl_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num), watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num) """
zhihu_id = self["zhihu_id"][0]
topics = ",".join(self["topics"])
url = self["url"][0]
title = "".join(self["title"])
content = "".join(self["content"])
answer_num = extract_num("".join(self["answer_num"]))
comments_num = extract_num("".join(self["comments_num"]))
if len(self["watch_user_num"]) == 2:
watch_user_num = int(self["watch_user_num"][0])
click_num = int(self["watch_user_num"][1])
else:
watch_user_num = int(self["watch_user_num"][0])
click_num = 0
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time)
return insert_sql, params
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
知乎回答:
def get_insert_sql(self):
#插入知乎回答表的sql語句
insert_sql = """ insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num, create_time, update_time, crawl_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num), update_time=VALUES(update_time) """
create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
params = (
self["zhihu_id"], self["url"], self["question_id"],
self["author_id"], self["content"], self["parise_num"],
self["comments_num"], create_time, update_time,
self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
第二次爬取到相同數據,更新數據
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num), watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
- 1
- 2
調試技巧
if match_obj:
#如果提取到question相關的頁面則下載后交由提取函數進行提取
request_url = match_obj.group(1)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
#方便調試
break
else:
#方便調試
pass
#如果不是question頁面則直接進一步跟蹤
#方便調試
# yield scrapy.Request(url, headers=self.headers, callback=self.parse)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
#方便調試
# yield question_item
- 1
- 2
錯誤排查
[key error] title
pipeline中debug定位到哪一個item的錯誤。
四、通過CrawlSpider對招聘網站拉鈎網進行整站爬取
推薦工具cmder
http://cmder.net/
下載full版本,使我們在windows環境下也可以使用linux部分命令。
配置path環境變量
1. 設計拉勾網的數據表結構

2. 初始化拉鈎網項目並解讀crawl源碼
scrapy genspider --list
查看可使用的初始化模板
ailable templates:
- basic
- crawl
- csvfeed
- xmlfeed
scrapy genspider -t crawl lagou www.lagou.com
- 1
cmd與pycharm不同,mark root
setting.py 設置目錄
crawl模板
class LagouSpider(CrawlSpider):
name = 'lagou'
allowed_domains = ['www.lagou.com']
start_urls = ['http://www.lagou.com/']
rules = (
Rule(LinkExtractor(allow=r'Items/'), callback='parse_item', follow=True),
)
def parse_item(self, response):
i = {}
#i['domain_id'] = response.xpath('//input[@id="sid"]/@value').extract()
#i['name'] = response.xpath('//div[@id="name"]').extract()
#i['description'] = response.xpath('//div[@id="description"]').extract()
return i
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
源碼閱讀剖析
https://doc.scrapy.org/en/1.3/topics/spiders.html#crawlspider
提供了一些可以讓我們進行簡單的follow的規則,link,迭代爬取
rules:
規則,crawel spider讀取並執行
parse_start_url(response):
example:
rules是一個可迭代對象,里面有Rule實例->LinkExtractor的分析
allow=('category\.php', ), callback='parse_item',
allow允許的url模式。callback,要回調的函數名。
因為rules里面沒有self,無法獲取到方法。
import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class MySpider(CrawlSpider):
name = 'example.com'
allowed_domains = ['example.com']
start_urls = ['http://www.example.com']
rules = (
# Extract links matching 'category.php' (but not matching 'subsection.php')
# and follow links from them (since no callback means follow=True by default).
Rule(LinkExtractor(allow=('category\.php', ), deny=('subsection\.php', ))),
# Extract links matching 'item.php' and parse them with the spider's method parse_item
Rule(LinkExtractor(allow=('item\.php', )), callback='parse_item'),
)
def parse_item(self, response):
self.logger.info('Hi, this is an item page! %s', response.url)
item = scrapy.Item()
item['id'] = response.xpath('//td[@id="item_id"]/text()').re(r'ID: (\d+)')
item['name'] = response.xpath('//td[@id="item_name"]/text()').extract()
item['description'] = response.xpath('//td[@id="item_description"]/text()').extract()
return item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
分析拉勾網模板代碼
1. 將http加上s
2. 重命名parse_item為我們自定義的parse_job
3. 點擊class LagouSpider(CrawlSpider):的CrawlSpider,進入crawl源碼
4. class CrawlSpider(Spider):可以看出它繼承於spider
5. 入口:def start_requests(self):
6. alt+左右方向鍵,不同代碼跳轉
7. 5->之后默認parse CrawlSpider里面有parse函數。但是這次我們不能向以前一樣覆蓋
Crawl.py核心函數parse。
parse函數調用_parse_response
def parse(self, response):
return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)
- 1
- 2
_parse_response
- 判斷是否有callback即有沒有self.parse_start_url
- 我們可以重載parse_start_url加入自己的處理
- 把參數傳遞給函數,並調用process_results函數
_parse_response函數
def _parse_response(self, response, callback, cb_kwargs, follow=True):
if callback:
cb_res = callback(response, **cb_kwargs) or ()
cb_res = self.process_results(response, cb_res)
for requests_or_item in iterate_spider_output(cb_res):
yield requests_or_item
if follow and self._follow_links:
for request_or_item in self._requests_to_follow(response):
yield request_or_item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
parse_start_url的return值將會被process_results方法接收處理
如果不重寫,因為返回為空,然后就相當於什么都沒做
def process_results(self, response, results):
return results
- 1
- 2
點擊followlink
def set_crawler(self, crawler):
super(CrawlSpider, self).set_crawler(crawler)
self._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)
- 1
- 2
- 3
如果setting中有這個參數,則可以進一步執行到parse
_requests_to_follow
1. 判斷傳入的是不是response,如果不是直接returns
2. 針對當前response設置一個空set,去重
3. 把self的rules通過enumerate變成一個可迭代對象
4. 跳轉rules詳情
5. 拿到link通過link_extractor.extract_links抽取出具體的link
6. 執行我們的process_links
7. link制作完成發起Request,回調_response_downloaded函數
8. 然后執行parse_respose
def _requests_to_follow(self, response):
if not isinstance(response, HtmlResponse):
return
seen = set()
for n, rule in enumerate(self._rules):
links = [lnk for lnk in rule.link_extractor.extract_links(response)
if lnk not in seen]
if links and rule.process_links:
links = rule.process_links(links)
for link in links:
seen.add(link)
r = Request(url=link.url, callback=self._response_downloaded)
r.meta.update(rule=n, link_text=link.text)
yield rule.process_request(r)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
_compile_rules
- 在我們初始化時會調用_compile_rules
copy.copy(r) for r in self.rules]將我們的rules進行一個copy- 調用回調函數get_method。
- 調用rules里面我們定義的process_links
- 調用rules里面我們定義的process_request
def _compile_rules(self):
def get_method(method):
if callable(method):
return method
elif isinstance(method, six.string_types):
return getattr(self, method, None)
self._rules = [copy.copy(r) for r in self.rules]
for rule in self._rules:
rule.callback = get_method(rule.callback)
rule.process_links = get_method(rule.process_links)
rule.process_request = get_method(rule.process_request)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
self.process_links = process_links
self.process_request = process_request
- 1
- 2
- 3
可以通過在rules里面傳入我們自己的處理函數,實現對url的自定義。
達到負載均衡,多地不同ip訪問。
_response_downloaded
通過rule取到具體的rule
調用我們自己的回調函數
def _response_downloaded(self, response):
rule = self._rules[response.meta['rule']]
return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow)
- 1
- 2
- 3
- allow :符合這個url我就爬取
- deny : 符合這個url規則我就放棄
- allow_domin : 這個域名下的我才處理
- allow_domin : 這個域名下的我不處理
- restrict_xpaths:進一步限定xpath
self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
tags=('a', 'area'), attrs=('href',), canonicalize=True,
unique=True, process_value=None, deny_extensions=None, restrict_css=()
- 1
- 2
- 3
extract_links
如果有restrict_xpaths,他會進行讀取執行
def extract_links(self, response):
base_url = get_base_url(response)
if self.restrict_xpaths:
docs = [subdoc
for x in self.restrict_xpaths
for subdoc in response.xpath(x)]
else:
docs = [response.selector]
all_links = []
for doc in docs:
links = self._extract_links(doc, response.url, response.encoding, base_url)
all_links.extend(self._process_links(links))
return unique_list(all_links)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
get_base_url:
urllib.parse.urljoin替我們拼接好url
def get_base_url(text, baseurl='', encoding='utf-8'):
"""Return the base url if declared in the given HTML `text`, relative to the given base url. If no base url is found, the given `baseurl` is returned. """
text = to_unicode(text, encoding)
m = _baseurl_re.search(text)
if m:
return moves.urllib.parse.urljoin(
safe_url_string(baseurl),
safe_url_string(m.group(1), encoding=encoding)
)
else:
return safe_url_string(baseurl)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
編寫rule規則
rules = (
Rule(LinkExtractor(allow=("zhaopin/.*",)), follow=True),
Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True),
)
- 1
- 2
- 3
- 4
- 5
3. 設計lagou的items
需要用到的方法
from w3lib.html import remove_tags
def remove_splash(value):
#去掉工作城市的斜線
return value.replace("/","")
def handle_jobaddr(value):
addr_list = value.split("\n")
addr_list = [item.strip() for item in addr_list if item.strip()!="查看地圖"]
return "".join(addr_list)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
定義好的item
class LagouJobItem(scrapy.Item):
#拉勾網職位信息
title = scrapy.Field()
url = scrapy.Field()
url_object_id = scrapy.Field()
salary = scrapy.Field()
job_city = scrapy.Field(
input_processor=MapCompose(remove_splash),
)
work_years = scrapy.Field(
input_processor = MapCompose(remove_splash),
)
degree_need = scrapy.Field(
input_processor = MapCompose(remove_splash),
)
job_type = scrapy.Field()
publish_time = scrapy.Field()
job_advantage = scrapy.Field()
job_desc = scrapy.Field()
job_addr = scrapy.Field(
input_processor=MapCompose(remove_tags, handle_jobaddr),
)
company_name = scrapy.Field()
company_url = scrapy.Field()
tags = scrapy.Field(
input_processor = Join(",")
)
crawl_time = scrapy.Field()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
重寫的itemloader
設置默認只提取第一個
class LagouJobItemLoader(ItemLoader):
#自定義itemloader
default_output_processor = TakeFirst()
- 1
- 2
- 3
4. 提取字段值並存入數據庫
def parse_job(self, response):
#解析拉勾網的職位
item_loader = LagouJobItemLoader(item=LagouJobItem(), response=response)
item_loader.add_css("title", ".job-name::attr(title)")
item_loader.add_value("url", response.url)
item_loader.add_value("url_object_id", get_md5(response.url))
item_loader.add_css("salary", ".job_request .salary::text")
item_loader.add_xpath("job_city", "//*[@class='job_request']/p/span[2]/text()")
item_loader.add_xpath("work_years", "//*[@class='job_request']/p/span[3]/text()")
item_loader.add_xpath("degree_need", "//*[@class='job_request']/p/span[4]/text()")
item_loader.add_xpath("job_type", "//*[@class='job_request']/p/span[5]/text()")
item_loader.add_css("tags", '.position-label li::text')
item_loader.add_css("publish_time", ".publish_time::text")
item_loader.add_css("job_advantage", ".job-advantage p::text")
item_loader.add_css("job_desc", ".job_bt div")
item_loader.add_css("job_addr", ".work_addr")
item_loader.add_css("company_name", "#job_company dt a img::attr(alt)")
item_loader.add_css("company_url", "#job_company dt a::attr(href)")
item_loader.add_value("crawl_time", datetime.now())
job_item = item_loader.load_item()
return job_item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
獲得的拉勾網item數據

5. items中添加get_insert_sql實現存入數據庫
def get_insert_sql(self):
insert_sql = """ insert into lagou_job(title, url, url_object_id, salary, job_city, work_years, degree_need, job_type, publish_time, job_advantage, job_desc, job_addr, company_name, company_url, tags, crawl_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE salary=VALUES(salary), job_desc=VALUES(job_desc) """
params = (
self["title"], self["url"], self["url_object_id"], self["salary"], self["job_city"],
self["work_years"], self["degree_need"], self["job_type"],
self["publish_time"], self["job_advantage"], self["job_desc"],
self["job_addr"], self["company_name"], self["company_url"],
self["job_addr"], self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
五、爬蟲與反爬蟲
1. 基礎知識
如何使我們的爬蟲不被禁止掉
爬蟲:
自動獲取數據的程序,關鍵是批量的獲取
反爬蟲:
使用技術手段防止爬蟲程序的方法
誤傷:
反爬蟲技術將普通用戶識別為爬蟲,效果再好也不能用
學校,網吧,出口的公網ip只有一個,所以禁止ip不能用。
ip動態分配。a爬封b
成本:
反爬蟲人力和機器成本
攔截:
攔截率越高,誤傷率越高
反爬蟲的目的:

爬蟲與反爬蟲的對抗過程:

使用檢查可以查看到價格,而查看網頁源代碼無法查看到價格字段。
scrapy下載到的網頁時網頁源代碼。
js(ajax)填充的動態數據無法通過網頁獲取到。
2. scrapy架構及源碼介紹


- 我們編寫的spider,然后yield一個request發送給engine
- engine拿到什么都不做然后給scheduler
- engine會生成一個request給engine
- engine拿到之后通過downloadermiddleware 給downloader
- downloader再發送response回來給engine。
- engine拿到之后,response給spider。
- spider進行處理,解析出item & request,
- item->給itempipeline;如果是request,跳轉步驟二
path:articlespider3\Lib\site-packages\scrapy\core
- engine.py:
- scheduler.py
-
downloader
-
item
- pipeline
- spider
engine.py:重要函數schedule
- enqueue_request:把request放scheduler
- _next_request_from_scheduler:從調度器拿。
def schedule(self, request, spider):
self.signals.send_catch_log(signal=signals.request_scheduled,
request=request, spider=spider)
if not self.slot.scheduler.enqueue_request(request):
self.signals.send_catch_log(signal=signals.request_dropped,
request=request, spider=spider)
- 1
- 2
- 3
- 4
- 5
- 6
articlespider3\Lib\site-packages\scrapy\core\downloader\handlers
支持文件,ftp,http下載(https).
后期定制middleware:
- spidermiddlewire
- downloadmiddlewire
django和scrapy結構類似
3. scrapy的兩個重要類:request和response
類似於django httprequest
yield Request(url=parse.urljoin(response.url, post_url))
- 1
request參數:
class Request(object_ref):
def __init__(self, url, callback=None, method='GET', headers=None, body=None, cookies=None, meta=None, encoding='utf-8', priority=0, dont_filter=False, errback=None):
- 1
- 2
- 3
- 4
- 5
cookies:
Lib\site-packages\scrapy\downloadermiddlewares\cookies.py
cookiejarkey = request.meta.get("cookiejar")
- 1
- priority: 優先級,影響調度順序
- dont_filter:我的同樣的request不會被過濾
- errback:錯誤時的回調函數
https://doc.scrapy.org/en/1.2/topics/request-response.html?highlight=response
errback example:
class ErrbackSpider(scrapy.Spider):
name = "errback_example"
start_urls = [
"http://www.httpbin.org/", # HTTP 200 expected
"http://www.httpbin.org/status/404", # Not found error
"http://www.httpbin.org/status/500", # server issue
"http://www.httpbin.org:12345/", # non-responding host, timeout expected
"http://www.httphttpbinbin.org/", # DNS error expected
]
def start_requests(self):
for u in self.start_urls:
yield scrapy.Request(u, callback=self.parse_httpbin,
errback=self.errback_httpbin,
dont_filter=True)
def parse_httpbin(self, response):
self.logger.info('Got successful response from {}'.format(response.url))
# do something useful here...
def errback_httpbin(self, failure):
# log all failures
self.logger.error(repr(failure))
# in case you want to do something special for some errors,
# you may need the failure's type:
if failure.check(HttpError):
# these exceptions come from HttpError spider middleware
# you can get the non-200 response
response = failure.value.response
self.logger.error('HttpError on %s', response.url)
elif failure.check(DNSLookupError):
# this is the original request
request = failure.request
self.logger.error('DNSLookupError on %s', request.url)
elif failure.check(TimeoutError, TCPTimedOutError):
request = failure.request
self.logger.error('TimeoutError on %s', request.url)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
response類
def __init__(self, url, status=200, headers=None, body=b'', flags=None, request=None):
self.headers = Headers(headers or {})
- 1
- 2
response的參數:
request:yield出來的request,會放在response,讓我們知道它是從哪里來的
4. 自行編寫隨機更換useagent
- setting中設置
user_agent_list = [
'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0',
'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36',
]
- 1
- 2
- 3
- 4
然后在代碼中使用。
from settings import user_agent_list
import random
random_index =random.randint(0,len(user_agent_list))
random_agent = user_agent_list[random_index]
'User-Agent': random_agent
- 1
- 2
- 3
- 4
- 5
- 6
import random
random_index = random.randint(0, len(user_agent_list))
random_agent = user_agent_list[random_index]
self.headers["User-Agent"] = random_agent
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
- 1
- 2
- 3
- 4
- 5
但是問題:每個request之前都得這樣做。
5. middlewire配置及編寫fake UseAgent代理池
取消DOWNLOADER_MIDDLEWARES的注釋狀態
DOWNLOADER_MIDDLEWARES = {
'ArticleSpider.middlewares.MyCustomDownloaderMiddleware': 543,
}
- 1
- 2
- 3
articlespider3\Lib\site-packages\scrapy\downloadermiddlewares\useragent.py
class UserAgentMiddleware(object):
"""This middleware allows spiders to override the user_agent"""
def __init__(self, user_agent='Scrapy'):
self.user_agent = user_agent
@classmethod
def from_crawler(cls, crawler):
o = cls(crawler.settings['USER_AGENT'])
crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
return o
def spider_opened(self, spider):
self.user_agent = getattr(spider, 'user_agent', self.user_agent)
def process_request(self, request, spider):
if self.user_agent:
request.headers.setdefault(b'User-Agent', self.user_agent)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
重要方法process_request
**配置默認useagent為none
DOWNLOADER_MIDDLEWARES = {
'ArticleSpider.middlewares.MyCustomDownloaderMiddleware': 543,
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None
}
- 1
- 2
- 3
- 4
使用fakeuseragent
pip install fake-useragent
settinf.py設置隨機模式RANDOM_UA_TYPE = "random"
from fake_useragent import UserAgent
class RandomUserAgentMiddlware(object):
#隨機更換user-agent
def __init__(self, crawler):
super(RandomUserAgentMiddlware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", "random")
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
def get_ua():
return getattr(self.ua, self.ua_type)
request.headers.setdefault('User-Agent', get_ua())
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
6. 使用西刺代理創建ip代理池保存到數據庫*
ip動態變化:重啟路由器等
ip代理的原理:
不直接發送自己真實ip,而使用中間代理商(代理服務器),那么服務器不知道我們的ip也就不會把我們禁掉
setting.py設置
“
class RandomProxyMiddleware(object):
#動態設置ip代理
def process_request(self, request, spider):
request.meta["proxy"] = "http://111.198.219.151:8118"
- 1
- 2
- 3
- 4
使用西刺代理創建代理池保存到數據庫
# _*_ coding: utf-8 _*_
__author__ = 'mtianyan'
__date__ = '2017/5/24 16:27'
import requests
from scrapy.selector import Selector
import MySQLdb
conn = MySQLdb.connect(host="127.0.0.1", user="root", passwd="password", db="article_spider", charset="utf8")
cursor = conn.cursor()
def crawl_ips():
#爬取西刺的免費ip代理
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0"}
for i in range(1568):
re = requests.get("http://www.xicidaili.com/nn/{0}".format(i), headers=headers)
selector = Selector(text=re.text)
all_trs = selector.css("#ip_list tr")
ip_list = []
for tr in all_trs[1:]:
speed_str = tr.css(".bar::attr(title)").extract()[0]
if speed_str:
speed = float(speed_str.split("秒")[0])
all_texts = tr.css("td::text").extract()
ip = all_texts[0]
port = all_texts[1]
proxy_type = all_texts[5]
ip_list.append((ip, port, proxy_type, speed))
for ip_info in ip_list:
cursor.execute(
"insert proxy_ip(ip, port, speed, proxy_type) VALUES('{0}', '{1}', {2}, 'HTTP')".format(
ip_info[0], ip_info[1], ip_info[3]
)
)
conn.commit()
class GetIP(object):
def delete_ip(self, ip):
#從數據庫中刪除無效的ip
delete_sql = """ delete from proxy_ip where ip='{0}' """.format(ip)
cursor.execute(delete_sql)
conn.commit()
return True
def judge_ip(self, ip, port):
#判斷ip是否可用
http_url = "http://www.baidu.com"
proxy_url = "http://{0}:{1}".format(ip, port)
try:
proxy_dict = {
"http":proxy_url,
}
response = requests.get(http_url, proxies=proxy_dict)
except Exception as e:
print ("invalid ip and port")
self.delete_ip(ip)
return False
else:
code = response.status_code
if code >= 200 and code < 300:
print ("effective ip")
return True
else:
print ("invalid ip and port")
self.delete_ip(ip)
return False
def get_random_ip(self):
#從數據庫中隨機獲取一個可用的ip
random_sql = """ SELECT ip, port FROM proxy_ip ORDER BY RAND() LIMIT 1 """
result = cursor.execute(random_sql)
for ip_info in cursor.fetchall():
ip = ip_info[0]
port = ip_info[1]
judge_re = self.judge_ip(ip, port)
if judge_re:
return "http://{0}:{1}".format(ip, port)
else:
return self.get_random_ip()
# print (crawl_ips())
if __name__ == "__main__":
get_ip = GetIP()
get_ip.get_random_ip()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
使用scrapy_proxies創建ip代理池
pip install scrapy_proxies
收費,但是簡單
https://github.com/scrapy-plugins/scrapy-crawlera
tor隱藏。vpn
http://www.theonionrouter.com/
7. 通過雲打碼實現驗證碼的識別
# _*_ coding: utf-8 _*_
__author__ = 'mtianyan'
__date__ = '2017/6/24 16:48'
import json
import requests
class YDMHttp(object):
apiurl = 'http://api.yundama.com/api.php'
username = ''
password = ''
appid = ''
appkey = ''
def __init__(self, username, password, appid, appkey):
self.username = username
self.password = password
self.appid = str(appid)
self.appkey = appkey
def balance(self):
data = {'method': 'balance', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response_data = requests.post(self.apiurl, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("獲取剩余積分", ret_data["balance"])
return ret_data["balance"]
else:
return None
def login(self):
data = {'method': 'login', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response_data = requests.post(self.apiurl, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("登錄成功", ret_data["uid"])
return ret_data["uid"]
else:
return None
def decode(self, filename, codetype, timeout):
data = {'method': 'upload', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey, 'codetype': str(codetype), 'timeout': str(timeout)}
files = {'file': open(filename, 'rb')}
response_data = requests.post(self.apiurl, files=files, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("識別成功", ret_data["text"])
return ret_data["text"]
else:
return None
def ydm(file_path):
username = ''
# 密碼
password = ''
# 軟件ID,開發者分成必要參數。登錄開發者后台【我的軟件】獲得!
appid =
# 軟件密鑰,開發者分成必要參數。登錄開發者后台【我的軟件】獲得!
appkey = ''
# 圖片文件
filename = 'image/1.jpg'
# 驗證碼類型,# 例:1004表示4位字母數字,不同類型收費不同。請准確填寫,否則影響識別率。在此查詢所有類型 http://www.yundama.com/price.html
codetype = 5000
# 超時時間,秒
timeout = 60
# 檢查
yundama = YDMHttp(username, password, appid, appkey)
if (username == 'username'):
print('請設置好相關參數再測試')
else:
# 開始識別,圖片路徑,驗證碼類型ID,超時時間(秒),識別結果
return yundama.decode(file_path, codetype, timeout);
if __name__ == "__main__":
# 用戶名
username = ''
# 密碼
password = ''
# 軟件ID,開發者分成必要參數。登錄開發者后台【我的軟件】獲得!
appid =
# 軟件密鑰,開發者分成必要參數。登錄開發者后台【我的軟件】獲得!
appkey = ''
# 圖片文件
filename = 'image/captcha.jpg'
# 驗證碼類型,# 例:1004表示4位字母數字,不同類型收費不同。請准確填寫,否則影響識別率。在此查詢所有類型 http://www.yundama.com/price.html
codetype = 5000
# 超時時間,秒
timeout = 60
# 檢查
if (username == 'username'):
print ('請設置好相關參數再測試')
else:
# 初始化
yundama = YDMHttp(username, password, appid, appkey)
# 登陸雲打碼
uid = yundama.login();
print('uid: %s' % uid)
# 登陸雲打碼
uid = yundama.login();
print ('uid: %s' % uid)
# 查詢余額
balance = yundama.balance();
print ('balance: %s' % balance)
# 開始識別,圖片路徑,驗證碼類型ID,超時時間(秒),識別結果
text = yundama.decode(filename, codetype, timeout);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
8. cookie的禁用。& 設置下載速度
http://scrapy-chs.readthedocs.io/zh_CN/latest/topics/autothrottle.html
setting.py:
# Disable cookies (enabled by default)
COOKIES_ENABLED = False
- 1
- 2
設置下載速度:
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
- 1
- 2
給不同的spider設置自己的setting值
custom_settings = {
"COOKIES_ENABLED": True
}
- 1
- 2
- 3
六、scrapy進階開發
1. Selenium動態頁面抓取
Selenium (瀏覽器自動化測試框架)
Selenium是一個用於Web應用程序測試的工具。Selenium測試直接運行在瀏覽器中,就像真正的用戶在操作一樣。支持的瀏覽器包括IE(7, 8, 9, 10, 11),Mozilla Firefox,Safari,Google Chrome,Opera等。這個工具的主要功能包括:測試與瀏覽器的兼容性——測試你的應用程序看是否能夠很好得工作在不同瀏覽器和操作系統之上。測試系統功能——創建回歸測試檢驗軟件功能和用戶需求。支持自動錄制動作和自動生成 .Net、Java、Perl等不同語言的測試腳本
安裝
pip install selenium
文檔地址:
http://selenium-python.readthedocs.io/api.html
安裝webdriver.exe
天貓價格獲取
from selenium import webdriver
from scrapy.selector import Selector
browser = webdriver.Chrome(executable_path="C:/chromedriver.exe")
#天貓價格獲取
browser.get("https://detail.tmall.com/item.htm?spm=a230r.1.14.3.yYBVG6&id=538286972599&cm_id=140105335569ed55e27b&abbucket=15&sku_properties=10004:709990523;5919063:6536025")
t_selector = Selector(text=browser.page_source)
print (t_selector.css(".tm-price::text").extract())
# print (browser.page_source)
browser.quit()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
知乎模擬登錄
from selenium import webdriver
from scrapy.selector import Selector
browser = webdriver.Chrome(executable_path="C:/chromedriver.exe")
#知乎模擬登陸
browser.get("https://www.zhihu.com/#signin")
browser.find_element_by_css_selector(".view-signin input[name='account']").send_keys("phone")
browser.find_element_by_css_selector(".view-signin input[name='password']").send_keys("password")
browser.find_element_by_css_selector(".view-signin button.sign-button").click()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
微博模擬登錄
微博開放平台api
from selenium import webdriver
from scrapy.selector import Selector
browser = webdriver.Chrome(executable_path="C:/chromedriver.exe")
#selenium 完成微博模擬登錄
browser.get("http://weibo.com/")
import time
time.sleep(5)
browser.find_element_by_css_selector("#loginname").send_keys("1147727180@qq.com")
browser.find_element_by_css_selector(".info_list.password input[node-type='password'] ").send_keys("password")
browser.find_element_by_xpath('//*[@id="pl_login_form"]/div/div[3]/div[6]/a').click()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
模擬JavaScript鼠標下滑
from selenium import webdriver
from scrapy.selector import Selector
browser = webdriver.Chrome(executable_path="C:/chromedriver.exe")
#開源中國博客
browser.get("https://www.oschina.net/blog")
import time
time.sleep(5)
for i in range(3):
browser.execute_script("window.scrollTo(0, document.body.scrollHeight); var lenOfPage=document.body.scrollHeight; return lenOfPage;")
time.sleep(3)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
頁面不加載圖片
from selenium import webdriver
from scrapy.selector import Selector
# 設置chromedriver不加載圖片
chrome_opt = webdriver.ChromeOptions()
prefs = {"profile.managed_default_content_settings.images":2}
chrome_opt.add_experimental_option("prefs", prefs)
browser = webdriver.Chrome(executable_path="C:/chromedriver.exe",chrome_options=chrome_opt)
browser.get("https://www.oschina.net/blog")
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
phantomjs無界面的瀏覽器獲取天貓價格
#phantomjs, 無界面的瀏覽器, 多進程情況下phantomjs性能會下降很嚴重
browser = webdriver.PhantomJS(executable_path="C:/phantomjs-2.1.1-windows/bin/phantomjs.exe")
browser.get("https://detail.tmall.com/item.htm?spm=a230r.1.14.3.yYBVG6&id=538286972599&cm_id=140105335569ed55e27b&abbucket=15&sku_properties=10004:709990523;5919063:6536025")
t_selector = Selector(text=browser.page_source)
print (t_selector.css(".tm-price::text").extract())
print (browser.page_source)
# browser.quit()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
2.selenium集成進scrapy
如何集成
創建中間件。
from selenium import webdriver
from scrapy.http import HtmlResponse
class JSPageMiddleware(object):
#通過chrome請求動態網頁
def process_request(self, request, spider):
if spider.name == "jobbole":
browser = webdriver.Chrome(executable_path="C:/chromedriver.exe")
spider.browser.get(request.url)
import time
time.sleep(3)
print ("訪問:{0}".format(request.url))
return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source, encoding="utf-8", request=request)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
使用selenium集成到具體spider中
信號量:
dispatcher.connect 信號的映射,當spider結束該做什么
from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
#使用selenium
def __init__(self):
self.browser = webdriver.Chrome(executable_path="D:/Temp/chromedriver.exe")
super(JobboleSpider, self).__init__()
dispatcher.connect(self.spider_closed, signals.spider_closed)
def spider_closed(self, spider):
#當爬蟲退出的時候關閉chrome
print ("spider closed")
self.browser.quit()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
python下無界面瀏覽器
pip install pyvirtualdisplay
linux使用:
from pyvirtualdisplay import Display
display = Display(visible=0, size=(800, 600))
display.start()
browser = webdriver.Chrome()
browser.get()
- 1
- 2
- 3
- 4
- 5
- 6
錯誤:cmd=[‘xvfb’,’help’]
os error
sudo apt-get install xvfb
pip install xvfbwrapper
scrapy-splash:
支持分布式,穩定性不如chorme
selenium grid
支持分布式
splinter
https://github.com/cobrateam/splinter
scrapy的暫停重啟
scrapy crawl lagou -s JOBDIR=job_info/001
pycharm進程直接殺死 kiil -9
一次 ctrl+c可接受信號
Lib\site-packages\scrapy\dupefilters.py
先hash將url變成定長的字符串
然后使用集合set去重
telnet
遠程登錄
telnet localhost 6023 連接當前spider
est()命令查看spider當前狀態
spider.settings["COOKIES_ENABLED"]
Lib\site-packages\scrapy\extensions\telnet.py
數據收集 & 狀態收集
Scrapy提供了方便的收集數據的機制。數據以key/value方式存儲,值大多是計數值。 該機制叫做數據收集器(Stats Collector),可以通過 Crawler API 的屬性 stats 來使用。在下面的章節 常見數據收集器使用方法 將給出例子來說明。
無論數據收集(stats collection)開啟或者關閉,數據收集器永遠都是可用的。 因此您可以import進自己的模塊並使用其API(增加值或者設置新的狀態鍵(stat keys))。 該做法是為了簡化數據收集的方法: 您不應該使用超過一行代碼來收集您的spider,Scrpay擴展或任何您使用數據收集器代碼里頭的狀態。
http://scrapy-chs.readthedocs.io/zh_CN/latest/topics/stats.html
狀態收集,數據收集器
# 收集伯樂在線所有404的url以及404頁面數
handle_httpstatus_list = [404]
- 1
- 2
七、scrapy-redis 分布式爬蟲
1. 分布式爬蟲設計及redis介紹
多個爬蟲如何進行調度,一個集中的狀態管理器
優點:
- 利用多機器帶寬
- 利用多ip加速爬取速度
兩個問題:
- request隊列的集中管理
- 去重集中管理
分布式。
2. redis命令
hexists course_dict mtianyan
hexists course_dict mtianyan2
Redis HEXISTS命令被用來檢查哈希字段是否存在。
返回值
回復整數,1或0。
- 1, 如果哈希包含字段。
- 0 如果哈希不包含字段,或key不存在。
hdel course_dict mtianyan
Redis HDEL命令用於從存儲在鍵散列刪除指定的字段。如果沒有這個哈希中存在指定的字段將被忽略。如果鍵不存在,它將被視為一個空的哈希與此命令將返回0。
返回值回復整數,從散列中刪除的字段的數量,不包括指定的但不是現有字段。
hgetall course_dict
Redis Hgetall 命令用於返回哈希表中,所有的字段和值。
在返回值里,緊跟每個字段名(field name)之后是字段的值(value),所以返回值的長度是哈希表大小的兩倍。
hset course_dict bobby “python scrapy”
Redis Hset 命令用於為哈希表中的字段賦值 。
如果哈希表不存在,一個新的哈希表被創建並進行 HSET 操作。
如果字段已經存在於哈希表中,舊值將被覆蓋。
hkey course_dict
Redis Keys 命令用於查找所有符合給定模式 pattern 的 key 。。
hvals course_dict
Redis Hvals 命令返回哈希表所有字段的值。
lpush mtianyan “scary”
rpush mtianyan “scary”
存入key-value
lrange mtianyan 0 10
取出mtianyan的0到10

| 命令 | 說明 |
|---|---|
| lpop/rpop | 左刪除/右刪除 |
| llen mtianyan | 長度 |
| lindex mtianyan 3 | 第幾個元素 |
| sadd | 集合做減法 |
| siner | 交集 |
| spop | 隨機刪除 |
| srandmember | 隨機選擇多個元素 |
| smembers | 獲取set所有元素 |
| srandmember | 隨機選擇多個元素 |
| zadd | 每個數有分數 |
| zcount key 0 100 | 0-100分數據量統計 |
3. scrapy-redis搭建分布式爬蟲
需要的環境:
Python 2.7, 3.4 or 3.5
Redis >= 2.8
Scrapy >= 1.1
redis-py >= 2.10
pip install redis
setting.py設置
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 300
}
- 1
- 2
- 3
- 4
- 5
要繼承redisspider
from scrapy_redis.spiders import RedisSpider
class MySpider(RedisSpider):
name = 'myspider'
def parse(self, response):
# do stuff
pass
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
啟動spider
scrapy runspider myspider.py
push urls to redis:放置初始url進入隊列
redis-cli lpush myspider:start_urls http://google.com
搭建示例
1. 創建新的scrapy項目
2. 去github拷貝scrapy-redis源碼
不同spider使用不同redis list
將隊列從內存放入redis中
next_requests
所有的yield出去的request會被
ScrapyRedisTest\scrapy_redis\scheduler.py
的以及重寫的enqueue_request接收
八、elasticsearch搭建搜索引擎
elasticsearch介紹:一個基於lucene的搜索服務器,分布式多用戶的全文搜索引擎 java開發的 基於restful web接口
自己搭建的網站或者程序,添加搜索功能比較困難
所以我們希望搜索解決方案要高效
零配置並且免費
能夠簡單的通過json和http與搜索引擎交互
希望搜索服務很穩定
簡單的將一台服務器擴展到多台服務器
內部功能:
分詞 搜索結果打分 解析搜索要求
全文搜索引擎:solr sphinx
很多大公司都用elasticsearch 戴爾 Facebook 微軟等等
-
elasticsearch對Lucene進行了封裝,既能存儲數據,又能分析數據,適合與做搜索引擎
關系數據搜索缺點:
無法對搜素結果進行打分排序
沒有分布式,搜索麻煩,對程序員的要求比較高
無法解析搜索請求,對搜索的內容無法進行解析,如分詞等
數據多了,效率低
需要分詞,把關系,數據,重點分出來 -
nosql數據庫:
文檔數據庫 json代碼,在關系數據庫中數據存儲,需要存到多個表,內部有多對多等關系之類的,需要涉及到多個表才能將json里面的內容存下來,nosql直接將一個json的內容存起來,作為一個文檔存檔到數據庫。
mongodb:
1. elasticsearch安裝與配置
- java sdk安裝
- elasticsearch安裝官網下載 不使用官網的版本,提供原始的插件不多
- elasticsearc-rtf github搜索,中文發行版,已經安裝了很多插件 https://github.com/medcl/elasticsearch-rtf
- 運行elasticsearch的方法,在bin文件目錄下進入命令行,執行elasticsearch.bat
5.配置文件:elasticsearch-rtf\elasticsearch-rtf-master\config\elasticsearch.yml

2. elasticsearch兩個重要插件:head和kibana的安裝
head插件相當於Navicat,用於管理數據庫,基於瀏覽器
https://github.com/mobz/elasticsearch-head
Running with built in server
git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start
open http://localhost:9100/
- 1
- 2
- 3
- 4
- 5
- 6
- 7
配置elasticsearch與heade互通

kibana.bat

2. elasticsearch基礎概念
- 集群:一個或多個節點組織在一起
- 節點:一個集群中的一台服務器
- 分片:索引划分為多份的能力,允許水平分割,擴展容量,多個分片響應請求
- 副本:分片的一份或多分,一個節點失敗,其他節點頂上
|index | 數據庫|
|type | 表|
|document | 行|
|fields | 列|
集合搜索和保存:增加了五種方法:
OPTIONS & PUT & DELETE & TRACE & CONNECT
3. 倒排索引:


倒排索引待解決的問題:



4. elasticsearch命令
PUT lagou/job/1
1為idPUT lagou/job/
不指明id自動生成uuid。修改部分字段
POST lagou/job/1/_updateDELETE lagou/job/1
elasticserach批量操作:
查詢index為testdb下的job1表的id為1和job2表的id為2的數據
GET _mget
{
"docs":[
{
"_index":"testdb",
"_type":"job1",
"_id":1
},
{
"_index":"testdb",
"_type":"job2",
"_id":2
}
]
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
index已經指定了,所有在doc中就不用指定了
GET testdb/_mget{
"docs":[
{
"_type":"job1",
"_id":1
},
{
"_type":"job2",
"_id":2
}
]
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
連type都一樣,只是id不一樣
GET testdb/job1/_megt
{
"docs":[
{
"_id":1
},
{
"_id":2
}
]
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
或者繼續簡寫
GET testdb/job1/_megt
{
"ids":[1,2]
}
- 1
- 2
- 3
- 4
elasticsearch的bulk批量操作:可以合並多個操作,比如index,delete,update,create等等,包括從一個索引到另一個索引:
- action_and_meta_data\n
- option_source\n
- action_and_meta_data\n
- option_source\n
- ….
- action_and_meta_data\n
- option_source\n
每個操作都是由兩行構成,除了delete除外,由元信息行和數據行組成
注意數據不能美化,即只能是兩行的形式,而不能是經過解析的標准的json排列形式,否則會報錯
POST _bulk
{"index":...}
{"field":...}
- 1
- 2
- 3
elasticserach的mapping映射
elasticserach的mapping映射:創建索引時,可以預先定義字段的類型以及相關屬性,每個字段定義一種類型,屬性比mysql里面豐富,前面沒有傳入,因為elasticsearch會根據json源數據來猜測是什么基礎類型。M挨批評就是我們自己定義的字段的數據類型,同時告訴elasticsearch如何索引數據以及是否可以被搜索。
作用:會讓索引建立的更加細致和完善,對於大多數是不需要我們自己定義
相關屬性的配置
- String類型: 兩種text keyword。text會對內部的內容進行分析,索引,進行倒排索引等,為設置為keyword則會當成字符串,不會被分析,只能完全匹配才能找到String。 在es5已經被廢棄了
- 日期類型:date 以及datetime等
- 數據類型:integer long double等等
- bool類型
- binary類型
- 復雜類型:object nested
- geo類型:geo-point地理位置
- 專業類型:ip competition
- object :json里面內置的還有下層{}的對象
- nested:數組形式的數據
elasticserach查詢:
大概分為三類:
- 基本查詢:
- 組合查詢:
- 過濾:查詢同時,通過filter條件在不影響打分的情況下篩選數據
match查詢:
后面為關鍵詞,關於python的都會提取出來,match查詢會對內容進行分詞,並且會自動對傳入的關鍵詞進行大小寫轉換,內置ik分詞器會進行切分,如python網站,只要搜到存在的任何一部分,都會返回
GET lagou/job/_search
{
"query":{ "match":{ "title":"python" } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
term查詢
區別,對傳入的值不會做任何處理,就像keyword,只能查包含整個傳入的內容的,一部分也不行,只能完全匹配
terms查詢
title里傳入多個值,只要有一個匹配,就會返回結果
控制查詢的返回數量
GET lagou/_serach
{
"query":{
"match":{
"title":"python"
}
},
"form":1,
"size":2
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
通過這里就可以完成分頁處理洛,從第一條開始查詢兩條
match_all 返回所有
GET lagou/_search
{
“query”:{
“match_all”:{}
}
}
match_phrase查詢 短語查詢
GET lagou/_search
{
"query":{
"match_phrase":{
"title":{
"query":"python系統",
"slop":6
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
python系統,將其分詞,分為詞條,滿足詞條里面的所有詞才會返回結果,slop參數說明兩個詞條之間的最小距離
multi_match查詢
比如可以指定多個字段,比如查詢title和desc這兩個字段包含python的關鍵詞文檔
GET lagou/_search
{
"query":{
"multi_match":{
"query":"python",
"fileds":["title^3","desc"]
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
query為要查詢的關鍵詞 fileds在哪些字段里查詢關鍵詞,只要其中某個字段中出現了都返回
^3的意思為設置權重,在title中找到的權值為在desc字段中找到的權值的三倍
指定返回字段
GET lagou/_search{
"stored_fields":["title","company_name"],
"query":{
"match":{
"title":"pyhton"
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
通過sort把結果排序
GET lagou/_search
{
"query";{
"match_all":{}
},
"sort":[{
"comments":{
"order":"desc"
}
}]
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
sort是一個數組,里面是一個字典,key就是要sort的字段,asc desc是升序降序的意思
查詢范圍 range查詢
GET lagou/_search
{
“query”;{
“range”:{
“comments”:{
“gte”:10,
“lte”:20,
“boost”:2.0
}
}
}
}
range是在query里面的,boost是權重,gte lte是大於等於 小於等於的意思
對時間的范圍查詢,則是以字符串的形式傳入
wildcard模糊查詢,可以使用通配符
*
組合查詢:bool查詢
bool查詢包括了must should must_not filter來完成
格式如下:
bool:{
"filter":[],
"must":[],
"should":[],
"must_not":[],
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
5. 把爬取的數據保存至elasticsearch
class ElasticsearchPipeline(object):
#將數據寫入到es中
def process_item(self, item, spider):
#將item轉換為es的數據
item.save_to_es()
return item
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
elasticsearch-dsl-py
High level Python client for Elasticsearch
pip install elasticsearch-dsl
items.py 中將數據保存至es
def save_to_es(self):
article = ArticleType()
article.title = self['title']
article.create_date = self["create_date"]
article.content = remove_tags(self["content"])
article.front_image_url = self["front_image_url"]
if "front_image_path" in self:
article.front_image_path = self["front_image_path"]
article.praise_nums = self["praise_nums"]
article.fav_nums = self["fav_nums"]
article.comment_nums = self["comment_nums"]
article.url = self["url"]
article.tags = self["tags"]
article.meta.id = self["url_object_id"]
article.suggest = gen_suggests(ArticleType._doc_type.index, ((article.title,10),(article.tags, 7)))
article.save()
redis_cli.incr("jobbole_count")
return
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
6. elasticsearch結合django搭建搜索引擎
獲取elasticsearch的查詢接口
body={
"query":{
"multi_match":{
"query":key_words,
"fields":["tags", "title", "content"]
}
},
"from":(page-1)*10,
"size":10,
"highlight": {
"pre_tags": ['<span class="keyWord">'],
"post_tags": ['</span>'],
"fields": {
"title": {},
"content": {},
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
使django與其交互。


版權聲明:本文為博主原創文章,未經博主允許不得轉載。
所有文字版權歸mtianyan所有 https://blog.csdn.net/qq_23079443/article/details/73920584
