之前寫腳本爬斗魚主播信息時用了一個pymongo的去重語句
db['host_info'].update({'主播': data['主播'], '時間': data['時間']}, {'$set': data}, True):
這句話以主播和時間為索引判斷數據庫中如果沒有同一主播同一時間的數據就更新到數據庫。一開始還是很好用的,爬取速度還可以,但是我的計划是每天晚上爬取黃金時間整點段的數據,幾個小時過后數據量就達到了十幾萬條,然后速度越來越慢,mongodb進程占用cpu率很高,可以看到數據是一條條地存進去。畢竟以十幾萬條數據為基准去重工作量很大,隨着數據量的增長會更加慢,直到我的電腦爆掉。
仔細分析了一下,我用主播和時間作為索引,每一個整點爬取一次,所以每一次爬取的時間肯定不一樣,也就是每一次爬的過程中可能會有重復數據,次與次之間不會存在重復數據。於是就把數據先放入一個空的臨時數據表中,仍然用上面的去重方法做去重,等數據全部爬完后再把臨時空數據庫中的數據存入主數據庫中,這時只需要插入,不需要去重,存入的速度是很快的。
#-*- coding:utf-8 -*- #_author:John #date:2018/12/29 20:11 import time from functools import partial import requests import json from multiprocessing import Pool import pymongo import datetime client = pymongo.MongoClient('localhost') db = client['douyu'] def single_page_info(page, cur_time): # 防止網絡無響應,重試3次 for i in range(3): try: respones = requests.get('https://www.douyu.com/gapi/rkc/directory/0_0/{}'.format(page)) break except Exception as E: print(E) respones = None time.sleep(10) if respones: datas = json.loads(respones.text) items = datas['data']['rl'] for item in items: data = { '標題': item['rn'], '主播': item['nn'], '人氣': item['ol'], '類別': item['c2name'], '房間號': item['rid'], '時間': cur_time } # 用臨時數據表完成此次爬蟲的去重工作,在程序結束前把臨時數據表刪除 if db['host_info_draft'].update({'主播': data['主播']}, {'$set': data}, True): print('Save to Mongo, {}'.format(data)) else: print('Save to Mong fail, {}'.format(data)) print('已經完成第{}頁'.format(page)) # 存入主數據表 def write_to_primary_db(data): db['host_infos'].insert_one(data) # 刪除臨時數據表 def drop_draft(): db.drop_collection('host_info_draft') if __name__ == '__main__': pool = Pool() print('start') # 多線程抓200頁 while True: # 判斷當前時間是否為整點,如果是則開始爬蟲程序 minute = datetime.datetime.now().strftime('%M') if int(minute) < 2: # 把初始時間傳入作為此次爬蟲統一時間,python3用partial可以在map中傳入的函數傳遞參數 cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') pool.map(partial(single_page_info, cur_time=cur_time), [page for page in range(1, 201)]) print('Move the temporary table to the primary table') pool.map(write_to_primary_db, [data for data in db['host_info_draft'].find()]) print('End with this cycle') drop_draft() sleep_time = 60 - int(datetime.datetime.now().strftime('%M')) time.sleep(sleep_time*60) else: time.sleep(58) # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') # pool.map(partial(single_page_info, cur_time=cur_time), [page for page in range(1, 201)]) # drop_draft()