基本分布式爬蟲架構:實現分布式豆瓣爬蟲


一、控制節點- URL 管理器

1.1 簡單分布式爬蟲架構

本次分布式爬蟲采用主從模式,主從模式是指一台主機作為控制節點,負責管理所有運行網絡爬蟲的主機,爬蟲只需要從控制節點那里接收任務,並把新生成任務提交給控制節點就可以了,在這個過程中不必與其他爬蟲通信,這種方式實現簡單、利於管理。而控制節點則需要與所有爬蟲進行通信,因此可以看到主從模式是有缺陷的,控制節點會成為整個系統的瓶頸,容易導致整個分布式網絡爬蟲系統性能下降。

1.2 控制節點

控制節點主要分為 URL 管理器、數據存儲器和被控制調度器。控制調度器通過三個進程來協調 URL 管理器和數據存儲器的工作:一個是 URL 管理進程,負責 URL 的管理和將 URL 傳遞給爬蟲節點;一個是數據提取進程,負責讀取爬蟲節點返回的數據,將返回數據中的 URL 交給 URL 管理進程,將標題和摘要等數據交給數據存儲進程;最后一個是數據存儲進程,負責將數據提取進程中提交的數據進行本地存儲。

1.3 URL 管理器

對之前的 url 管理器進行優化,采用 set 內存去重的方式,如果直接存儲大量的 URL 鏈接,尤其是 URL 鏈接很長的時候,很容易造成內存溢出,所以我們將爬取過的 URL 進行 MD5 處理。字符串經過 MD5 處理后的信息摘要長度為128位,將生成的 MD5 摘要存儲到 set 后,可以減少好幾倍的內存消耗,不過 Python 中的 MD5 算法生成的是256位,取中間的128位即可。我們同時添加了 save_progress 和 load_progress 方法進行序列化的操作,將未爬取 URL 集合和已爬取的 URL 集合序列化到本地,保存當前的進度,以便下次恢復狀態。

1.4 代碼如下

 1 import pickle
 2 import hashlib
 3 
 4 
 5 class UrlManager:
 6     def __init__(self):
 7         self.new_urls = self.load_progress('new_urls.txt')   # 未爬取 url 集合
 8         self.old_urls = self.load_progress('old_urls.txt')   # 已爬取 url 集合
 9 
10     def has_new_url(self):
11         """
12         判斷是否有未爬取的 url
13         :return: bool
14         """
15         return self.new_urls_size() != 0
16 
17     def get_new_url(self):
18         """
19         返回一個未爬取的 url
20         :return: str
21         """
22         new_url = self.new_urls.pop()
23         m = hashlib.md5()
24         m.update(new_url.encode('utf-8'))
25         self.old_urls.add(m.hexdigest()[8:-8])
26 
27         return new_url
28 
29     def add_new_url(self, url):
30         """
31         添加一個新的 url
32         :param url: 單個 url
33         :return: None
34         """
35         if url is None:
36             return None
37         m = hashlib.md5()
38         m.update(url.encode('utf-8'))
39         url_md5 = m.hexdigest()[8:-8]
40         if (url not in self.new_urls) and (url_md5 not in self.old_urls):
41             self.new_urls.add(url)
42 
43     def add_new_urls(self, urls):
44         """
45         添加多個新的url
46         :param urls: 多個 url
47         :return: None
48         """
49         if urls is None:
50             return None
51         for url in urls:
52             self.add_new_url(url)
53 
54     def new_urls_size(self):
55         """
56         返回未爬過的 url 集合的大小
57         :return: int
58         """
59         return len(self.new_urls)
60 
61     def old_urls_size(self):
62         """
63         返回已爬過的 url 集合的大小
64         :return: int
65         """
66         return len(self.old_urls)
67 
68     def save_progress(self, path, data):
69         """
70         保存進度
71         :param path: 路徑
72         :return: None
73         """
74         with open(path, 'wb') as file:
75             pickle.dump(data, file)
76 
77     def load_progress(self, path):
78         """
79         從本地文件加載進度
80         :param path: 路徑
81         :return: set
82         """
83         print('[+] 從文件加載進度{}'.format(path))
84         try:
85             with open(path, 'rb') as file:
86                 return pickle.load(file)
87         except:
88             print('[!] 無進度文件')
89 
90         return set()

二、控制節點-數據存儲器

2.1 實現原理

因為存儲方式相同所以數據存儲器的代碼無需修改

2.2 代碼如下

 1 import csv
 2 
 3 class DataOutput: 4 def __init__(self): 5 self.file = open('數據.csv', 'w') 6 self.csv_file = csv.writer(self.file) 7 self.csv_file.writerow(['電影名', '評分', '評分人數']) 8 9 def output_csv(self, data): 10 """ 11 將數據寫入 csv 文件 12 :param data: 數據 13 :return: None 14 """ 15 self.csv_file.writerow(data)

三、控制節點-控制調度器

3.1 實現原理

控制調度器主要是產生並啟動 URL 管理進程、數據提取進程和數據存儲進程,同時維護4個隊列保持進程間的通信,分別為 url_q、result_q、conn_q、store_q。4個隊列說明如下:

  • url_q:隊列是 URL 管理進程將 URL 傳遞給爬蟲節點的通道。
  • result_q:隊列是爬蟲節點將數據返回給數據提取進程的通道。
  • conn_q:隊列是數據提取進程將新的 URL 數據提交給 URL 管理進程的通道。
  • store_q:隊列是數據提取進程將獲取到的數據交給數據存儲進程的通道。

3.2 代碼如下

  1 from multiprocessing.managers import BaseManager
  2 from multiprocessing import Queue, Process 3 from DataOutput import DataOutput 4 from UrlManager import UrlManager 5 import time 6 7 8 class NodeManager: 9 def start_manager(self, url_q, result_q): 10 """ 11 創建一個分布式管理器 12 :param url_q: url 隊列 13 :param result_q: 結果隊列 14 :return: BaseManager 15 """ 16 # 把創建的兩個隊列注冊在網絡上,利用 register 方法,callable 參數關聯了 Queue 對象 17 # 將 Queue 對象在網絡中暴露 18 BaseManager.register('get_task_queue', callable=lambda:url_q) 19 BaseManager.register('get_result_queue', callable=lambda:result_q) 20 # 綁定端口 8001,設置驗證口令"douban",相當於對象的初始化並返回 21 return BaseManager(address=('', 8001), authkey='douban'.encode('utf-8')) 22 23 def url_manager_proc(self, url_q, conn_q, root_url): 24 """ 25 url 管理進程 26 :param url_q: url 隊列 27 :param conn_q: 解析得到的 url 隊列 28 :param root_url: 起始 url 29 :return: None 30 """ 31 url_manage = UrlManager() 32  url_manage.add_new_url(root_url) 33 while True: 34 while url_manage.has_new_url(): 35 print('old_urls={}'.format(url_manage.old_urls_size())) 36 new_url = url_manage.get_new_url() 37  url_q.put(new_url) 38 urls = conn_q.get() 39  url_manage.add_new_urls(urls) 40 else: 41 url_q.put('end') 42 print('控制節點發起結束通知') 43 url_manage.save_progress('old_urls.txt', url_manage.old_urls) 44 url_manage.save_progress('new_urls.txt', url_manage.new_urls) 45 return 46 47 def result_solve_proc(self, result_q, conn_q, store_q): 48 """ 49 數據提取進程 50 :param result_q: 未處理數據隊列 51 :param conn_q: 解析得到的 url 隊列 52 :param store_q: 解析后的數據隊列 53 :return: 54 """ 55 while True: 56 try: 57 if not result_q.empty(): 58 content = result_q.get() 59 if content['new_urls'] == 'end': 60 print('結果分析進程接收通知然后結束') 61 store_q.put('end') 62 return 63 64 conn_q.put(content['new_urls']) 65 store_q.put(content['data']) 66 else: 67 time.sleep(0.1) 68 except: 69 time.sleep(0.1) 70 71 def store_proc(self, store_q): 72 """ 73 數據存儲進程 74 :param store_q: 解析后的數據隊列 75 :return: 76 """ 77 output = DataOutput() 78 while True: 79 if not store_q.empty(): 80 data = store_q.get() 81 82 if data == 'end': 83 print('存儲進程接收結束通知然后結束') 84 return 85 86 for item in data: 87  output.output_csv(item) 88 else: 89 time.sleep(0.1) 90 91 92 if __name__ == '__main__': 93 # 初始化 4 個隊列 94 url_q = Queue() 95 result_q = Queue() 96 conn_q = Queue() 97 store_q = Queue() 98 # 創建分布式管理器 99 node = NodeManager() 100 manager = node.start_manager(url_q, result_q) 101 # 創建 url 管理進程、數據提取進程和數據存儲進程 102 url = 'https://movie.douban.com/top250?start=0' 103 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, url,)) 104 result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q,)) 105 store_proc = Process(target=node.store_proc, args=(store_q,)) 106 # 啟動 3 個進程和分布式管理器 107  url_manager_proc.start() 108  result_solve_proc.start() 109  store_proc.start() 110 manager.get_server().serve_forever()

四、爬蟲節點- HTML 下載器

4.1 爬蟲節點

爬蟲節點相對簡單,主要包含 HTML 下載器、HTML 解析器和爬蟲調度器。執行流程如下:

  • 爬蟲調度器從控制節點中的 url_q 隊列讀取 URL。
  • 爬蟲調度器調用 HTML 下載器、HTML 解析器獲取網頁中心的 URL 和標題摘要。
  • 爬蟲調度器將新的 URL 和標題摘要傳入 result_q 隊列交給控制節點。

4.2 代碼如下

 1 import requests
 2 
 3 
 4 class HtmlDownloader: 5 def download(self, url): 6 """ 7 下載 html 頁面源碼 8 :param url: url 9 :return: str / None 10 """ 11 if not url: 12 return None 13 14 headers = { 15 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:63.0) Gecko/20100101 Firefox/63.0', 16  } 17 r = requests.get(url, headers=headers) 18 if r.status_code == 200: 19 r.encoding = 'utf-8' 20 return r.text 21 else: 22 return None

五、爬蟲節點- HTML 解析器

5.1 實現原理

解析規則不變,代碼不變

5.2 代碼如下

 1 from lxml.html import etree
 2 import re 3 4 class HtmlParser: 5 def parser(self, page_url, html_text): 6 """ 7 解析頁面新的 url 鏈接和數據 8 :param page_url: url 9 :param html_text: 頁面內容 10 :return: tuple / None 11 """ 12 if not page_url and not html_text: 13 return None 14 new_urls = self._get_new_urls(page_url, html_text) 15 new_data = self._get_new_data(html_text) 16 17 return new_urls, new_data 18 19 def _get_new_urls(self, page_url, html_text): 20 """ 21 返回解析后的 url 集合 22 :param page_url: url 23 :param html_text: 頁面內容 24 :return: set 25 """ 26 new_urls = set() 27 links = re.compile(r'\?start=\d+').findall(html_text) 28 for link in links: 29 new_urls.add(page_url.split('?')[0] + link) 30 return new_urls 31 32 def _get_new_data(self, html_text): 33 """ 34 返回解析后的數據列表 35 :param html_text: 頁面內容 36 :return: list 37 """ 38 datas = [] 39 for html in etree.HTML(html_text).xpath('//ol[@class="grid_view"]/li'): 40 name = html.xpath('./div/div[@class="info"]/div[@class="hd"]/a/span[1]/text()')[0] 41 score = html.xpath('./div/div[@class="info"]/div[@class="bd"]/div[@class="star"]/span[2]/text()')[0] 42 person_num = html.xpath('./div/div[@class="info"]/div[@class="bd"]/div[@class="star"]/span[4]/text()')[0].strip('人評價') 43  datas.append([name, score, person_num]) 44 return datas

六、爬蟲節點- 爬蟲調度器

6.1 實現原理

爬蟲調度器需要先連接上控制節點,然后從 url_q 隊列中獲取 URL,下載並解析網頁,接着將獲取的數據交給 result_q 隊列並返回給控制節點

6.2 代碼如下

 1 from multiprocessing.managers import BaseManager
 2 from HtmlParser import HtmlParser 3 from HtmlDownloader import HtmlDownloader 4 5 6 class SpiderWork: 7 def __init__(self): 8 BaseManager.register('get_task_queue') 9 BaseManager.register('get_result_queue') 10 11 server_adrr = '192.168.31.101' 12 print('連接到服務器 {}'.format(server_adrr)) 13 self.m = BaseManager(address=(server_adrr, 8001), authkey='douban'.encode('utf-8')) 14  self.m.connect() 15 self.task = self.m.get_task_queue() 16 self.result = self.m.get_result_queue() 17 18 self.downloader = HtmlDownloader() 19 self.parser = HtmlParser() 20 print('初始化完成') 21 22 def crawl(self): 23 while True: 24 try: 25 if not self.task.empty(): 26 url = self.task.get() 27 if url == 'end': 28 print('控制節點通知爬蟲節點停止工作') 29 self.result.put({'new_urls': 'end', 'data': 'end'}) 30 return 31 32 print('爬蟲節點正在解析: {}'.format(url.encode('utf-8'))) 33 content = self.downloader.download(url) 34 new_urls, data = self.parser.parser(url, content) 35 self.result.put({'new_urls': new_urls, 'data': data}) 36 except EOFError: 37 print('連接失敗!') 38 except Exception as e: 39 print(e) 40 print('爬取失敗!') 41 42 43 if __name__ == '__main__': 44 spider = SpiderWork() 45 spider.crawl()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM