以前一直使用PHP寫爬蟲,用Snoopy配合simple_html_dom用起來也挺好的,至少能夠解決問題。
PHP一直沒有一個好用的多線程機制,雖然可以使用一些trick的手段來實現並行的效果(例如借助apache或者nginx服務器等,或者fork一個子進程,或者直接動態生成多個PHP腳本多進程運行),但是無論從代碼結構上,還是從使用的復雜程度上,用起來都不是那么順手。還聽說過一個pthreads的PHP的擴展,這是一個真正能夠實現PHP多線程的擴展,看github上它的介紹:Absolutely, this is not a hack, we don't use forking or any other such nonsense, what you create are honest to goodness posix threads that are completely compatible with PHP and safe ... this is true multi-threading :)
扯遠了,PHP的內容在本文中不再贅述,既然決定嘗試一下Python的采集,同時一定要學習一下Python的多線程知識的。以前一直聽各種大牛們將Python有多么多么好用,不真正用一次試試,自己也沒法明確Python具體的優勢在哪,處理哪些問題用Python合適。
廢話就說這么多吧,進入正題
- 采集目標:淘寶
- 采集數據:某一關鍵詞領域的淘寶店鋪名稱、URL地址、店鋪等級
- 用到的第三方packages:
- requests(話說是看了前兩天的一篇文章 Python modules you should know (傳送門) 才知道的,以前只知道urllib2)
- BeautifulSoup(現在貌似有新版本bs4了,不過我用的是舊版本的)
- Redis
采集
單線程版本
代碼:
search_config.py
1 #!/usr/bin/env python 2 # coding=utf-8 3 class config: 4 keyword = '青島' 5 search_type = 'shop' 6 url = 'http://s.taobao.com/search?q=' + keyword + '&commend=all&search_type='+ search_type +'&sourceId=tb.index&initiative_id=tbindexz_20131207&app=shopsearch&s='
single_scrapy.py
#!/usr/bin/env python # coding=utf-8 import requests from search_config import config class Scrapy(): def __init__(self, threadname, start_num): self.threadname = threadname self.start_num = start_num print threadname + 'start.....' def run(self): url = config.url + self.start_num response = requests.get(url) print self.threadname + 'end......' def main(): for i in range(0,13,6): scrapy = Scrapy('scrapy', str(i)) scrapy.run() if __name__ == '__main__': main()
運行分析:
這是最簡單最常規的一種采集方式,按照順序循環進行網絡連接,獲取頁面信息。看截圖可知,這種方式的效率其實是極低的,一個連接進行網絡I/O的時候,其他的必須等待前面的連接完成才能進行連接,換句話說,就是前面的連接阻塞的后面的連接。
多線程版本
代碼:
#!/usr/bin/env python # coding=utf-8 import requests from search_config import config import threading class Scrapy(threading.Thread): def __init__(self, threadname, start_num): threading.Thread.__init__(self, name = threadname) self.threadname = threadname self.start_num = start_num print threadname + 'start.....' #重寫Thread類的run方法 def run(self): url = config.url + self.start_num response = requests.get(url) print self.threadname + 'end......' def main(): for i in range(0,13,6): scrapy = Scrapy('scrapy', str(i)) scrapy.start() if __name__ == '__main__': main()
運行分析:
通過截圖可以看到,采集同樣數量的頁面,通過開啟多線程,時間縮短了很多,但是CPU利用率高了。
頁面信息解析
html頁面信息拿到以后,我們需要對其進行解析操作,從中提取出我們所需要的信息,包含:
- 店鋪名稱
- 店鋪URL
- 店鋪等級
使用BeautifulSoup這個庫,可以直接按照class或者id等html的attr來進行提取,比直接寫正則直觀不少,難度也小了很多,當然,執行效率上,相應的也就大打折扣了。
代碼:
-
這里使用
Queue實現一個生產者和消費者模式- 生產者消費者模式:
- 生產者將數據依次存入隊列,消費者依次從隊列中取出數據。
- 本例中,通過scrapy線程不斷提供數據,parse線程從隊列中取出數據進行相應解析
- 生產者消費者模式:
-
Queue模塊
- Python中的Queue對象也提供了對線程同步的支持,使用Queue對象可以實現多個生產者和多個消費者形成的FIFO的隊列。
- 當共享信息需要安全的在多線程之間交換時,Queue非常有用。
- Queue的默認長度是無限的,但是可以設置其構造函數的maxsize參數來設定其長度。
#!/usr/bin/env python # coding=utf-8 import requests from BeautifulSoup import BeautifulSoup from search_config import config from Queue import Queue import threading class Scrapy(threading.Thread): def __init__(self, threadname, queue, out_queue): threading.Thread.__init__(self, name = threadname) self.sharedata = queue self.out_queue= out_queue self.threadname = threadname print threadname + 'start.....' def run(self): url = config.url + self.sharedata.get() response = requests.get(url) self.out_queue.put(response) print self.threadname + 'end......' class Parse(threading.Thread): def __init__(self, threadname, queue, out_queue): threading.Thread.__init__(self, name = threadname) self.sharedata = queue self.out_queue= out_queue self.threadname = threadname print threadname + 'start.....' def run(self): response = self.sharedata.get() body = response.content.decode('gbk').encode('utf-8') soup = BeautifulSoup(body) ul_html = soup.find('ul',{'id':'list-container'}) lists = ul_html.findAll('li',{'class':'list-item'}) stores = [] for list in lists: store= {} try: infos = list.findAll('a',{'trace':'shop'}) for info in infos: attrs = dict(info.attrs) if attrs.has_key('class'): if 'rank' in attrs['class']: rank_string = attrs['class'] rank_num = rank_string[-2:] if (rank_num[0] == '-'): store['rank'] = rank_num[-1] else: store['rank'] = rank_num if attrs.has_key('title'): store['title'] = attrs['title'] store['href'] = attrs['href'] except AttributeError: pass if store: stores.append(store) for store in stores: print store['title'] + ' ' + store['rank'] print self.threadname + 'end......' def main(): queue = Queue() targets = Queue() stores = Queue() scrapy = [] for i in range(0,13,6): #queue 原始請求 #targets 等待解析的內容 #stores解析完成的內容,這里為了簡單直觀,直接在線程中輸出了內容,並沒有使用該隊列 queue.put(str(i)) scrapy = Scrapy('scrapy', queue, targets) scrapy.start() parse = Parse('parse', targets, stores) parse.start() if __name__ == '__main__': main()
運行結果
看這個運行結果,可以看到,我們的scrapy過程很快就完成了,我們的parse也很早就開始了,可是在運行的時候,卻卡在parse上好長時間才出的運行結果,每一個解析結果出現,都需要3~5秒的時間,雖然我用的是台老IBM破本,但按理說使用了多線程以后不應該會這么慢的啊。
同樣的數據,我們再看一下單線程下,運行結果。這里為了方便,我在上一個multi_scrapy里加入了redis,使用redis存儲爬行下來的原始頁面,這樣在single_parse.py里面可以單獨使用,更方便一些。
單線程版本:
代碼:
#!/usr/bin/env python # coding=utf-8 from BeautifulSoup import BeautifulSoup import redis class Parse(): def __init__(self, threadname, content): self.threadname = threadname self.content = content print threadname + 'start.....' def run(self): response = self.content if response: body = response.decode('gbk').encode('utf-8') soup = BeautifulSoup(body) ul_html = soup.find('ul',{'id':'list-container'}) lists = ul_html.findAll('li',{'class':'list-item'}) stores = [] for list in lists: store= {} try: infos = list.findAll('a',{'trace':'shop'}) for info in infos: attrs = dict(info.attrs) if attrs.has_key('class'): if 'rank' in attrs['class']: rank_string = attrs['class'] rank_num = rank_string[-2:] if (rank_num[0] == '-'): store['rank'] = rank_num[-1] else: store['rank'] = rank_num if attrs.has_key('title'): store['title'] = attrs['title'] store['href'] = attrs['href'] except AttributeError: pass if store: stores.append(store) for store in stores: try: print store['title'] + ' ' + store['rank'] except KeyError: pass print self.threadname + 'end......' else: pass def main(): r = redis.StrictRedis(host='localhost', port=6379) while True: content = r.lpop('targets') if (content): parse = Parse('parse', content) parse.run() else: break if __name__ == '__main__': main()
運行結果:

結果可以看到,單線程版本中,耗時其實和多線程是差不多的,上文中的多線程版本,雖然包含了獲取頁面的時間,但是地一個例子里我們已經分析了,使用多線程以后,三個頁面的抓取,完全可以在1s內完成的,也就是說,使用多線程進行數據解析,並沒有獲得實質上的效率提高。
分析原因
- 看兩個運行的CPU占用,第一個127%,第二個98%,都是非常高的,這說明,在處理字符串解析匹配提取等運算密集型的工作時,並行的概念並沒有很好得得到發揮
- 由於共享數據不存在安全問題,所以上面的例子都是非線程安全的,並沒有為共享數據加鎖,只是實現了最簡單的FIFO,所以也不會是因為鎖的開銷導致效率沒有得到真正提高
- 網上搜索資料,發現python多線程似乎並不能利用多核,問題似乎就是出在這里了,在python上開啟多個線程,由於GIL的存在,每個單獨線程都會在競爭到GIL后才運行,這樣就干預OS內部的進程(線程)調度,結果在多核CPU上,python的多線程實際是串行執行的,並不會同一時間多個線程分布在多個CPU上運行。Python由於有全鎖局的存在(同一時間只能有一個線程執行),並不能利用多核優勢。所以,如果你的多線程進程是CPU密集型的,那多線程並不能帶來效率上的提升,相反還可能會因為線程的頻繁切換,導致效率下降;如果是IO密集型,多線程進程可以利用IO阻塞等待時的空閑時間執行其他線程,提升效率。
- 問題答案:由於數據解析操作是CPU密集型的操作,而網絡請求是I/O密集型操作,所以出現了上述結果。
解決方法
GIL既然是針對一個python解釋器進程而言的,那么,如果解釋器可以多進程解釋執行,那就不存在GIL的問題了。同樣,他也不會導致你多個解釋器跑在同一個核上。 所以,最好的解決方案,是多線程+多進程結合。通過多線程來跑I/O密集型程序,通過控制合適數量的進程來跑CPU密集型的操作,這樣就可以跑慢CPU了:)
