動態調整線程數的python爬蟲代碼分享


  這幾天在忙一個爬蟲程序,一直在改進他,從一開始的單線程,好幾秒一張圖片(網絡不好),,,到現在每秒鍾十幾張圖片,,, 四個小時586萬條數據,,,簡直不要太爽 先上圖

  

  最終寫出來的程序,線程數已經可以動態調整了,賊暴力。。。峰值能穩定在50個線程,具體思路可以繼續看

  這里終於用到了操作系統的知識,就是生產者和消費者的模型。。。(參考源碼忘記記錄了,抱歉

  先簡單說一下目標網站的情況,目標網站是一個圖片網站,有一個列表頁,點進列表頁之后,可以看到很多圖片,這只爬蟲的目的是收集這些圖片鏈接(有了鏈接當然也能下載了...

  簡單分析之后發現,在列表頁,會向后台請求一個json格式的數據文件,然后js動態的把里面的id組合成一個鏈接,最終組成如下樣式的鏈接

    

http://www.xxxxxx.com/photo/json?page=1977

  顯而易見,page參數就是指定頁數的,那么,這里就可以先生成一個列表,用for循環把所有列表頁的url加進去,接下來只需要遍歷這個鏈接列表就好了。

  

#首先構造產品隊列
    for i in range(1,11613):
        url_list.append("http://www.xxxxxx.com/photo/json?page="+str(i));
    print('產生鏈接完成');

 

  接下來,就是啟動生產者線程,通過列表里的url,獲取到每一個詳情頁的id,進而拼接出詳情頁的url,接下來把生產的詳情頁url添加到一個任務隊列里面就好了,這就是生產者的工作。

  

#生產者
def producer(url_list,in_queue):
    print('進入生產者線程');
    global flag;
    for url in url_list:
        html = open_page(url);  #獲取總頁json 得到每一個頁的id 進而得到每個頁的url
        if html == '0':
            continue;
        else:
            idurl_list = get_idurl(html);   #得到第n頁的所有詳情頁url
            if len(idurl_list)==0:  #如果取不到url 直接進行下一頁
                continue;
            for idurl in idurl_list:
                in_queue.put(idurl);
                #print('生產完成一個');
    flag=1;
    print('產品生產完成');

 

  接着,需要在等待幾秒鍾,讓生產者先生產一些產品。

  然后創建一個管理消費者的線程,能夠創建新的消費者線程

    #線程管理線程
    consumer_thread = Thread(target=manger_thread,args=(in_queue,));
    consumer_thread.daemon = True;
    consumer_thread.start();

  線程里面的代碼是這樣的

  

def manger_thread(in_queue):
    global thread_num;
    while True:
        if in_queue.qsize()>3000 and thread_num<80: #設置最大線程80
            consumer_thread = Thread(target=consumer,args=(in_queue,));
            consumer_thread.daemon = True;
            consumer_thread.start();
            thread_num+=1;

   簡單解釋一下,有一個全局變量,thread_num 這個就是用來調整進程數的依據,始終為消費者數目。

  接着,創建一個死循環,不停的判斷任務隊列中的產品數量,超過3000個,並且現在線程數小於80個,那就創建一個消費者線程。

  消費者代碼:

  

#消費者
def consumer(in_queue):
    global count;
    global flag;
    global thread_num;
    print('進入消費者線程,隊列長度: '+str(in_queue.qsize()));
    while True:
        if in_queue.qsize()<3000 and thread_num>10:   #隊列中數量小於5000 並且線程數大於10 就取消一個線程
            thread_num-=1;
            return;
        html = open_page(in_queue.get());  #取得一個詳情頁鏈接開始取得源碼
        if html == '0': #獲取源碼失敗
            in_queue.task_done();   #雖然打開網頁失敗了 但是似乎還是得確認完成
            continue;
        image_url = get_url(html);  #得到詳情頁圖片url列表
        save_url(image_url);    #保存鏈接
        #print('隊列長度: '+str(in_queue.qsize()));
        count+=1;
        os.system('title '+'已爬組數:'+str(count)+'_隊列長度:'+str(in_queue.qsize())+'_線程數:'+str(thread_num));
        in_queue.task_done();

  首先聲明的幾個全局變量是用來顯示各種參數的

  這里依舊是一個死循環,循環中判斷 任務隊列中產品數量小於3000並且線程數大於10的話,那就退出這個線程。 通過線程管理線程以及這里的調整,隊列長度穩定在3000

  然后打開網頁源碼,解析圖片鏈接即可。

  值得一提的是,直接獲取那個網頁的源碼,並不能得到圖片的鏈接,需要對連接中字符串進行替換,,,具體怎么替換,需要查看js代碼,然后用python源碼實現一遍就好。

 

  下面放出所有的源碼(要注意,代碼中所有url全部都是修改了的,所以代碼不能直接運行,,,如果想讓他運行起來,可以私信我,或者留言給我

  

#encoding:utf-8 
import bs4;
import urllib.request;
import urllib.error; # abc
from urllib.request import urlretrieve
import time;
import os;
import json;
from queue import Queue;
import threading;
from retrying import retry;
from threading import Thread;

count = 0;  #記錄組數
thread_num = 0; #線程數
flag = 0; #生產者完成標志

#打開網頁 直接返回源碼
@retry(wait_fixed=1000,stop_max_attempt_number=50)  #異常重試
def open_page(url):
    print('打開網頁: '+url);
    header = {};
    header['User-Agent'] = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36";
    req = urllib.request.Request(url,headers=header);
    response = urllib.request.urlopen(req,timeout=5);
    #times=str(time.time());
    #print('讀取內容'+times);
    temp = response.read().decode('utf-8');
    #print('讀取結束'+times);
    return temp;

#保存url
def save_url(url_list):  #考慮到追加字符串比較頻繁 所以組合成一個大的字符串一起寫入可以降低磁盤I/O (大概?
    #print('保存url: '+str(len(url_list)));
    file_handle = open('list.txt','a+');    #不存在文件就創建並且追加
    url_lists=[url+'\n' for url in url_list];   #添加回車
    file_handle.writelines(url_lists);
    file_handle.close();

#從詳細頁中得到照片的url 返回列表
def get_url(html):
    image_url=[];
    try:
        if len(html)==0:   #傳參是空的
            return [];
        soup = bs4.BeautifulSoup(html,'html.parser');   #解析html代碼
        for img in soup.find_all('img'):    #這個循環用來得到
            if 'type3' in img['data-avaurl']:
                str2 = img['data-avaurl'].replace('type3','https://xxxxxx9.com');
            if 'type4' in img['data-avaurl']:
                str2 = img['data-avaurl'].replace('type4','https://xxxxxx4.com');
            if 'type5' in img['data-avaurl']:
                str2 = img['data-avaurl'].replace('type5','https://xxxxxx1.com');
            image_url.append(str2);
    except Exception as e:  #返回空列表
        print('發生錯誤: '+e);
        return [];
    return image_url;

#得到每個id 對應的詳情頁url 返回列表
def get_idurl(html):
    idurl_list=[];
    if len(html)==0:    #傳參為空 直接返回
        return [];
    for item in json.loads(html)['data']['items']:
        idurl_list.append("http://www.xxxxxxx.com/photo/show?id="+str(item['id'])); #獲取到每一頁的url
    return idurl_list;

#生產者
def producer(url_list,in_queue):
    print('進入生產者線程');
    global flag;
    for url in url_list:
        html = open_page(url);  #獲取總頁json 得到每一個頁的id 進而得到每個頁的url
        if html == '0':
            continue;
        else:
            idurl_list = get_idurl(html);   #得到第n頁的所有詳情頁url
            if len(idurl_list)==0:  #如果取不到url 直接進行下一頁
                continue;
            for idurl in idurl_list:
                in_queue.put(idurl);
                #print('生產完成一個');
    flag=1;
    print('產品生產完成');

#消費者
def consumer(in_queue):
    global count;
    global flag;
    global thread_num;
    print('進入消費者線程,隊列長度: '+str(in_queue.qsize()));
    while True:
        if in_queue.qsize()<3000 and thread_num>10:   #隊列中數量小於5000 並且線程數大於10 就取消一個線程
            thread_num-=1;
            return;
        html = open_page(in_queue.get());  #取得一個詳情頁鏈接開始取得源碼
        if html == '0': #獲取源碼失敗
            in_queue.task_done();   #雖然打開網頁失敗了 但是似乎還是得確認完成
            continue;
        image_url = get_url(html);  #得到詳情頁圖片url列表
        save_url(image_url);    #保存鏈接
        #print('隊列長度: '+str(in_queue.qsize()));
        count+=1;
        os.system('title '+'已爬組數:'+str(count)+'_隊列長度:'+str(in_queue.qsize())+'_線程數:'+str(thread_num));
        in_queue.task_done();

def manger_thread(in_queue):
    global thread_num;
    while True:
        if in_queue.qsize()>3000 and thread_num<80: #設置最大線程80
            consumer_thread = Thread(target=consumer,args=(in_queue,));
            consumer_thread.daemon = True;
            consumer_thread.start();
            thread_num+=1;
            
    
if __name__=='__main__':
    start_time = time.time();
    url_list = [];    #構造的產品集合
    in_queue = Queue(); #次級產品隊列
    queue = Queue();    #線程隊列

    #首先構造產品隊列
    for i in range(1,11613):
        url_list.append("http://www.xxxxxxx.com/photo/json?page="+str(i));
    print('產生鏈接完成');
    
    producer_thread = Thread(target=producer,args=(url_list,in_queue,)); #創建生產者線程
    producer_thread.daemon = True;  #設置為守護線程,主線程不退出,子線程也不退出
    producer_thread.start();    #啟動生產者線程,生產url
    
    time.sleep(15);
    
    #線程管理線程
    consumer_thread = Thread(target=manger_thread,args=(in_queue,));
    consumer_thread.daemon = True;
    consumer_thread.start();
    
    in_queue.join();    #阻塞,直到所有的次級產品消耗完畢
    print('所有產品消費完成,花費時間: '+str(time.time()-start_time)+'已爬組數: '+count);
    exit();

 

  因為我自己也是才開始寫爬蟲的原因,上面的代碼很粗糙,,,但是我發誓,我有用心寫。

  代碼的缺點也很明顯,就是不停的銷毀線程,創建線程很耗費資源,,,這里需要改進,也許需要使用線程池(我的服務器CPU滿載了,驚喜的是 網絡頁滿載了,意味着,基本上速度最快了(帶寬瓶頸

  動態調整線程的原因是因為,列表頁的服務器和詳情頁圖片的服務器不一樣,這就意味着有時候任務隊列中任務很多,有時候消費者又會餓着,浪費時間。

  還有就是,這次的目標網站幾乎沒有反爬措施(如果詳情頁圖片鏈接需要替換不算反爬措施),,, 所以很順利,也能很暴力 但是更多的網站都是有反爬的。。。需要混合代理服務器

 

  需要運行代碼調試學習交流的朋友請在評論區留言或者發私信

  希望能幫助大家,更希望有大佬指導 謝謝 ^ _ ^


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM