這幾天在忙一個爬蟲程序,一直在改進他,從一開始的單線程,好幾秒一張圖片(網絡不好),,,到現在每秒鍾十幾張圖片,,, 四個小時586萬條數據,,,簡直不要太爽 先上圖
最終寫出來的程序,線程數已經可以動態調整了,賊暴力。。。峰值能穩定在50個線程,具體思路可以繼續看
這里終於用到了操作系統的知識,就是生產者和消費者的模型。。。(參考源碼忘記記錄了,抱歉
先簡單說一下目標網站的情況,目標網站是一個圖片網站,有一個列表頁,點進列表頁之后,可以看到很多圖片,這只爬蟲的目的是收集這些圖片鏈接(有了鏈接當然也能下載了...
簡單分析之后發現,在列表頁,會向后台請求一個json格式的數據文件,然后js動態的把里面的id組合成一個鏈接,最終組成如下樣式的鏈接
http://www.xxxxxx.com/photo/json?page=1977
顯而易見,page參數就是指定頁數的,那么,這里就可以先生成一個列表,用for循環把所有列表頁的url加進去,接下來只需要遍歷這個鏈接列表就好了。
#首先構造產品隊列 for i in range(1,11613): url_list.append("http://www.xxxxxx.com/photo/json?page="+str(i)); print('產生鏈接完成');
接下來,就是啟動生產者線程,通過列表里的url,獲取到每一個詳情頁的id,進而拼接出詳情頁的url,接下來把生產的詳情頁url添加到一個任務隊列里面就好了,這就是生產者的工作。
#生產者 def producer(url_list,in_queue): print('進入生產者線程'); global flag; for url in url_list: html = open_page(url); #獲取總頁json 得到每一個頁的id 進而得到每個頁的url if html == '0': continue; else: idurl_list = get_idurl(html); #得到第n頁的所有詳情頁url if len(idurl_list)==0: #如果取不到url 直接進行下一頁 continue; for idurl in idurl_list: in_queue.put(idurl); #print('生產完成一個'); flag=1; print('產品生產完成');
接着,需要在等待幾秒鍾,讓生產者先生產一些產品。
然后創建一個管理消費者的線程,能夠創建新的消費者線程
#線程管理線程 consumer_thread = Thread(target=manger_thread,args=(in_queue,)); consumer_thread.daemon = True; consumer_thread.start();
線程里面的代碼是這樣的
def manger_thread(in_queue): global thread_num; while True: if in_queue.qsize()>3000 and thread_num<80: #設置最大線程80 consumer_thread = Thread(target=consumer,args=(in_queue,)); consumer_thread.daemon = True; consumer_thread.start(); thread_num+=1;
簡單解釋一下,有一個全局變量,thread_num 這個就是用來調整進程數的依據,始終為消費者數目。
接着,創建一個死循環,不停的判斷任務隊列中的產品數量,超過3000個,並且現在線程數小於80個,那就創建一個消費者線程。
消費者代碼:
#消費者 def consumer(in_queue): global count; global flag; global thread_num; print('進入消費者線程,隊列長度: '+str(in_queue.qsize())); while True: if in_queue.qsize()<3000 and thread_num>10: #隊列中數量小於5000 並且線程數大於10 就取消一個線程 thread_num-=1; return; html = open_page(in_queue.get()); #取得一個詳情頁鏈接開始取得源碼 if html == '0': #獲取源碼失敗 in_queue.task_done(); #雖然打開網頁失敗了 但是似乎還是得確認完成 continue; image_url = get_url(html); #得到詳情頁圖片url列表 save_url(image_url); #保存鏈接 #print('隊列長度: '+str(in_queue.qsize())); count+=1; os.system('title '+'已爬組數:'+str(count)+'_隊列長度:'+str(in_queue.qsize())+'_線程數:'+str(thread_num)); in_queue.task_done();
首先聲明的幾個全局變量是用來顯示各種參數的
這里依舊是一個死循環,循環中判斷 任務隊列中產品數量小於3000並且線程數大於10的話,那就退出這個線程。 通過線程管理線程以及這里的調整,隊列長度穩定在3000
然后打開網頁源碼,解析圖片鏈接即可。
值得一提的是,直接獲取那個網頁的源碼,並不能得到圖片的鏈接,需要對連接中字符串進行替換,,,具體怎么替換,需要查看js代碼,然后用python源碼實現一遍就好。
下面放出所有的源碼(要注意,代碼中所有url全部都是修改了的,所以代碼不能直接運行,,,如果想讓他運行起來,可以私信我,或者留言給我
#encoding:utf-8 import bs4; import urllib.request; import urllib.error; # abc from urllib.request import urlretrieve import time; import os; import json; from queue import Queue; import threading; from retrying import retry; from threading import Thread; count = 0; #記錄組數 thread_num = 0; #線程數 flag = 0; #生產者完成標志 #打開網頁 直接返回源碼 @retry(wait_fixed=1000,stop_max_attempt_number=50) #異常重試 def open_page(url): print('打開網頁: '+url); header = {}; header['User-Agent'] = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36"; req = urllib.request.Request(url,headers=header); response = urllib.request.urlopen(req,timeout=5); #times=str(time.time()); #print('讀取內容'+times); temp = response.read().decode('utf-8'); #print('讀取結束'+times); return temp; #保存url def save_url(url_list): #考慮到追加字符串比較頻繁 所以組合成一個大的字符串一起寫入可以降低磁盤I/O (大概? #print('保存url: '+str(len(url_list))); file_handle = open('list.txt','a+'); #不存在文件就創建並且追加 url_lists=[url+'\n' for url in url_list]; #添加回車 file_handle.writelines(url_lists); file_handle.close(); #從詳細頁中得到照片的url 返回列表 def get_url(html): image_url=[]; try: if len(html)==0: #傳參是空的 return []; soup = bs4.BeautifulSoup(html,'html.parser'); #解析html代碼 for img in soup.find_all('img'): #這個循環用來得到 if 'type3' in img['data-avaurl']: str2 = img['data-avaurl'].replace('type3','https://xxxxxx9.com'); if 'type4' in img['data-avaurl']: str2 = img['data-avaurl'].replace('type4','https://xxxxxx4.com'); if 'type5' in img['data-avaurl']: str2 = img['data-avaurl'].replace('type5','https://xxxxxx1.com'); image_url.append(str2); except Exception as e: #返回空列表 print('發生錯誤: '+e); return []; return image_url; #得到每個id 對應的詳情頁url 返回列表 def get_idurl(html): idurl_list=[]; if len(html)==0: #傳參為空 直接返回 return []; for item in json.loads(html)['data']['items']: idurl_list.append("http://www.xxxxxxx.com/photo/show?id="+str(item['id'])); #獲取到每一頁的url return idurl_list; #生產者 def producer(url_list,in_queue): print('進入生產者線程'); global flag; for url in url_list: html = open_page(url); #獲取總頁json 得到每一個頁的id 進而得到每個頁的url if html == '0': continue; else: idurl_list = get_idurl(html); #得到第n頁的所有詳情頁url if len(idurl_list)==0: #如果取不到url 直接進行下一頁 continue; for idurl in idurl_list: in_queue.put(idurl); #print('生產完成一個'); flag=1; print('產品生產完成'); #消費者 def consumer(in_queue): global count; global flag; global thread_num; print('進入消費者線程,隊列長度: '+str(in_queue.qsize())); while True: if in_queue.qsize()<3000 and thread_num>10: #隊列中數量小於5000 並且線程數大於10 就取消一個線程 thread_num-=1; return; html = open_page(in_queue.get()); #取得一個詳情頁鏈接開始取得源碼 if html == '0': #獲取源碼失敗 in_queue.task_done(); #雖然打開網頁失敗了 但是似乎還是得確認完成 continue; image_url = get_url(html); #得到詳情頁圖片url列表 save_url(image_url); #保存鏈接 #print('隊列長度: '+str(in_queue.qsize())); count+=1; os.system('title '+'已爬組數:'+str(count)+'_隊列長度:'+str(in_queue.qsize())+'_線程數:'+str(thread_num)); in_queue.task_done(); def manger_thread(in_queue): global thread_num; while True: if in_queue.qsize()>3000 and thread_num<80: #設置最大線程80 consumer_thread = Thread(target=consumer,args=(in_queue,)); consumer_thread.daemon = True; consumer_thread.start(); thread_num+=1; if __name__=='__main__': start_time = time.time(); url_list = []; #構造的產品集合 in_queue = Queue(); #次級產品隊列 queue = Queue(); #線程隊列 #首先構造產品隊列 for i in range(1,11613): url_list.append("http://www.xxxxxxx.com/photo/json?page="+str(i)); print('產生鏈接完成'); producer_thread = Thread(target=producer,args=(url_list,in_queue,)); #創建生產者線程 producer_thread.daemon = True; #設置為守護線程,主線程不退出,子線程也不退出 producer_thread.start(); #啟動生產者線程,生產url time.sleep(15); #線程管理線程 consumer_thread = Thread(target=manger_thread,args=(in_queue,)); consumer_thread.daemon = True; consumer_thread.start(); in_queue.join(); #阻塞,直到所有的次級產品消耗完畢 print('所有產品消費完成,花費時間: '+str(time.time()-start_time)+'已爬組數: '+count); exit();
因為我自己也是才開始寫爬蟲的原因,上面的代碼很粗糙,,,但是我發誓,我有用心寫。
代碼的缺點也很明顯,就是不停的銷毀線程,創建線程很耗費資源,,,這里需要改進,也許需要使用線程池(我的服務器CPU滿載了,驚喜的是 網絡頁滿載了,意味着,基本上速度最快了(帶寬瓶頸
動態調整線程的原因是因為,列表頁的服務器和詳情頁圖片的服務器不一樣,這就意味着有時候任務隊列中任務很多,有時候消費者又會餓着,浪費時間。
還有就是,這次的目標網站幾乎沒有反爬措施(如果詳情頁圖片鏈接需要替換不算反爬措施),,, 所以很順利,也能很暴力 但是更多的網站都是有反爬的。。。需要混合代理服務器
需要運行代碼調試學習交流的朋友請在評論區留言或者發私信
希望能幫助大家,更希望有大佬指導 謝謝 ^ _ ^