我們在第五節中提到一個問題,任務隊列增長速度太快,與之對應的采集、分析、處理速度遠遠跟不上,造成內存快速增長,帶寬占用過高,CPU使用率過高,這樣是極度有害系統健康的。
我們在開發采集程序的時候,總是希望能夠盡快將數據爬取下來,如果總任務數量很小(2~3K請求數之內),總耗費時長很短(1~2分鍾之內),那么,對系統的正常運行不會造成太嚴重的影響,我們盡可以肆無忌憚。但,當總任務數量更多,總耗費時長更長,那么,無休止的任務堆積,就會給系統帶來難以預料甚至是很嚴重的后果。
為此,我們不得不考慮幾個問題:
- 我們的任務總量大概在什么量級,全速采集大概需要耗費多少時間、多少資源,未來的發展是不是可控?
- 采集系統自身依托的環境資源是否充足,是否能夠滿足隨之而來的巨大的資源消耗?
- 采集的目標資源系統是否具有某些反爬策略限制?
- 采集的目標資源系統是否能夠承受得住如此數量級的並發采集請求(無論單點或分布式采集系統,都要考慮這點)?
- 隨着采集結果返回,帶來的后續分析、處理、存儲能力是否能夠滿足大量數據的瞬時到來?
由以上問題也可以看出,一個爬蟲系統策略的制定,需要考慮的問題也是全方位的,而不僅僅是采集本身,不同的環境、規模、目標,采用的策略也不盡相同。本節,我們將討論一下,如果我們的能力不能滿足上述條件的情況下,如何來制定一個並發策略以及如何實現它。
並發策略,從規模上可以分為全局並發策略和單點並發策略,全局並發策略包含單點並發策略,不過它也需要同時考慮負載均衡策略對制定並發策略的影響。目前,我們還沒有將爬蟲框架擴展到分布式框架,暫時先不考慮全局並發策略的制定。主要探討一下單點並發策略制定與實現。
單點並發策略的制定:
通常,我們在制定單點並發策略時,需要從哪些角度考慮,使用什么方法,以及如何決策?下面我們就來詳細聊聊:)
1、我們先來梳理一下采集系統自身所依托的環境資源:
除了CPU、內存、存儲器、帶寬這些耳熟能詳的資源外,還有就是比較容易被忽略的操作系統可用端口。對於各種資源的占用情況,下面給出一些建議(均值):
- CPU:采集系統的占用總量建議不超過30%,CPU總使用量建議不超過50%。(雖然我這個瘋子經常貪婪過渡T_T)。對於多核CPU,線程創建數量建議不超過CPU核數的兩倍。
- 內存:采集系統的占用總量建議不超過50%,內存總使用量建議不超過70%。
- 存儲器:對於商業或者大規模的爬蟲體系,建議將存儲分離,使用外部存儲設備,比如NAS、分布式緩存、數據倉庫等;當然,其他爬蟲體系也這么建議,但如果條件不允許的話,只能存儲在本地磁盤的話,就需要考慮磁盤的IOPS了,即使是使用緩存、數據庫系統來作為中間存儲媒介,實質上也是與磁盤IO打交道,不過一般的緩存、數據庫系統都會對IO做優化,而且能干預的力度比較小,倒是可以略微“省心”。這個,本人也無法給出一個合理的通用的建議值,磁盤的性能千奇百怪,只能是按實際環境來拿捏了。
- 帶寬:分為上行、下行兩個帶寬指標,采集系統在這兩個指標中的占用總量都不建議超過80%。除了考慮ISP分配的帶寬,還要考慮會影響其效能的周邊設備,比如貓、交換機、路由器甚至是網線的吞吐能力。說來尷尬,我經常在家里做實驗,爬蟲系統和目標資源系統都還OK,聯通的光貓跪了……重啟復活……又跪了……重啟復活……又跪了……重啟復活……
- 可用端口:這個是一個隱性條件,也是經常被忽略的限制。拿Windows系統來說,可用的端口最大數量為UInt16.MaxValue(65535)個,而伴隨着系統啟動,就會有一系列的服務占用了部分端口,比如IIS中的網站、數據庫、QQ,而系統本身也會保留一部分端口,比如443、3389等。而是否能夠使用端口重用技術來緩解疼痛,對具體實現以及NAS端口映射規則的要求更高,不好或不可控。所以爬蟲本身能夠使用的端口數就有一個極限限制,這個也沒有建議值,具體情況各不相同。
總之,資源總是有限的,大體原則就是:做人留一線,日后好相見:)
2、對於目標資源系統的資源環境:
通常,我們無法探知具體的資源情況,再加上對方可能使用反爬策略,就是知道具體的資源情況,也不見得就有用。對於制定並發策略,我們更關心的是對方能夠吃的下多大的鴨梨,以及探索其反爬策略允許的極限。為此,我們需要使用下述的方法,來輔助我們制定策略。
3、通常使用的方法:
3.1、需要找到目標資源系統中的一個URI,原則是輕量、成功率高(最好是100%),比如,一張小小的圖片、一個簡單短小的ajax接口、一個靜態html甚至是一個xxx.min.css,但要注意,我們選取的URI可千萬不要是經過CDN加速的,否則T_T;
3.2、接下來,我們就針對選取的URI進行周期采集,對於一般的資源站點,初始頻率設置為1秒1次,就可以了;
3.3、然后就是運行一段時間觀察結果,后面我們再說運行多長時間觀察為合適;
3.4、如果觀察結果OK,成功率能夠達到95%+,那么,我們就可以適量縮小采集周期,反之,就要適當延長采集周期;
3.5、重復3.3~3.4,最后得到一個合理的極限周期;
3.6、至於一次觀察多長時間,不同的反爬策略,有着不同的限制,這個需要小心。我曾經的一個項目,當時就比較心急,觀測了5分鍾,沒什么問題,就丟出去了,結果后來現實告訴我,他們的策略是1分鍾累計限制、10分鍾累計限制、20分鍾累計限制、30分鍾累計限制、1小時累計限制……而且累計限制逐級遞減,也就是說,你滿足了1分鍾的累計限制,x10,就不一定滿足10分鍾的累計限制,x60就有可能遠遠超出了1小時累計限制。這里給出一個建議,至少30分鍾。因為目標系統去統計每一個來源IP的訪問周期,也是一個不小的代價,所以也不可能做到無限期的監測,通常半小時到一小時已經是極限了。這里也給出一個最保險的觀測周期,那就是根據請求總量及當前頻率,預估耗費總時長,作為觀測周期,這樣是最穩妥的,但,這也可能是不切實際的:(
4、如何制定並發策略:
通過上述3步,結合自身的資源情況、目標的反爬策略及承受能力、以及觀測結果,我們就可以制定一個大概的並發量了,制定決策也就不那么困難了;
我們的任務都是存儲在隊列中,並發的限制,無非就是控制入隊的頻率,所以,只需要把前面的統計結果轉化為最小請求間隔,就是我們最終的並發策略了;
為什么是控制入隊,而不是出隊呢?因為如果不控制入隊,那么隊列還是會無限暴增,直至“死亡”,而限制入隊,一方面避免隊列暴增,另一方面,阻塞新任務的生成,降低CPU及內存使用量;
單點並發策略的實現:
有了理論基礎,在技術實現上,就不是什么難事兒了。

1 namespace MikeWare.Core.Components.CrawlerFramework.Policies 2 { 3 using System; 4 5 public abstract class AConcurrentPolicy 6 { 7 public virtual bool WaitOne(TimeSpan timeout) => throw new NotImplementedException(); 8 9 public virtual void ReleaseOne() => throw new NotImplementedException(); 10 } 11 }
這是一個抽象類,具有兩個抽象方法,作為並發策略的基礎實現;
我寫了兩種並發策略的具體實現,PeriodConcurrentPolicy和SemaphoreConcurrentPolicy,他們的目的都是用來控制入隊的頻率,目標一致,方法不同,您也可以實現自己的並發策略;
本節,我們主要說道說道System.Threading.Semaphore的使用及SemaphoreConcurrentPolicy的實現原理;

1 namespace MikeWare.Core.Components.CrawlerFramework.Policies 2 { 3 using System; 4 using System.Threading; 5 6 public class SemaphoreConcurrentPolicy : AConcurrentPolicy 7 { 8 private Semaphore semaphore = null; 9 10 public SemaphoreConcurrentPolicy(int init, int max) 11 { 12 semaphore = new Semaphore(init, max); 13 } 14 15 public override bool WaitOne(TimeSpan timeout) 16 { 17 return semaphore.WaitOne(timeout); 18 } 19 20 public override void ReleaseOne() 21 { 22 semaphore.Release(1); 23 } 24 } 25 }
SemaphoreConcurrentPolicy繼承自AConcurrentPolicy,定義了一個私有變量Semaphore semaphore,以及重寫了基類的兩個抽象方法;

namespace System.Threading { // // Summary: // Limits the number of threads that can access a resource or pool of resources // concurrently. public sealed class Semaphore : WaitHandle { // // Summary: // Initializes a new instance of the System.Threading.Semaphore class, specifying // the initial number of entries and the maximum number of concurrent entries. // // Parameters: // initialCount: // The initial number of requests for the semaphore that can be granted concurrently. // // maximumCount: // The maximum number of requests for the semaphore that can be granted concurrently. // // Exceptions: // T:System.ArgumentException: // initialCount is greater than maximumCount. // // T:System.ArgumentOutOfRangeException: // maximumCount is less than 1. -or- initialCount is less than 0. public Semaphore(int initialCount, int maximumCount); // // Summary: // Initializes a new instance of the System.Threading.Semaphore class, specifying // the initial number of entries and the maximum number of concurrent entries, and // optionally specifying the name of a system semaphore object. // // Parameters: // initialCount: // The initial number of requests for the semaphore that can be granted concurrently. // // maximumCount: // The maximum number of requests for the semaphore that can be granted concurrently. // // name: // The name of a named system semaphore object. // // Exceptions: // T:System.ArgumentException: // initialCount is greater than maximumCount. -or- name is longer than 260 characters. // // T:System.ArgumentOutOfRangeException: // maximumCount is less than 1. -or- initialCount is less than 0. // // T:System.IO.IOException: // A Win32 error occurred. // // T:System.UnauthorizedAccessException: // The named semaphore exists and has access control security, and the user does // not have System.Security.AccessControl.SemaphoreRights.FullControl. // // T:System.Threading.WaitHandleCannotBeOpenedException: // The named semaphore cannot be created, perhaps because a wait handle of a different // type has the same name. public Semaphore(int initialCount, int maximumCount, string name); // // Summary: // Initializes a new instance of the System.Threading.Semaphore class, specifying // the initial number of entries and the maximum number of concurrent entries, optionally // specifying the name of a system semaphore object, and specifying a variable that // receives a value indicating whether a new system semaphore was created. // // Parameters: // initialCount: // The initial number of requests for the semaphore that can be satisfied concurrently. // // maximumCount: // The maximum number of requests for the semaphore that can be satisfied concurrently. // // name: // The name of a named system semaphore object. // // createdNew: // When this method returns, contains true if a local semaphore was created (that // is, if name is null or an empty string) or if the specified named system semaphore // was created; false if the specified named system semaphore already existed. This // parameter is passed uninitialized. // // Exceptions: // T:System.ArgumentException: // initialCount is greater than maximumCount. -or- name is longer than 260 characters. // // T:System.ArgumentOutOfRangeException: // maximumCount is less than 1. -or- initialCount is less than 0. // // T:System.IO.IOException: // A Win32 error occurred. // // T:System.UnauthorizedAccessException: // The named semaphore exists and has access control security, and the user does // not have System.Security.AccessControl.SemaphoreRights.FullControl. // // T:System.Threading.WaitHandleCannotBeOpenedException: // The named semaphore cannot be created, perhaps because a wait handle of a different // type has the same name. public Semaphore(int initialCount, int maximumCount, string name, out bool createdNew); // // Summary: // Opens the specified named semaphore, if it already exists. // // Parameters: // name: // The name of the system semaphore to open. // // Returns: // An object that represents the named system semaphore. // // Exceptions: // T:System.ArgumentException: // name is an empty string. -or- name is longer than 260 characters. // // T:System.ArgumentNullException: // name is null. // // T:System.Threading.WaitHandleCannotBeOpenedException: // The named semaphore does not exist. // // T:System.IO.IOException: // A Win32 error occurred. // // T:System.UnauthorizedAccessException: // The named semaphore exists, but the user does not have the security access required // to use it. public static Semaphore OpenExisting(string name); // // Summary: // Opens the specified named semaphore, if it already exists, and returns a value // that indicates whether the operation succeeded. // // Parameters: // name: // The name of the system semaphore to open. // // result: // When this method returns, contains a System.Threading.Semaphore object that represents // the named semaphore if the call succeeded, or null if the call failed. This parameter // is treated as uninitialized. // // Returns: // true if the named semaphore was opened successfully; otherwise, false. // // Exceptions: // T:System.ArgumentException: // name is an empty string. -or- name is longer than 260 characters. // // T:System.ArgumentNullException: // name is null. // // T:System.IO.IOException: // A Win32 error occurred. // // T:System.UnauthorizedAccessException: // The named semaphore exists, but the user does not have the security access required // to use it. public static bool TryOpenExisting(string name, out Semaphore result); // // Summary: // Exits the semaphore and returns the previous count. // // Returns: // The count on the semaphore before the System.Threading.Semaphore.Release* method // was called. // // Exceptions: // T:System.Threading.SemaphoreFullException: // The semaphore count is already at the maximum value. // // T:System.IO.IOException: // A Win32 error occurred with a named semaphore. // // T:System.UnauthorizedAccessException: // The current semaphore represents a named system semaphore, but the user does // not have System.Security.AccessControl.SemaphoreRights.Modify. -or- The current // semaphore represents a named system semaphore, but it was not opened with System.Security.AccessControl.SemaphoreRights.Modify. public int Release(); // // Summary: // Exits the semaphore a specified number of times and returns the previous count. // // Parameters: // releaseCount: // The number of times to exit the semaphore. // // Returns: // The count on the semaphore before the System.Threading.Semaphore.Release* method // was called. // // Exceptions: // T:System.ArgumentOutOfRangeException: // releaseCount is less than 1. // // T:System.Threading.SemaphoreFullException: // The semaphore count is already at the maximum value. // // T:System.IO.IOException: // A Win32 error occurred with a named semaphore. // // T:System.UnauthorizedAccessException: // The current semaphore represents a named system semaphore, but the user does // not have System.Security.AccessControl.SemaphoreRights.Modify rights. -or- The // current semaphore represents a named system semaphore, but it was not opened // with System.Security.AccessControl.SemaphoreRights.Modify rights. public int Release(int releaseCount); } }
看它的summary,我們大體了解這個類就是專門用來做並發限制的,它具有三個構造函數,我們最關心的,就是其中兩個參數int initialCount, int maximumCount及其涵義;
initialCount:能夠被Semaphore 授予的數量的初始值;
maximumCount:能夠被Semaphore 授予的最大值;
字面意思可能不太好理解,我們來把官宣翻譯成普通話:)
舉個栗子,我們把Semaphore看成是一個用來裝鑰匙的盒子,每一個想要進入隊列這道“門”的任務,都需要先從盒子里取一把鑰匙,才能進入;initialCount,就是說,這個盒子,一開始的時候,放幾把鑰匙,但是進入隊列的任務,時時不肯出來,不歸還鑰匙,無鑰匙可用,這時管理員就決定再多配一些鑰匙,以備用,於是,一些新鑰匙又被放入盒子里,但盒子的容積有限,一共能容納多少把鑰匙,就是maximumCount了。
當然,我們常見的情況是構造盒子的時候,initialCount == maximumCount,特殊場景下,會設置不相同,這個視具體業務而定。然而,maximumCount不能小於initialCount,initialCount不能小於0,這個是硬性的。
這樣是不是initialCount 和 maximumCount就很容易理解了。
同時,Semaphore 還有非常重要的方法(Release)方法,再把上面的栗子舉起來說話,Release就是歸還鑰匙,任務結束了,那么就出門還鑰匙,然后其它在門口等待的任務就可以領到鑰匙進門了:)
再者,Semaphore 繼承自System.Threading.WaitHandle,於是乎,它就具有了一系列Wait方法,當有新任務來領鑰匙,一看,盒子空了,那怎么辦呢,等吧,但是等多久呢,是一直等下去還是等一個超時時間,這就看業務邏輯了。
在我的SemaphoreConcurrentPolicy實現里,會提供一個超時時間,爬蟲螞蟻小隊長會判斷,如果沒拿到鑰匙,就會再次回來嘗試取鑰匙。
OK,接下來,就是對我們的螞蟻小隊長進行改造了:

1 namespace MikeWare.Core.Components.CrawlerFramework 2 { 3 using MikeWare.Core.Components.CrawlerFramework.Policies; 4 using System; 5 using System.Collections.Concurrent; 6 using System.Threading; 7 using System.Threading.Tasks; 8 9 public class LeaderAnt : Ant 10 { 11 private ConcurrentQueue<JobContext> Queue; 12 private ManualResetEvent mre = new ManualResetEvent(false); 13 public AConcurrentPolicy EnqueuePolicy { get; set; } 14 15 …… 16 17 public void Enqueue(JobContext context) 18 { 19 if (null != EnqueuePolicy) 20 { 21 while (!EnqueuePolicy.WaitOne(TimeSpan.FromMilliseconds(3)) && !mre.WaitOne(1)) 22 continue; 23 } 24 25 Queue.Enqueue(context); 26 } 27 28 …… 29 }
主要是在入隊的時候,增加了拿鑰匙的環節;

1 namespace MikeWare.Crawlers.EBooks.Bizs 2 { 3 using MikeWare.Core.Components.CrawlerFramework; 4 using MikeWare.Core.Components.CrawlerFramework.Policies; 5 using MikeWare.Crawlers.EBooks.Entities; 6 using System; 7 using System.Collections.Generic; 8 using System.Net; 9 10 public class EBooksCrawler 11 { 12 public static void Start(int pageIndex, DateTime lastUpdateTime) 13 { 14 var leader = new LeaderAnt() 15 { 16 EnqueuePolicy = new SemaphoreConcurrentPolicy(100, 100) 17 //EnqueuePolicy = new PeriodEnqueuePolicy(TimeSpan.FromMilliseconds(150)) 18 }; 19 20 var newContext = new JobContext 21 { 22 JobName = $"奇書網-最新電子書-列表-第{pageIndex.ToString("00000")}頁", 23 Uri = $"http://www.xqishuta.com/s/new/index_{pageIndex}.html", 24 Method = WebRequestMethods.Http.Get, 25 InParams = new Dictionary<string, object>(), 26 Analizer = new BooksListAnalizer(), 27 }; 28 newContext.InParams.Add(Consts.PAGE_INDEX, 1); 29 newContext.InParams.Add(Consts.LAST_UPDATE_TIME, DateTime.MinValue); 30 31 leader.Enqueue(newContext); 32 33 leader.Work(); 34 } 35 } 36 }
主要是在構造LeaderAnt的時候,為其指定了我們要使用的策略;
同時需要注意的是,這個SemaphoreConcurrentPolicy並發策略的實現,並沒有規定入隊的時間間隔,而是控制了最大的隊列長度,所以,並發的頻率可能高,可能低,這個策略可以用來制衡資源的使用情況。關於入隊時間間隔,可以使用PeriodConcurrentPolicy或自己實現策略來控制;
另一個策略的實現,我們就不在這里細說了。有興趣的同學可以看看源碼。
好了,本節的內容就這么多吧,相信大家對並發策略的制定與實現,都有了各自的理解。
后續章節同樣精彩,敬請期待……
喜歡本系列叢書的朋友,可以點擊鏈接加入QQ交流群(994761602)【C# 破境之道】
方便各位在有疑問的時候可以及時給我個反饋。同時,也算是給各位志同道合的朋友提供一個交流的平台。
需要源碼的童鞋,也可以在群文件中獲取最新源代碼。