之前有寫過用單線程建立代理ip池,但是大家很快就會發現,用單線程來一個個測試代理ip實在是太慢了,跑一次要很久才能結束,完全無法忍受。所以這篇文章就是換用多線程來建立ip池,會比用單線程快很多。之所以用多線程而不是多進程,是因為測試時間主要是花費在等待網絡傳遞數據上,處理本地計算的時間很短,用多線程能更好地發揮單核性能,而且多線程開銷比多進程開銷小得多。當然,單核性能會有極限,如果想再提高性能就需要多進程和多線程混用了。當然這里說的是用CPython作為解釋器時候的情況,因為絕大多數人用的都是CPython,所以以下說的都是這種情況。
受限於個人學識,對多進程和多線程的理解也不是很深刻,如果以后有機會會寫寫關於並發編程的文章。CPython因為GIL鎖的原因,多線程無法發揮多核性能,但是可以用多進程來發揮多核性能。注意GIL鎖不是python語言特性,只是CPython解釋器的原因。任何python線程在執行前,都必須獲得GIL鎖,然后每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程執行。所以python線程只能交替執行,即使有多個線程跑在多核CPU上,也只能利用一個核。
其實程序主體在之前的文章已經寫好了,我們需要的只是稍微做點改進,以適合多線程編程。我的思路是,設置一個線程專門用來爬取待測試ip,其他線程獲取待測試ip進行測試。這也是分布式編程的思想。
我們首先設置一個隊列,用來儲存待測試ip。
thread_lock = threading.Lock() test_ip_list = Queue()
然后對之前的函數進行一些修改。
def download_page(url, timeout=10): headers=hidden_reptile.random_header() data = requests.get(url, headers=headers, timeout=timeout) return data def test_ip(test_url): while True: if test_ip_list.empty(): return ip = test_ip_list.get() proxies = { 'http': ip[0]+':'+ip[1], 'https': ip[0] + ':' + ip[1] } try_ip = ip[0] try: r=requests.get(test_url,headers=hidden_reptile.random_header(),proxies=proxies,timeout=10) if r.status_code == 200: r.encoding = 'gbk' result=re.search('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',r.text) result=result.group() if result[:9]==try_ip[:9]: print('%s:%s 測試通過' % (ip[0],ip[1])) thread_lock.acquire() with open('proxy_ip.txt', 'a') as f: f.write(ip[0] + ':' + ip[1] + '\n') thread_lock.release() else: print('%s:%s 攜帶代理失敗,使用了本地IP' %(ip[0],ip[1])) else: print('%s:%s 請求碼不是200' %(ip[0],ip[1])) except Exception as e: print(e) print('%s:%s 錯誤' %(ip[0],ip[1])) def get_proxies(page_num, ip_url_list): for ip_url in ip_url_list: for page in range(1, page_num+1): print("抓取第%d頁代理IP" %page) url= ip_url.format(page) r=download_page(url) r.encoding='utf-8' pattern = re.compile('<td class="country">.*?alt="Cn" />.*?</td>.*?<td>(.*?)</td>.*?<td>(.*?)</td>', re.S) ip_list= re.findall(pattern, r.text) for ip in ip_list: test_ip_list.put(ip) time.sleep(10) print('{}抓取結束'.format(ip_url))
注意寫入文件的時候需要加進程鎖,因為寫入的是同一個文件,不加線程鎖的話可能一個線程寫入到一半,就被其他線程搶了,然后寫入其他東西。所有的待測試ip都來自python隊列test_ip_list,對其進行操作的時候不用添加線程鎖,因為它自帶了線程鎖。
最后,寫運行部分。
if __name__ == '__main__': number_of_threads = 8 total_pages = 20 threads = [] url = ["http://www.xicidaili.com/nt/{}"] test_url = 'http://ip.tool.chinaz.com/' t = threading.Thread(target=get_proxies, args=(total_pages, url)) t.setDaemon(True) t.start() threads.append(t) time.sleep(1) for i in range(1, number_of_threads): t = threading.Thread(target=test_ip, args=(test_url,)) t.setDaemon(True) threads.append(t) t.start() for thread in threads: thread.join()
如果有其他可以爬取ip的網址可以加到url列表中,total_page是總共爬取的頁數。開了第一個線程之后暫停1s,是在等待它添加待測試ip進入隊列中。