玩玩小爬蟲——試搭小架構


 

     第一篇我們做了一個簡單的頁面廣度優先來抓取url,很顯然缺點有很多,第一:數據結構都是基於內存的,第二:單線程抓取

速度太慢,在實際開發中肯定不會這么做的,起碼得要有序列化到硬盤的機制,對於整個爬蟲架構來說,構建好爬蟲隊列相當重要。

     先上一幅我自己構思的架構圖,不是很完善,算是一個雛形吧。

一:TODO隊列和Visited集合

     在眾多的nosql數據庫中,mongodb還是很不錯的,這里也就選擇它了,做集群,做分片輕而易舉。

二:中央處理器

     群架,斗毆都是有帶頭的,那中央處理器就可以干這樣的事情,它的任務很簡單,

    第一: 啟動時,根據我們定義好的規則將種子頁面分發到各個執行服務器。

    第二: 定時輪詢”TODO——MongoDB“,將讀取的新Url根據規則分發到對應的執行服務器中。

三:分發服務器

    中央處理器將url分發到了執行服務器的內存中,分發服務器可以開啟10個線程依次讀取隊列來獲取url,然后解析url,

第一:如果url是外鏈,直接剔除。

第二:如果url不是本機負責抓取的,就放到”TODO——MongoDB“中。

第三:如果是本機負責的,將新提取的url放入本機內存隊列中。

 

 四:代碼實現

 首先下載mongodb http://www.mongodb.org/downloads,簡單起見就在一個database里面建兩個collection。迫不及

待了,我要爬一個美女網站,http://www.800meinv.com ,申明一下,並非推廣網站,看下”中央處理器“的實現。

 1 namespace CrawlerCPU
 2 {
 3     /* 根據規格,一個服務爬取3個導航頁(由 中央處理器 統一管理)
 4      * 第一個服務:日韓時裝,港台時裝
 5      * 第二個服務:,歐美時裝,明星穿衣,顯瘦搭配
 6      * 第三個服務:少女搭配,職場搭配,裙裝搭配
 7      */
 8     public class Program
 9     {
10         static Dictionary<string, string> dicMapping = new Dictionary<string, string>();
11 
12         static void Main(string[] args)
13         {
14             //初始Url的分發
15             foreach (var key in ConfigurationManager.AppSettings)
16             {
17                 var factory = new ChannelFactory<ICrawlerService>(new NetTcpBinding(), new EndpointAddress(key.ToString()))
18                                  .CreateChannel();
19 
20                 var urls = ConfigurationManager.AppSettings[key.ToString()]
21                                                .Split(new char[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
22                                                .ToList();
23 
24                 factory.AddRange(urls);
25 
26                 //將網址和“WCF分發地址“建立Mapping映射
27                 foreach (var item in urls)
28                     dicMapping.Add(item, key.ToString());
29             }
30 
31             Console.WriteLine("爬蟲 中央處理器開啟,正在監視TODO列表!");
32 
33             //開啟定時監視MongoDB
34             Timer timer = new Timer();
35 
36             timer.Interval = 1000 * 10; //10s輪詢一次
37             timer.Elapsed += new ElapsedEventHandler(timer_Elapsed);
38             timer.Start();
39 
40             Console.Read();
41         }
42 
43         static void timer_Elapsed(object sender, ElapsedEventArgs e)
44         {
45             //獲取mongodb里面的數據
46             MongodbHelper<Message> mongodb = new MongodbHelper<Message>("todo");
47 
48             //根據url的類型分發到相應的服務器中去處理
49             var urls = mongodb.List(100);
50 
51             if (urls == null || urls.Count == 0)
52                 return;
53 
54             foreach (var item in dicMapping.Keys)
55             {
56                 foreach (var url in urls)
57                 {
58                     //尋找正確的 wcf 分發地址
59                     if (url.Url.StartsWith(item))
60                     {
61                         var factory = new ChannelFactory<ICrawlerService>(new NetTcpBinding(),
62                                       new EndpointAddress(dicMapping[item]))
63                                       .CreateChannel();
64 
65                         //向正確的地方分發地址
66                         factory.Add(url.Url);
67 
68                         break;
69                     }
70                 }
71             }
72 
73             //刪除mongodb中的TODO表中指定數據
74             mongodb.Remove(urls);
75         }
76     }
77 }

接下來,我們開啟WCF服務,當然我們可以做10份,20份的copy,核心代碼如下:

 1         /// <summary>
 2         /// 開始執行任務
 3         /// </summary>
 4         public static void Start()
 5         {
 6             while (true)
 7             {
 8                 //監視工作線程,如果某個線程已經跑完數據,則重新分配任務給該線程
 9                 for (int j = 0; j < 10; j++)
10                 {
11                     if (tasks[j] == null || tasks[j].IsCompleted || tasks[j].IsCanceled || tasks[j].IsFaulted)
12                     {
13                         //如果隊列還有數據
14                         if (todoQueue.Count > 0)
15                         {
16                             string currentUrl = string.Empty;
17 
18                             todoQueue.TryDequeue(out currentUrl);
19 
20                             Console.WriteLine("當前隊列的個數為:{0}", todoQueue.Count);
21 
22                             tasks[j] = Task.Factory.StartNew((obj) =>
23                             {
24                                 DownLoad(obj.ToString());
25 
26                             }, currentUrl);
27                         }
28                     }
29                 }
30             }
31         }

然后我們把”分發服務器“和”中央處理器“開啟:

好了,稍等會,我們就會看到,數據已經嘩啦啦的往mongodb里面跑了。

 

五:不足點

    有的時候會出現某些機器非常free,而某些機器非常busy,這時就要實現雙工通訊了,當執行服務器的內存隊列到達

一個量級的時候就應該通知中央處理器,要么減緩對該執行服務器的任務分發,要么將任務分給其他的執行服務器。

最后是工程代碼,有什么好的建議可以提出來,大家可以一起研究研究:ConsoleApplication1.rar


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM