第一篇我們做了一個簡單的頁面廣度優先來抓取url,很顯然缺點有很多,第一:數據結構都是基於內存的,第二:單線程抓取
速度太慢,在實際開發中肯定不會這么做的,起碼得要有序列化到硬盤的機制,對於整個爬蟲架構來說,構建好爬蟲隊列相當重要。
先上一幅我自己構思的架構圖,不是很完善,算是一個雛形吧。
一:TODO隊列和Visited集合
在眾多的nosql數據庫中,mongodb還是很不錯的,這里也就選擇它了,做集群,做分片輕而易舉。
二:中央處理器
群架,斗毆都是有帶頭的,那中央處理器就可以干這樣的事情,它的任務很簡單,
第一: 啟動時,根據我們定義好的規則將種子頁面分發到各個執行服務器。
第二: 定時輪詢”TODO——MongoDB“,將讀取的新Url根據規則分發到對應的執行服務器中。
三:分發服務器
中央處理器將url分發到了執行服務器的內存中,分發服務器可以開啟10個線程依次讀取隊列來獲取url,然后解析url,
第一:如果url是外鏈,直接剔除。
第二:如果url不是本機負責抓取的,就放到”TODO——MongoDB“中。
第三:如果是本機負責的,將新提取的url放入本機內存隊列中。
四:代碼實現
首先下載mongodb http://www.mongodb.org/downloads,簡單起見就在一個database里面建兩個collection。迫不及
待了,我要爬一個美女網站,http://www.800meinv.com ,申明一下,並非推廣網站,看下”中央處理器“的實現。
1 namespace CrawlerCPU 2 { 3 /* 根據規格,一個服務爬取3個導航頁(由 中央處理器 統一管理) 4 * 第一個服務:日韓時裝,港台時裝 5 * 第二個服務:,歐美時裝,明星穿衣,顯瘦搭配 6 * 第三個服務:少女搭配,職場搭配,裙裝搭配 7 */ 8 public class Program 9 { 10 static Dictionary<string, string> dicMapping = new Dictionary<string, string>(); 11 12 static void Main(string[] args) 13 { 14 //初始Url的分發 15 foreach (var key in ConfigurationManager.AppSettings) 16 { 17 var factory = new ChannelFactory<ICrawlerService>(new NetTcpBinding(), new EndpointAddress(key.ToString())) 18 .CreateChannel(); 19 20 var urls = ConfigurationManager.AppSettings[key.ToString()] 21 .Split(new char[] { ';' }, StringSplitOptions.RemoveEmptyEntries) 22 .ToList(); 23 24 factory.AddRange(urls); 25 26 //將網址和“WCF分發地址“建立Mapping映射 27 foreach (var item in urls) 28 dicMapping.Add(item, key.ToString()); 29 } 30 31 Console.WriteLine("爬蟲 中央處理器開啟,正在監視TODO列表!"); 32 33 //開啟定時監視MongoDB 34 Timer timer = new Timer(); 35 36 timer.Interval = 1000 * 10; //10s輪詢一次 37 timer.Elapsed += new ElapsedEventHandler(timer_Elapsed); 38 timer.Start(); 39 40 Console.Read(); 41 } 42 43 static void timer_Elapsed(object sender, ElapsedEventArgs e) 44 { 45 //獲取mongodb里面的數據 46 MongodbHelper<Message> mongodb = new MongodbHelper<Message>("todo"); 47 48 //根據url的類型分發到相應的服務器中去處理 49 var urls = mongodb.List(100); 50 51 if (urls == null || urls.Count == 0) 52 return; 53 54 foreach (var item in dicMapping.Keys) 55 { 56 foreach (var url in urls) 57 { 58 //尋找正確的 wcf 分發地址 59 if (url.Url.StartsWith(item)) 60 { 61 var factory = new ChannelFactory<ICrawlerService>(new NetTcpBinding(), 62 new EndpointAddress(dicMapping[item])) 63 .CreateChannel(); 64 65 //向正確的地方分發地址 66 factory.Add(url.Url); 67 68 break; 69 } 70 } 71 } 72 73 //刪除mongodb中的TODO表中指定數據 74 mongodb.Remove(urls); 75 } 76 } 77 }
接下來,我們開啟WCF服務,當然我們可以做10份,20份的copy,核心代碼如下:
1 /// <summary> 2 /// 開始執行任務 3 /// </summary> 4 public static void Start() 5 { 6 while (true) 7 { 8 //監視工作線程,如果某個線程已經跑完數據,則重新分配任務給該線程 9 for (int j = 0; j < 10; j++) 10 { 11 if (tasks[j] == null || tasks[j].IsCompleted || tasks[j].IsCanceled || tasks[j].IsFaulted) 12 { 13 //如果隊列還有數據 14 if (todoQueue.Count > 0) 15 { 16 string currentUrl = string.Empty; 17 18 todoQueue.TryDequeue(out currentUrl); 19 20 Console.WriteLine("當前隊列的個數為:{0}", todoQueue.Count); 21 22 tasks[j] = Task.Factory.StartNew((obj) => 23 { 24 DownLoad(obj.ToString()); 25 26 }, currentUrl); 27 } 28 } 29 } 30 } 31 }
然后我們把”分發服務器“和”中央處理器“開啟:
好了,稍等會,我們就會看到,數據已經嘩啦啦的往mongodb里面跑了。
五:不足點
有的時候會出現某些機器非常free,而某些機器非常busy,這時就要實現雙工通訊了,當執行服務器的內存隊列到達
一個量級的時候就應該通知中央處理器,要么減緩對該執行服務器的任務分發,要么將任務分給其他的執行服務器。
最后是工程代碼,有什么好的建議可以提出來,大家可以一起研究研究:ConsoleApplication1.rar