requests庫是python爬蟲中最常見的庫,與內置的urllib庫相比,它更加簡潔高效,是每一個接觸爬蟲者都務必要掌握的基礎;但它也是有缺點的,就是不支持異步操作,雖然可以通過多線程來解決,但當需要發送大量請求時,創建大量的線程會浪費過多的資源;此時出現了一個新的庫aiohttp,它是支持異步操作的,可以在一個線程中,通過異步多任務來實現快速發送請求,提高效率。這次,我基於這兩個庫,做一個高效的微博關鍵詞爬蟲,源碼在文章的末尾。
首先,我是從微博的移動端地址入手,發現它是ajsx請求,請求參數中,除頁碼外,其他都是常量,所以要實現多頁請求,直接將頁碼當作參數發送即可。但是頁面返回的json數據並沒有直接標明總頁數,所以需要自己計算。進一步分析,發現數據中包含了微博的總條數和每一頁的條數,這就是突破口,對它進行簡單的運算就可以拿到總頁碼。此處只需要發送一次請求,就可以獲取到信息,所以這里采用的是requests。
def get_page(): """ 先用requests構造請求,解析出關鍵詞搜索出來的微博總頁數 :return: 返回每次請求需要的data參數 """ data_list = [] data = { 'containerid': '100103type=1&q={}'.format(kw), 'page_type': 'searchall'} resp = requests.get(url=url, headers=headers, params=data) total_page = resp.json()['data']['cardlistInfo']['total'] # 微博總數 # 一頁有10條微博,用總數對10整除,余數為0則頁碼為總數/10,余數不為0則頁碼為(總數/10)+1 if total_page % 10 == 0: page_num = int(total_page / 10) else: page_num = int(total_page / 10) + 1 # 頁碼為1,data為當前data,頁碼不為1,通過for循環構建每一頁的data參數 if page_num == 1: data_list.append(data) return data_list else: for i in range(1, page_num + 1): data['page'] = i data_list.append(copy.deepcopy(data)) return data_list
獲取完頁碼之后,就可以進行數據解析。每一頁都需要單獨發送請求,為了提高效率,此處采用的是aiohttp。通過async關鍵詞來定義特殊函數,返回一個協程對象,注意函數內部所有代碼必須是支持異步操作的。在構建請求的時候需要注意特定的格式。
# async定義函數,返回一個協程對象 async def crawl(data): """ 多任務異步解析頁面,存儲數據 :param data: 請求所需的data參數 :return: None """ async with aiohttp.ClientSession() as f: # 實例化一個ClientSession async with await f.get(url=url, headers=headers, params=data) as resp: # 攜帶參數發送請求 text = await resp.text() # await 等待知道獲取完整數據 text_dict = json.loads(text)['data']['cards'] parse_dict = {} for card in text_dict: if card['card_type'] == 9: scheme = card['scheme'] if card['mblog']['isLongText'] is False: text = card['mblog']['text'] text = re.sub(r'<.*?>|\n+', '', text) else: text = card['mblog']['longText']['longTextContent'] user = card['mblog']['user']['profile_url'] comments_count = card['mblog']['comments_count'] attitudes_count = card['mblog']['attitudes_count'] parse_dict['url'] = scheme parse_dict['text'] = text parse_dict['author'] = user parse_dict['comments_count'] = comments_count parse_dict['attitudes_count'] = attitudes_count parse_dict_list.append(copy.deepcopy(parse_dict))
最關鍵的一步,將協程對象添加到事件循環中,實現異步執行。
task_list = [] # 定義一個任務列表 for data in data_list: c = crawl(data) # 調用協程,傳參 task = asyncio.ensure_future(c) # 創建任務對象 task_list.append(task) # 將任務添加到列表中 loop = asyncio.get_event_loop() # 創建事件循環 loop.run_until_complete(asyncio.wait(task_list)) # 開啟循環,並將阻塞的任務掛起
以上部分就是整個爬蟲的關鍵,剩下的數據寫入(導出到excle)就直接放在源碼中,不足之處,請大家指正!
import copy import aiohttp import requests import re import asyncio import json import xlwt def get_page(): """ 先用requests構造請求,解析出關鍵詞搜索出來的微博總頁數 :return: 返回每次請求需要的data參數 """ data_list = [] data = { 'containerid': '100103type=1&q={}'.format(kw), 'page_type': 'searchall'} resp = requests.get(url=url, headers=headers, params=data) total_page = resp.json()['data']['cardlistInfo']['total'] # 微博總數 # 一頁有10條微博,用總數對10整除,余數為0則頁碼為總數/10,余數不為0則頁碼為(總數/10)+1 if total_page % 10 == 0: page_num = int(total_page / 10) else: page_num = int(total_page / 10) + 1 # 頁碼為1,data為當前data,頁碼不為1,通過for循環構建每一頁的data參數 if page_num == 1: data_list.append(data) return data_list else: for i in range(1, page_num + 1): data['page'] = i data_list.append(copy.deepcopy(data)) return data_list # async定義函數,返回一個協程對象 async def crawl(data): """ 多任務異步解析頁面,存儲數據 :param data: 請求所需的data參數 :return: None """ async with aiohttp.ClientSession() as f: # 實例化一個ClientSession async with await f.get(url=url, headers=headers, params=data) as resp: # 攜帶參數發送請求 text = await resp.text() # await 等待知道獲取完整數據 text_dict = json.loads(text)['data']['cards'] parse_dict = {} for card in text_dict: if card['card_type'] == 9: scheme = card['scheme'] if card['mblog']['isLongText'] is False: text = card['mblog']['text'] text = re.sub(r'<.*?>|\n+', '', text) else: text = card['mblog']['longText']['longTextContent'] user = card['mblog']['user']['profile_url'] comments_count = card['mblog']['comments_count'] attitudes_count = card['mblog']['attitudes_count'] parse_dict['url'] = scheme parse_dict['text'] = text parse_dict['author'] = user parse_dict['comments_count'] = comments_count parse_dict['attitudes_count'] = attitudes_count parse_dict_list.append(copy.deepcopy(parse_dict)) def insert_data(file_name): """ 將數據導出到excle中 :param file_name: 文件名 :return: """ wr = xlwt.Workbook(encoding='utf8') table = wr.add_sheet(file_name) table.write(0, 0, '原鏈接') table.write(0, 1, '正文') table.write(0, 2, '作者首頁') table.write(0, 3, '評論數') table.write(0, 4, '點贊數') for index, data in enumerate(parse_dict_list): table.write(index + 1, 0, data['url']) table.write(index + 1, 1, data['text']) table.write(index + 1, 2, data['author']) table.write(index + 1, 3, data['comments_count']) table.write(index + 1, 4, data['attitudes_count']) file_path = file_name + '.xls' wr.save(file_path) def main(file_name): """ 開啟多任務循環 :return: None """ data_list = get_page() # 接收data參數列表 task_list = [] # 定義一個任務列表 for data in data_list: c = crawl(data) # 調用協程,傳參 task = asyncio.ensure_future(c) # 創建任務對象 task_list.append(task) # 將任務添加到列表中 loop = asyncio.get_event_loop() # 創建事件循環 loop.run_until_complete(asyncio.wait(task_list)) # 開啟循環,並將阻塞的任務掛起 insert_data(file_name) if __name__ == '__main__': kw = input('關鍵詞:') headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'} url = 'https://m.weibo.cn/api/container/getIndex' parse_dict_list = [] # 臨時存放爬取的數據 main(kw)
注意,由於微博的反爬機制,每次短時間的大量請求都會導致ip被短時間禁用,此處可以通過添加代理的方式來解決。我的想法是在頁碼解析部分添加代理池,隨機選擇代理,如果當前ip返回的狀態碼為200,則進行解析出頁碼,並將該ip攜帶到頁面解析;若狀態碼不是200,則循環選擇下一個ip。
