pandas數據處理(一)pymongo數據庫量大插入時去重速度慢


  之前寫腳本爬斗魚主播信息時用了一個pymongo的去重語句

db['host_info'].update({'主播': data['主播'], '時間': data['時間']}, {'$set': data}, True):

  這句話以主播和時間為索引判斷數據庫中如果沒有同一主播同一時間的數據就更新到數據庫。一開始還是很好用的,爬取速度還可以,但是我的計划是每天晚上爬取黃金時間整點段的數據,幾個小時過后數據量就達到了十幾萬條,然后速度越來越慢,mongodb進程占用cpu率很高,可以看到數據是一條條地存進去。畢竟以十幾萬條數據為基准去重工作量很大,隨着數據量的增長會更加慢,直到我的電腦爆掉。

 

  仔細分析了一下,我用主播和時間作為索引,每一個整點爬取一次,所以每一次爬取的時間肯定不一樣,也就是每一次爬的過程中可能會有重復數據,次與次之間不會存在重復數據。於是就把數據先放入一個空的臨時數據表中,仍然用上面的去重方法做去重,等數據全部爬完后再把臨時空數據庫中的數據存入主數據庫中,這時只需要插入,不需要去重,存入的速度是很快的。

#-*- coding:utf-8 -*-
#_author:John
#date:2018/12/29 20:11
import time
from functools import partial
import requests
import json
from multiprocessing import Pool
import pymongo
import datetime

client = pymongo.MongoClient('localhost')
db = client['douyu']


def single_page_info(page, cur_time):
    # 防止網絡無響應,重試3次
    for i in range(3):
        try:
            respones = requests.get('https://www.douyu.com/gapi/rkc/directory/0_0/{}'.format(page))
            break
        except Exception as E:
            print(E)
            respones = None
            time.sleep(10)
    if respones:
        datas = json.loads(respones.text)
        items = datas['data']['rl']
        for item in items:
            data = {
                '標題': item['rn'],
                '主播': item['nn'],
                '人氣': item['ol'],
                '類別': item['c2name'],
                '房間號': item['rid'],
                '時間': cur_time
            }
            # 用臨時數據表完成此次爬蟲的去重工作,在程序結束前把臨時數據表刪除
            if db['host_info_draft'].update({'主播': data['主播']}, {'$set': data}, True):
                print('Save to Mongo, {}'.format(data))
            else:
                print('Save to Mong fail, {}'.format(data))
        print('已經完成第{}頁'.format(page))


# 存入主數據表
def write_to_primary_db(data):
    db['host_infos'].insert_one(data)
# 刪除臨時數據表
def drop_draft():
    db.drop_collection('host_info_draft')


if __name__ == '__main__':
    pool = Pool()
    print('start')
    # 多線程抓200頁
    while True:
        # 判斷當前時間是否為整點,如果是則開始爬蟲程序
        minute = datetime.datetime.now().strftime('%M')
        if int(minute) < 2:
            # 把初始時間傳入作為此次爬蟲統一時間,python3用partial可以在map中傳入的函數傳遞參數
            cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
            pool.map(partial(single_page_info, cur_time=cur_time), [page for page in range(1, 201)])
            print('Move the temporary table to the primary table')
            pool.map(write_to_primary_db, [data for data in db['host_info_draft'].find()])
            print('End with this cycle')
            drop_draft()
            sleep_time = 60 - int(datetime.datetime.now().strftime('%M'))
            time.sleep(sleep_time*60)
        else:
            time.sleep(58)

    # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
    # pool.map(partial(single_page_info, cur_time=cur_time), [page for page in range(1, 201)])
    # drop_draft()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM