Distributed crawling of Zhihu Q&A with scrapy-redis, deployed across multiple machines with docker.


Results first:

Questions:

Answers:

As you can see, the answers collection now holds over 100,000 documents. One hundred thousand whys, haha~

Main content:

A distributed crawler deploys the crawling environment on several servers (say servers A, B and C) and lets them crawl cooperatively, which requires a state manager.

The state manager is mainly responsible for managing the URL crawl queue, and it can double as a crawler server; it just needs redis and the scrapy-redis environment set up.

The crawler servers handle fetching and processing the data; installing scrapy-redis is enough.

As shown below:

 

Multiple machines fetch the target urls and extract data from them at the same time: N machines all do exactly the same job, scheduled and relayed through redis, which means there is no master/slave distinction.

Keep in mind that scrapy by itself does not support distributed crawling:

  1. In scrapy, requests are kept in memory. With two or more servers the queue has to be managed centrally, so requests are stored in redis instead.
  2. Deduplication also has to be centralized, again through redis.
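The centralized deduplication idea can be sketched in a few lines: every worker computes the same fingerprint for the same request and checks it against one shared redis set, so the decision happens in one place. This is an illustrative sketch, not scrapy-redis's exact implementation (scrapy's real fingerprint also canonicalizes the URL and can include headers and body), and the key name in the comment is made up:

```python
import hashlib

def request_fingerprint(method: str, url: str) -> str:
    """Hash a request into a fixed-size fingerprint.

    Every worker computes the same sha1 for the same request, so the
    fingerprint can be SADD-ed into one shared redis set; if SADD
    returns 0, another worker has already scheduled this request.
    """
    h = hashlib.sha1()
    h.update(method.encode())
    h.update(url.encode())
    return h.hexdigest()

# with a real redis connection the check would be roughly:
#   if redis_conn.sadd('zhihu:dupefilter', fp) == 0: skip the request
fp1 = request_fingerprint('GET', 'https://www.zhihu.com/question/1')
fp2 = request_fingerprint('GET', 'https://www.zhihu.com/question/1')
fp3 = request_fingerprint('GET', 'https://www.zhihu.com/question/2')
```

The same request always maps to the same 40-character digest, while different requests diverge, which is all a shared dedup set needs.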

Advantages of a distributed crawler:

  1. It makes full use of the bandwidth of several machines while crawling.
  2. It makes full use of several machines' IP addresses.

Using scrapy-redis is very simple: changing a few settings in the scrapy project's settings.py is basically all it takes.

Install it with:

pip install scrapy-redis

Now let's walk through this Zhihu project, starting from scrapy itself.

Start a scrapy project from the command line:

scrapy startproject ArticleSpider

Enter the project folder and generate a spider:

cd ArticleSpider

scrapy genspider zhihu www.zhihu.com

The project files so far:

Analyzing Zhihu's question API:

Every time you scroll down on the Zhihu homepage this ajax request fires, and the response contains question urls, titles and so on, so clearly this is the question API:

https://www.zhihu.com/api/v3/feed/topstory/recommend?session_token=8c3313b2932c370198480b54dc89fd3a&desktop=true&page_number=2&limit=6&action=down&after_id=5

 

Its response:

Interestingly, every request to it returns a set-cookie header in the response, which means the next page has to be requested with the updated cookie; otherwise an error is returned.
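The splice itself is just a couple of regex operations: pull the changed fragment out of the set-cookie header and substitute it into the existing cookie. A sketch with invented header values (only the KLBRSID fragment changes between pages, which is what the cookie handling below relies on):

```python
import re

# invented cookie values for illustration
raw_cookie = '_zap=abc123; KLBRSID=old-value; _xsrf=token'
set_cookie = 'KLBRSID=new-value; Path=/; Max-Age=3600'

# pull the KLBRSID fragment out of the set-cookie header
fragment = re.findall('(KLBRSID=.*?);', set_cookie)[0]
# splice it into the existing cookie in place of the stale value
cookie = re.sub('KLBRSID=[^;]*', fragment, raw_cookie)
print(cookie)  # _zap=abc123; KLBRSID=new-value; _xsrf=token
```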

 

A look at the parameters of the question API request:

It is a GET request, and the first link can actually be found in the HTML source returned for the homepage, which gives us all of these parameters; the only one that changes is the page number, page_number:

The question API inside the HTML source:

We first request the homepage, extract this initial question API with a regex, and then forge the requests for the following pages.
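As a sketch, with an invented fragment standing in for the real homepage source (the page embeds the API link with \u002F escapes in place of slashes):

```python
import re

# invented fragment of the homepage source; the real page embeds the
# "previous"/"next" API links with \u002F in place of each slash
html = r'{"previous":"https:\u002F\u002Fwww.zhihu.com\u002Fapi\u002Fv3\u002Ffeed\u002Ftopstory\u002Frecommend?page_number=1","next":"..."}'

# the same extraction the spider's parse() performs
question_api = re.findall('"previous":"(.*?)","next', html, re.S)
question_url = question_api[0].replace('\\u002F', '/')
print(question_url)
# https://www.zhihu.com/api/v3/feed/topstory/recommend?page_number=1
```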

 

The answer API was found by scrolling through all the answers of a question:

 

The answer API is even easier to handle: only the question id and the offset need to change, and it can even be requested without a cookie.

For example, this is the answer API of one question:

https://www.zhihu.com/api/v4/questions/308761407/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cis_labeled%2Cis_recognized%2Cpaid_info%2Cpaid_info_content%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%2A%5D.topics&limit=5&offset=13&platform=desktop&sort_by=default
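Paging through it is plain string formatting: substitute the question id, limit and offset. A sketch, with the long include= parameter elided for readability (the real spider keeps it):

```python
from urllib.parse import urlsplit, parse_qs

# answer API template; the long include=... parameter is elided here
answer_api = ('https://www.zhihu.com/api/v4/questions/{0}/answers'
              '?limit={1}&offset={2}&platform=desktop&sort_by=default')

# first page of answers for question 308761407, 20 per page
url = answer_api.format(308761407, 20, 0)

# the offset can be read back out of any such url, e.g. to cap paging
params = parse_qs(urlsplit(url).query)
offset = int(params['offset'][0])
```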

Its response:

Both the question and answer APIs return JSON, so they are very easy to process.

Let's look at items.py first; it defines the fields to crawl for questions and answers:
class QuestionItem(scrapy.Item):
    '''
    Question item; questions and answers are stored in two separate
    collections in mongodb.
    '''

    title = scrapy.Field()
    created = scrapy.Field()
    answer_num = scrapy.Field()
    comment_num = scrapy.Field()
    follow_nums = scrapy.Field()
    question_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    author_url = scrapy.Field()
    author_name = scrapy.Field()
    author_headline = scrapy.Field()
    author_gender = scrapy.Field()
    crawl_time = scrapy.Field()


class Answer_Item(scrapy.Item):
    '''
    Answer item.
    '''
    answer_id = scrapy.Field()
    question_id = scrapy.Field()
    url = scrapy.Field()
    user_name = scrapy.Field()
    user_id = scrapy.Field()
    content = scrapy.Field()
    praise_num = scrapy.Field()
    comment_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

Next, the spider (zhihu.py):

# -*- coding: utf-8 -*-
import re
import time
import json
import datetime
import scrapy

from ArticleSpider.items import QuestionItem, Answer_Item
from scrapy.http import Request
from scrapy_redis.spiders import RedisSpider


def timestamp_2_date(timestamp):
    '''
    Convert a unix timestamp to a datetime string.
    '''
    time_array = time.localtime(timestamp)
    my_time = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
    return my_time

def handle_cookie(response):
    '''
    Merge the set-cookie fragment returned by the server into the
    request's original cookie.
    '''
    cookie_section = response.headers.get('set-cookie')
    # extract the KLBRSID fragment from the set-cookie header
    sections = re.findall('(KLBRSID=.*?);', str(cookie_section))
    raw_cookie = response.request.headers['Cookie'].decode('utf-8')
    # splice the fragment into the full cookie
    cookie = re.sub('KLBRSID=.*', sections[0], raw_cookie)
    return cookie

class ZhihuSpider(RedisSpider):
    # spider name
    name = 'zhihu'
    # allowed domains
    allowed_domains = ['www.zhihu.com']
    # scrapy-redis reads the start urls from this redis key
    # (what used to be start_urls now lives in redis)
    redis_key = 'zhihu:start_urls'
    # per-spider settings; anything set here overrides settings.py
    custom_settings = {
        # random delay of up to 5 seconds (see RandomDelayMiddleware)
        "RANDOM_DELAY": 5,
    }
    # with the settings above the crawl is distributed via scrapy-redis;
    # everything else, such as parse(), follows normal scrapy logic
    # the answer API
    answer_api = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cis_labeled%2Cis_recognized%2Cpaid_info%2Cpaid_info_content%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%2A%5D.topics&limit={1}&offset={2}&platform=desktop&sort_by=default'

    def parse(self, response):
        '''
        Parse the homepage and extract the question API.
        '''
        # every request to the question API returns a new set-cookie fragment;
        # the old cookie cannot access the next page
        cookie = handle_cookie(response)
        # find the question API in the homepage HTML source
        question_api = re.findall('"previous":"(.*?)","next', response.text, re.S)
        question_url = question_api[0].replace('\\u002F', '/')
        # request the question API with the updated cookie
        yield Request(url=question_url, callback=self.parse_question, headers={'cookie': cookie})

    def parse_question(self, response):
        '''
        Parse the JSON returned by the question API.
        '''
        # build the updated cookie
        cookie = handle_cookie(response)
        dics = json.loads(response.text)
        for dic in dics['data']:
            try:
                ques_item = QuestionItem()
                if 'question' in dic['target']:
                    # question title
                    ques_item['title'] = dic['target']['question']['title']
                    # question creation time
                    ques_item['created'] = timestamp_2_date(dic['target']['question']['created'])
                    # number of answers
                    ques_item['answer_num'] = dic['target']['question']['answer_count']
                    # number of comments
                    ques_item['comment_num'] = dic['target']['question']['comment_count']
                    # number of followers
                    ques_item['follow_nums'] = dic['target']['question']['follower_count']
                    # question id
                    ques_item['question_id'] = dic['target']['question']['id']
                    # question url
                    ques_item['url'] = 'https://www.zhihu.com/question/' + str(ques_item['question_id'])
                    # question topics
                    topics = []
                    if 'uninterest_reasons' in dic:
                        for i in dic['uninterest_reasons']:
                            topics.append(i['reason_text'])
                    ques_item['topics'] = topics
                    # author url
                    ques_item['author_url'] = dic['target']['question']['author']['url']
                    # author name
                    ques_item['author_name'] = dic['target']['question']['author']['name']
                    # author headline
                    ques_item['author_headline'] = dic['target']['question']['author']['headline']
                    # author gender (1 = male, 0 = female; missing = unknown)
                    gender = dic['target']['question']['author'].get('gender')
                    if gender == 1:
                        ques_item['author_gender'] = 'male'
                    elif gender == 0:
                        ques_item['author_gender'] = 'female'
                    else:
                        ques_item['author_gender'] = 'unknown'
                    # crawl time
                    ques_item['crawl_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    yield ques_item
                    # request the answer API for this question; it takes the
                    # question_id, a limit and an offset
                    yield Request(url=self.answer_api.format(ques_item['question_id'], 20, 0),
                                  callback=self.parse_answer)
            except Exception:
                continue
        # the question API returns is_end, telling us whether there is a next page
        if not dics['paging']['is_end']:
            # link to the next page
            next_url = dics['paging']['next']
            # request the next page of questions with the updated cookie
            yield Request(url=next_url, callback=self.parse_question, headers={'cookie': cookie})

    def parse_answer(self, response):
        # parse the JSON returned by the answer API
        ans_json = json.loads(response.text)
        # is_end tells us whether this is the last page
        is_end = ans_json['paging']['is_end']
        # total number of answers
        totals_answer = ans_json['paging']['totals']
        # next page url
        next_url = ans_json['paging']['next']
        for answer in ans_json['data']:
            ans_item = Answer_Item()
            # answer id
            ans_item['answer_id'] = answer['id']
            # id of the question this answer belongs to
            ans_item['question_id'] = answer['question']['id']
            # answer url
            ans_item['url'] = answer['url']
            # author name
            ans_item['user_name'] = answer['author']['name'] if 'name' in answer['author'] else None
            # author id
            ans_item['user_id'] = answer['author']['id'] if 'id' in answer['author'] else None
            # answer content
            ans_item['content'] = answer['content'] if 'content' in answer else None
            # upvote count
            ans_item['praise_num'] = answer['voteup_count']
            # comment count
            ans_item['comment_num'] = answer['comment_count']
            # answer creation time
            ans_item['create_time'] = timestamp_2_date(answer['created_time'])
            # answer update time
            ans_item['update_time'] = timestamp_2_date(answer['updated_time'])
            # crawl time
            ans_item['crawl_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            yield ans_item
        # the offset steps by 20 per page; crawl at most 50 pages of answers
        # per question, i.e. stop once offset > 1000
        offset = int(next_url.split('offset=')[1].split('&')[0])
        if offset <= 1000 and not is_end:
            # not the last page and under the offset cap: request the next page
            yield scrapy.Request(url=next_url, callback=self.parse_answer)

 

Add the following lines to settings.py:

# use the scrapy-redis scheduler
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# use the scrapy-redis dedup filter
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# keep the scrapy-redis queues in redis so the crawl can be paused and
# resumed, i.e. do not clear the redis queues
SCHEDULER_PERSIST = True

FEED_EXPORT_ENCODING = 'utf-8'

# redis connection
REDIS_HOST = 'IP of the state manager server; make sure redis accepts remote connections'
REDIS_PORT = 6379
# redis password
REDIS_PARAMS = {'password': '123456'}

# once scrapy-redis runs out of urls it idles, waiting for redis to supply
# more. If the crawl is finished there is no point in waiting, so this
# stops the spider 3600 s after it starts.
CLOSESPIDER_TIMEOUT = 3600
Then pipelines.py, modified to save the data to a remote MongoDB server:
from pymongo import MongoClient

class ArticlespiderPipeline(object):
    def process_item(self, item, spider):
        return item

class MongoPipeline(object):

    def __init__(self, databaseIp='remote MongoDB server IP', databasePort=27017, user="", password=""):
        self.client = MongoClient(databaseIp, databasePort)
        # self.db = self.client.test_database
        # self.db.authenticate(user, password)

    def process_item(self, item, spider):
        # convert the item to a dict
        postItem = dict(item)
        if item.__class__.__name__ == 'QuestionItem':
            self.db = self.client['zhihu']
            # upsert the question data
            self.db.question.update_one({'question_id': postItem['question_id']}, {'$set': postItem}, upsert=True)
        elif item.__class__.__name__ == 'Answer_Item':
            self.db = self.client['zhihu']
            # upsert the answer data
            self.db.answer.update_one({'answer_id': postItem['answer_id']}, {'$set': postItem}, upsert=True)
        # returning the item lets scrapy log it; optional
        return item
And middlewares.py:
import logging
import random
import time

from scrapy.exceptions import IgnoreRequest

class RandomDelayMiddleware(object):
    '''
    Adds a custom random delay before each request.
    '''
    def __init__(self, delay):
        self.delay = delay

    @classmethod
    def from_crawler(cls, crawler):
        delay = crawler.settings.get("RANDOM_DELAY", 10)
        if not isinstance(delay, int):
            raise ValueError("RANDOM_DELAY needs an int")
        return cls(delay)

    def process_request(self, request, spider):
        # drop redirects to the login page
        if 'signin?next' in request.url:
            raise IgnoreRequest
        delay = random.randint(0, self.delay)
        logging.debug("### random delay: %s s ###" % delay)
        time.sleep(delay)

class RandomUserAgentMiddleware():
    '''
    Sets a random User-Agent header on each request.
    '''
    user_agent = [
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',
    ]
    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agent)

class CookieMiddleware():
    '''
    Attaches the cookie from cookies.txt; scrapy's cookies argument expects
    a dict, so the header is set directly instead.
    '''
    def process_request(self, request, spider):
        with open('cookies.txt') as f:
            raw_cookie = f.read()
        # provide the cookie when requesting the homepage
        if request.url == 'https://www.zhihu.com/':
            request.headers['cookie'] = raw_cookie

Starting scrapy-redis from a .py file:

from scrapy.cmdline import execute
import sys
import os


# os.path.abspath(__file__) is the path of this file,
# os.path.dirname(os.path.abspath(__file__)) is its directory
# add the project directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# equivalent to running `scrapy crawl zhihu` on the command line
execute(['scrapy', 'crawl', 'zhihu'])

Once started, scrapy-redis waits for start_urls to be pushed into redis.

I set up redis on a CentOS server; run the following in a terminal to push the Zhihu homepage into redis:


redis-cli -a 123456 lpush zhihu:start_urls https://www.zhihu.com

Building the project's docker image

For installing docker on CentOS (CentOS 7+ is required), see:

https://www.runoob.com/docker/centos-docker-install.html

If you are on an Alibaba Cloud server, check Alibaba Cloud's documentation on setting up your own image registry accelerator.

First, create a Dockerfile (no file extension):

## Alibaba Cloud python3 base image
FROM registry.cn-shanghai.aliyuncs.com/tcc-public/python:3
## add /usr/local/bin to the PATH
ENV PATH /usr/local/bin:$PATH
## copy the local code into the container
ADD . /code
## set the working directory
WORKDIR /code
## install the dependencies
RUN pip3 install -r requirements.txt
## run scrapy crawl zhihu to start crawling
CMD scrapy crawl zhihu

requirements.txt:

lxml==4.4.2
pymongo==3.10.1
redis==3.4.1
requests==2.22.0
Scrapy==1.8.0
scrapy-redis==0.6.8

As shown:

Build it:

docker build -t registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0 .

Note: registry.~~~ is the public address of the repository created above; replace it with your own repository address. The :1.0 suffix is a version tag of your choosing, used to distinguish builds of the image. The trailing . is the build context path and must not be omitted.

Test that the image runs correctly, and push it only once it does:

docker run registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0

Push it to the image registry:

docker push registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0

Running the project with docker on another machine:

Log in first:

sudo docker login --username=*** registry.cn-shanghai.aliyuncs.com

Then simply run it:

docker run registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0

Several machines are currently running it at the same time without problems:

 

Finally, the project is on github; grab it if you need it:

https://github.com/dao233/spider/tree/master/ArticleSpider

END~