純手工打造簡單分布式爬蟲(Python)


前言

  這次分享的文章是我《Python爬蟲開發與項目實戰》基礎篇 第七章的內容,關於如何手工打造簡單分布式爬蟲 (如果大家對這本書感興趣的話,可以看一下 試讀樣章),下面是文章的具體內容。

  本章講的依舊是實戰項目,實戰內容是打造分布式爬蟲,這對初學者來說,是一個不小的挑戰,也是一次有意義的嘗試。這次打造的分布式爬蟲采用比較簡單的主從模式,完全手工打造,不使用成熟框架,基本上涵蓋了前六章的主要知識點,其中涉及分布式的知識點是分布式進程和進程間通信的內容,算是對Python爬蟲基礎篇的總結。

  現在大型的爬蟲系統都是采取分布式爬取結構,通過此次實戰項目,讓大家對分布式爬蟲有一個比較清晰地了解,為之后系統的講解分布式爬蟲打下基礎,其實它並沒有多么困難。實戰目標:爬取2000個百度百科網絡爬蟲詞條以及相關詞條的標題、摘要和鏈接等信息,采用分布式結構改寫第六章的基礎爬蟲,使功能更加強大。爬取頁面請看圖6.1。

                       

 

7.1簡單分布式爬蟲結構

  本次分布式爬蟲采用主從模式。主從模式是指由一台主機作為控制節點負責所有運行網絡爬蟲的主機進行管理,爬蟲只需要從控制節點那里接收任務,並把新生成任務提交給控制節點就可以了,在這個過程中不必與其他爬蟲通信,這種方式實現簡單利於管理。而控制節點則需要與所有爬蟲進行通信,因此可以看到主從模式是有缺陷的,控制節點會成為整個系統的瓶頸,容易導致整個分布式網絡爬蟲系統性能下降。

此次使用三台主機進行分布式爬取,一台主機作為控制節點,另外兩台主機作為爬蟲節點。爬蟲結構如圖7.1所示:

                        

圖7.1 主從爬蟲結構

 

7.2 控制節點ControlNode

  控制節點主要分為URL管理器、數據存儲器和控制調度器。控制調度器通過三個進程來協調URL管理器和數據存儲器的工作,一個是URL管理進程,負責URL的管理和將URL傳遞給爬蟲節點,一個是數據提取進程,負責讀取爬蟲節點返回的數據,將返回數據中的URL交給URL管理進程,將標題和摘要等數據交給數據存儲進程,最后一個是數據存儲進程,負責將數據提取進程中提交的數據進行本地存儲。執行流程如圖7.2所示:

                      

圖7.2 控制節點執行流程

7.2.1 URL管理器

  URL管理器查考第六章的代碼,做了一些優化修改。由於我們采用set內存去重的方式,如果直接存儲大量的URL鏈接,尤其是URL鏈接很長的時候,很容易造成內存溢出,所以我們采用將爬取過的URL進行MD5處理,由於字符串經過MD5處理后的信息摘要長度可以128bit,將生成的MD5摘要存儲到set后,可以減少好幾倍的內存消耗,Python中的MD5算法生成的是32位的字符串,由於我們爬取的url較少,md5沖突不大,完全可以取中間的16位字符串,即16位MD5加密。同時添加了save_progress和load_progress方法進行序列化的操作,將未爬取URL集合和已爬取的URL集合序列化到本地,保存當前的進度,以便下次恢復狀態。URL管理器URLManager.py代碼如下:

#coding:utf-8
import cPickle
import hashlib
class UrlManager(object):
    def __init__(self):
        self.new_urls = self.load_progress('new_urls.txt')#未爬取URL集合
        self.old_urls = self.load_progress('old_urls.txt')#已爬取URL集合
    def has_new_url(self):
        '''
        判斷是否有未爬取的URL
        :return:
        '''
        return self.new_url_size()!=0

    def get_new_url(self):
        '''
        獲取一個未爬取的URL
        :return:
        '''
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url)
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url

    def add_new_url(self,url):
        '''
         將新的URL添加到未爬取的URL集合中
        :param url:單個URL
        :return:
        '''
        if url is None:
            return
        m = hashlib.md5()
        m.update(url)
        url_md5 =  m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self,urls):
        '''
        將新的URLS添加到未爬取的URL集合中
        :param urls:url集合
        :return:
        '''
        if urls is None or len(urls)==0:
            return
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        '''
        獲取未爬取URL集合的s大小
        :return:
        '''
        return len(self.new_urls)

    def old_url_size(self):
        '''
        獲取已經爬取URL集合的大小
        :return:
        '''
        return len(self.old_urls)

    def save_progress(self,path,data):
        '''
        保存進度
        :param path:文件路徑
        :param data:數據
        :return:
        '''
        with open(path, 'wb') as f:
            cPickle.dump(data, f)

    def load_progress(self,path):
        '''
        從本地文件加載進度
        :param path:文件路徑
        :return:返回set集合
        '''
        print '[+] 從文件加載進度: %s' % path
        try:
            with open(path, 'rb') as f:
                tmp = cPickle.load(f)
                return tmp
        except:
            print '[!] 無進度文件, 創建: %s' % path
        return set()

  

7.2.2數據存儲器

數據存儲器的內容基本上和第六章的一樣,不過生成文件按照當前時間進行命名避免重復,同時對文件進行緩存寫入。代碼如下:

#coding:utf-8
import codecs
import time
class DataOutput(object):
    def __init__(self):
        self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) )
        self.output_head(self.filepath)
        self.datas=[]
    def store_data(self,data):
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas)>10:
            self.output_html(self.filepath)


    def output_head(self,path):
        '''
        將HTML頭寫進去
        :return:
        '''
        fout=codecs.open(path,'w',encoding='utf-8')
        fout.write("<html>")
        fout.write("<body>")
        fout.write("<table>")
        fout.close()

    def output_html(self,path):
        '''
        將數據寫入HTML文件中
        :param path: 文件路徑
        :return:
        '''
        fout=codecs.open(path,'a',encoding='utf-8')
        for data in self.datas:
            fout.write("<tr>")
            fout.write("<td>%s</td>"%data['url'])
            fout.write("<td>%s</td>"%data['title'])
            fout.write("<td>%s</td>"%data['summary'])
            fout.write("</tr>")
            self.datas.remove(data)
        fout.close()

    def ouput_end(self,path):
        '''
        輸出HTML結束
        :param path: 文件存儲路徑
        :return:
        '''
        fout=codecs.open(path,'a',encoding='utf-8')
        fout.write("</table>")
        fout.write("</body>")
        fout.write("</html>")
        fout.close()

  

7.2.3控制調度器

  控制調度器主要是產生並啟動URL管理進程、數據提取進程和數據存儲進程,同時維護4個隊列保持進程間的通信,分別為url_queue,result_queue, conn_q,store_q。4個隊列說明如下:

  • url_q隊列是URL管理進程將URL傳遞給爬蟲節點的通道
  • result_q隊列是爬蟲節點將數據返回給數據提取進程的通道
  • conn_q隊列是數據提取進程將新的URL數據提交給URL管理進程的通道
  • store_q隊列是數據提取進程將獲取到的數據交給數據存儲進程的通道

因為要和工作節點進行通信,所以分布式進程必不可少。參考1.4.4小節分布式進程中服務進程中的代碼(linux版),創建一個分布式管理器,定義為start_manager方法。方法代碼如下:

def start_Manager(self,url_q,result_q):
    '''
    創建一個分布式管理器
    :param url_q: url隊列
    :param result_q: 結果隊列
    :return:
    '''
    #把創建的兩個隊列注冊在網絡上,利用register方法,callable參數關聯了Queue對象,
    # 將Queue對象在網絡中暴露
    BaseManager.register('get_task_queue',callable=lambda:url_q)
    BaseManager.register('get_result_queue',callable=lambda:result_q)
    #綁定端口8001,設置驗證口令‘baike’。這個相當於對象的初始化
    manager=BaseManager(address=('',8001),authkey='baike')
    #返回manager對象
    return manager

  

URL管理進程將從conn_q隊列獲取到的新URL提交給URL管理器,經過去重之后,取出URL放入url_queue隊列中傳遞給爬蟲節點,代碼如下:

def url_manager_proc(self,url_q,conn_q,root_url):
    url_manager = UrlManager()
    url_manager.add_new_url(root_url)
    while True:
        while(url_manager.has_new_url()):

            #從URL管理器獲取新的url
            new_url = url_manager.get_new_url()
            #將新的URL發給工作節點
            url_q.put(new_url)
            print 'old_url=',url_manager.old_url_size()
            #加一個判斷條件,當爬去2000個鏈接后就關閉,並保存進度
            if(url_manager.old_url_size()>2000):
                #通知爬行節點工作結束
                url_q.put('end')
                print '控制節點發起結束通知!'
                #關閉管理節點,同時存儲set狀態
                url_manager.save_progress('new_urls.txt',url_manager.new_urls)
                url_manager.save_progress('old_urls.txt',url_manager.old_urls)
                return
        #將從result_solve_proc獲取到的urls添加到URL管理器之間
        try:
            if not conn_q.empty():
                urls = conn_q.get()
                url_manager.add_new_urls(urls)
        except BaseException,e:
            time.sleep(0.1)#延時休息

  

  數據提取進程從result_queue隊列讀取返回的數據,並將數據中的URL添加到conn_q隊列交給URL管理進程,將數據中的文章標題和摘要添加到store_q隊列交給數據存儲進程。代碼如下:

def result_solve_proc(self,result_q,conn_q,store_q):
    while(True):
        try:
            if not result_q.empty():
                content = result_q.get(True)
                if content['new_urls']=='end':
                    #結果分析進程接受通知然后結束
                    print '結果分析進程接受通知然后結束!'
                    store_q.put('end')
                    return
                conn_q.put(content['new_urls'])#url為set類型
                store_q.put(content['data'])#解析出來的數據為dict類型
            else:
                time.sleep(0.1)#延時休息
        except BaseException,e:
            time.sleep(0.1)#延時休息

  

數據存儲進程從store_q隊列中讀取數據,並調用數據存儲器進行數據存儲。代碼如下:

def store_proc(self,store_q):
    output = DataOutput()
    while True:
        if not store_q.empty():
            data = store_q.get()
            if data=='end':
                print '存儲進程接受通知然后結束!'
                output.ouput_end(output.filepath)

                return
            output.store_data(data)
        else:
            time.sleep(0.1)

  

最后將分布式管理器、URL管理進程、 數據提取進程和數據存儲進程進行啟動,並初始化4個隊列。代碼如下:

if __name__=='__main__':
    #初始化4個隊列
    url_q = Queue()
    result_q = Queue()
    store_q = Queue()
    conn_q = Queue()
    #創建分布式管理器
    node = NodeManager()
    manager = node.start_Manager(url_q,result_q)
    #創建URL管理進程、 數據提取進程和數據存儲進程
    url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm',))
    result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,conn_q,store_q,))
    store_proc = Process(target=node.store_proc, args=(store_q,))
    #啟動3個進程和分布式管理器
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()

  

7.3 爬蟲節點SpiderNode

爬蟲節點相對簡單,主要包含HTML下載器、HTML解析器和爬蟲調度器。執行流程如下:

  • 爬蟲調度器從控制節點中的url_q隊列讀取URL
  • 爬蟲調度器調用HTML下載器、HTML解析器獲取網頁中新的URL和標題摘要
  • 最后爬蟲調度器將新的URL和標題摘要傳入result_q隊列交給控制節點

7.3.1 HTML下載器

HTML下載器的代碼和第六章的一致,只要注意網頁編碼即可。代碼如下:

#coding:utf-8
import requests
class HtmlDownloader(object):

    def download(self,url):
        if url is None:
            return None
        user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
        headers={'User-Agent':user_agent}
        r = requests.get(url,headers=headers)
        if r.status_code==200:
            r.encoding='utf-8'
            return r.text
        return None

  

7.3.2 HTML解析器

HTML解析器的代碼和第六章的一致,詳細的網頁分析過程可以回顧第六章。代碼如下:

#coding:utf-8
import re
import urlparse
from bs4 import BeautifulSoup


class HtmlParser(object):

    def parser(self,page_url,html_cont):
        '''
        用於解析網頁內容抽取URL和數據
        :param page_url: 下載頁面的URL
        :param html_cont: 下載的網頁內容
        :return:返回URL和數據
        '''
        if page_url is None or html_cont is None:
            return
        soup = BeautifulSoup(html_cont,'html.parser',from_encoding='utf-8')
        new_urls = self._get_new_urls(page_url,soup)
        new_data = self._get_new_data(page_url,soup)
        return new_urls,new_data


    def _get_new_urls(self,page_url,soup):
        '''
        抽取新的URL集合
        :param page_url: 下載頁面的URL
        :param soup:soup
        :return: 返回新的URL集合
        '''
        new_urls = set()
        #抽取符合要求的a標簽
        links = soup.find_all('a',href=re.compile(r'/view/\d+\.htm'))
        for link in links:
            #提取href屬性
            new_url = link['href']
            #拼接成完整網址
            new_full_url = urlparse.urljoin(page_url,new_url)
            new_urls.add(new_full_url)
        return new_urls
    def _get_new_data(self,page_url,soup):
        '''
        抽取有效數據
        :param page_url:下載頁面的URL
        :param soup:
        :return:返回有效數據
        '''
        data={}
        data['url']=page_url
        title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title']=title.get_text()
        summary = soup.find('div',class_='lemma-summary')
        #獲取tag中包含的所有文版內容包括子孫tag中的內容,並將結果作為Unicode字符串返回
        data['summary']=summary.get_text()
        return data

  

7.3.3 爬蟲調度器

  爬蟲調度器需要用到分布式進程中工作進程的代碼,具體內容可以參考第一章的分布式進程章節。爬蟲調度器需要先連接上控制節點,然后依次完成從url_q隊列中獲取URL,下載並解析網頁,將獲取的數據交給result_q隊列,返回給控制節點等各項任務,代碼如下:

class SpiderWork(object):
    def __init__(self):
        #初始化分布式進程中的工作節點的連接工作
        # 實現第一步:使用BaseManager注冊獲取Queue的方法名稱
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        # 實現第二步:連接到服務器:
        server_addr = '127.0.0.1'
        print('Connect to server %s...' % server_addr)
        # 端口和驗證口令注意保持與服務進程設置的完全一致:
        self.m = BaseManager(address=(server_addr, 8001), authkey='baike')
        # 從網絡連接:
        self.m.connect()
        # 實現第三步:獲取Queue的對象:
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        #初始化網頁下載器和解析器
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print 'init finish'

    def crawl(self):
        while(True):
            try:
                if not self.task.empty():
                    url = self.task.get()

                    if url =='end':
                        print '控制節點通知爬蟲節點停止工作...'
                        #接着通知其它節點停止工作
                        self.result.put({'new_urls':'end','data':'end'})
                        return
                    print '爬蟲節點正在解析:%s'%url.encode('utf-8')
                    content = self.downloader.download(url)
                    new_urls,data = self.parser.parser(url,content)
                    self.result.put({"new_urls":new_urls,"data":data})
            except EOFError,e:
                print "連接工作節點失敗"
                return
            except Exception,e:
                print e
                print 'Crawl  fali '

if __name__=="__main__":
    spider = SpiderWork()
    spider.crawl()

  

  在爬蟲調度器設置了一個本地IP:127.0.0.1,大家可以將在一台機器上測試代碼的正確性。當然也可以使用三台VPS服務器,兩台運行爬蟲節點程序,將IP改為控制節點主機的公網IP,一台運行控制節點程序,進行分布式爬取,這樣更貼近真實的爬取環境。下面圖7.3為最終爬取的數據,圖7.4為new_urls.txt內容,圖7.5為old_urls.txt內容,大家可以進行對比測試,這個簡單的分布式爬蟲還有很大發揮的空間,希望大家發揮自己的聰明才智進一步完善。

                                 

圖7.3 最終爬取的數據

                                  

圖7.4 new_urls.txt

                                    

圖7.5 old_urls.txt

7.4小結

  本章講解了一個簡單的分布式爬蟲結構,主要目的是幫助大家將Python爬蟲基礎篇的知識進行總結和強化,開拓大家的思維,同時也讓大家知道分布式爬蟲並不是這么高不可攀。不過當你親手打造一個分布式爬蟲后,就會知道分布式爬蟲的難點在於節點的調度,什么樣的結構能讓各個節點穩定高效的運作才是分布式爬蟲要考慮的核心內容。到本章為止,Python爬蟲基礎篇已經結束,這個時候大家基本上可以編寫簡單的爬蟲,爬取一些靜態網站的內容,但是Python爬蟲開發不僅如此,大家接着往下學習吧。

最后

打個廣告,如果大家對這本書感興趣的話,可以看一下 試讀樣章 或者直接購買

歡迎大家支持我公眾號:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM