微博關鍵詞爬蟲——基於requests和aiohttp


  requests庫是python爬蟲中最常見的庫,與內置的urllib庫相比,它更加簡潔高效,是每一個接觸爬蟲者都務必要掌握的基礎;但它也是有缺點的,就是不支持異步操作,雖然可以通過多線程來解決,但當需要發送大量請求時,創建大量的線程會浪費過多的資源;此時出現了一個新的庫aiohttp,它是支持異步操作的,可以在一個線程中,通過異步多任務來實現快速發送請求,提高效率。這次,我基於這兩個庫,做一個高效的微博關鍵詞爬蟲,源碼在文章的末尾。

  首先,我是從微博的移動端地址入手,發現它是ajsx請求,請求參數中,除頁碼外,其他都是常量,所以要實現多頁請求,直接將頁碼當作參數發送即可。但是頁面返回的json數據並沒有直接標明總頁數,所以需要自己計算。進一步分析,發現數據中包含了微博的總條數和每一頁的條數,這就是突破口,對它進行簡單的運算就可以拿到總頁碼。此處只需要發送一次請求,就可以獲取到信息,所以這里采用的是requests。

def get_page():
    """
    先用requests構造請求,解析出關鍵詞搜索出來的微博總頁數
    :return: 返回每次請求需要的data參數
    """
    data_list = []
    data = {
        'containerid': '100103type=1&q={}'.format(kw),
        'page_type': 'searchall'}
    resp = requests.get(url=url, headers=headers, params=data)
    total_page = resp.json()['data']['cardlistInfo']['total']  # 微博總數
    # 一頁有10條微博,用總數對10整除,余數為0則頁碼為總數/10,余數不為0則頁碼為(總數/10)+1
    if total_page % 10 == 0:
        page_num = int(total_page / 10)
    else:
        page_num = int(total_page / 10) + 1
    # 頁碼為1,data為當前data,頁碼不為1,通過for循環構建每一頁的data參數
    if page_num == 1:
        data_list.append(data)
        return data_list
    else:
        for i in range(1, page_num + 1):
            data['page'] = i
            data_list.append(copy.deepcopy(data))
        return data_list
頁碼解析

  獲取完頁碼之后,就可以進行數據解析。每一頁都需要單獨發送請求,為了提高效率,此處采用的是aiohttp。通過async關鍵詞來定義特殊函數,返回一個協程對象,注意函數內部所有代碼必須是支持異步操作的。在構建請求的時候需要注意特定的格式。

# async定義函數,返回一個協程對象
async def crawl(data):
    """
    多任務異步解析頁面,存儲數據
    :param data: 請求所需的data參數
    :return: None
    """
    async with aiohttp.ClientSession() as f:  # 實例化一個ClientSession
        async with await f.get(url=url, headers=headers, params=data) as resp:  # 攜帶參數發送請求
            text = await resp.text()  # await 等待知道獲取完整數據
            text_dict = json.loads(text)['data']['cards']
            parse_dict = {}
            for card in text_dict:
                if card['card_type'] == 9:
                    scheme = card['scheme']
                    if card['mblog']['isLongText'] is False:
                        text = card['mblog']['text']
                        text = re.sub(r'<.*?>|\n+', '', text)
                    else:
                        text = card['mblog']['longText']['longTextContent']
                    user = card['mblog']['user']['profile_url']
                    comments_count = card['mblog']['comments_count']
                    attitudes_count = card['mblog']['attitudes_count']
                    parse_dict['url'] = scheme
                    parse_dict['text'] = text
                    parse_dict['author'] = user
                    parse_dict['comments_count'] = comments_count
                    parse_dict['attitudes_count'] = attitudes_count
                    parse_dict_list.append(copy.deepcopy(parse_dict))
數據解析

  最關鍵的一步,將協程對象添加到事件循環中,實現異步執行。

task_list = []  # 定義一個任務列表
    for data in data_list:
        c = crawl(data)  # 調用協程,傳參
        task = asyncio.ensure_future(c)  # 創建任務對象
        task_list.append(task)  # 將任務添加到列表中
    loop = asyncio.get_event_loop()  # 創建事件循環
    loop.run_until_complete(asyncio.wait(task_list))  # 開啟循環,並將阻塞的任務掛起
事件循環

  以上部分就是整個爬蟲的關鍵,剩下的數據寫入(導出到excle)就直接放在源碼中,不足之處,請大家指正!

import copy
import aiohttp
import requests
import re
import asyncio
import json
import xlwt


def get_page():
    """
    先用requests構造請求,解析出關鍵詞搜索出來的微博總頁數
    :return: 返回每次請求需要的data參數
    """
    data_list = []
    data = {
        'containerid': '100103type=1&q={}'.format(kw),
        'page_type': 'searchall'}
    resp = requests.get(url=url, headers=headers, params=data)
    total_page = resp.json()['data']['cardlistInfo']['total']  # 微博總數
    # 一頁有10條微博,用總數對10整除,余數為0則頁碼為總數/10,余數不為0則頁碼為(總數/10)+1
    if total_page % 10 == 0:
        page_num = int(total_page / 10)
    else:
        page_num = int(total_page / 10) + 1
    # 頁碼為1,data為當前data,頁碼不為1,通過for循環構建每一頁的data參數
    if page_num == 1:
        data_list.append(data)
        return data_list
    else:
        for i in range(1, page_num + 1):
            data['page'] = i
            data_list.append(copy.deepcopy(data))
        return data_list


# async定義函數,返回一個協程對象
async def crawl(data):
    """
    多任務異步解析頁面,存儲數據
    :param data: 請求所需的data參數
    :return: None
    """
    async with aiohttp.ClientSession() as f:  # 實例化一個ClientSession
        async with await f.get(url=url, headers=headers, params=data) as resp:  # 攜帶參數發送請求
            text = await resp.text()  # await 等待知道獲取完整數據
            text_dict = json.loads(text)['data']['cards']
            parse_dict = {}
            for card in text_dict:
                if card['card_type'] == 9:
                    scheme = card['scheme']
                    if card['mblog']['isLongText'] is False:
                        text = card['mblog']['text']
                        text = re.sub(r'<.*?>|\n+', '', text)
                    else:
                        text = card['mblog']['longText']['longTextContent']
                    user = card['mblog']['user']['profile_url']
                    comments_count = card['mblog']['comments_count']
                    attitudes_count = card['mblog']['attitudes_count']
                    parse_dict['url'] = scheme
                    parse_dict['text'] = text
                    parse_dict['author'] = user
                    parse_dict['comments_count'] = comments_count
                    parse_dict['attitudes_count'] = attitudes_count
                    parse_dict_list.append(copy.deepcopy(parse_dict))


def insert_data(file_name):
    """
    將數據導出到excle中
    :param file_name: 文件名
    :return:
    """
    wr = xlwt.Workbook(encoding='utf8')
    table = wr.add_sheet(file_name)
    table.write(0, 0, '原鏈接')
    table.write(0, 1, '正文')
    table.write(0, 2, '作者首頁')
    table.write(0, 3, '評論數')
    table.write(0, 4, '點贊數')
    for index, data in enumerate(parse_dict_list):
        table.write(index + 1, 0, data['url'])
        table.write(index + 1, 1, data['text'])
        table.write(index + 1, 2, data['author'])
        table.write(index + 1, 3, data['comments_count'])
        table.write(index + 1, 4, data['attitudes_count'])
    file_path = file_name + '.xls'
    wr.save(file_path)


def main(file_name):
    """
    開啟多任務循環
    :return: None
    """
    data_list = get_page()  # 接收data參數列表
    task_list = []  # 定義一個任務列表
    for data in data_list:
        c = crawl(data)  # 調用協程,傳參
        task = asyncio.ensure_future(c)  # 創建任務對象
        task_list.append(task)  # 將任務添加到列表中
    loop = asyncio.get_event_loop()  # 創建事件循環
    loop.run_until_complete(asyncio.wait(task_list))  # 開啟循環,並將阻塞的任務掛起
    insert_data(file_name)


if __name__ == '__main__':
    kw = input('關鍵詞:')
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'}
    url = 'https://m.weibo.cn/api/container/getIndex'
    parse_dict_list = []  # 臨時存放爬取的數據
    main(kw)
完整代碼

  注意,由於微博的反爬機制,每次短時間的大量請求都會導致ip被短時間禁用,此處可以通過添加代理的方式來解決。我的想法是在頁碼解析部分添加代理池,隨機選擇代理,如果當前ip返回的狀態碼為200,則進行解析出頁碼,並將該ip攜帶到頁面解析;若狀態碼不是200,則循環選擇下一個ip。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM