網絡爬蟲中,URL隊列(URL Frontier)的設計與實現


    URL隊列被爬行進程賦予一個URL(或者來自於其他爬行進程的主機分離器)。它維護了一個包含大量URL的隊列,並且每當有爬蟲線程尋找URL的時候,它都會按照某種順序重新排序。以何種順序返回隊列中的URL,需要有兩個方面的考慮。

     第一個要考慮的是具有很高更新頻率的高質量頁面,即頁面的優先級。一個頁面的優先級權值應該是由它的改變頻率和它本身網頁質量(使用一些恰當的質量評估方法)共同決定的。這是很必要的,因為在每次抓取的時候,很多更新頻率很高的頁面都是質量很差的垃圾頁面。

     第二個要考慮的就是禮貌策略:我們必須避免在很短的時間間隔內重復抓取同一個主機。因此,如果URL隊列被設計成簡單的優先級隊列的話,可能會造成對某一主機的大量的訪問請求。就算我們設定對於某台主機,任何時候最多只允許一個線程可以進行爬取,這樣的情況仍然會發生。一個好的想法是在對某一主機進行連續的爬取請求之間插入一段時間間隔,這個空隙的數量級應該大於最近大部分對該主機爬取所花費的時間。

     圖1 展示了一個基於禮貌和權值策略的URL隊列的實現。它的目標是確保(i)每次只有一個連接去訪問一台主機 (ii)連續對同一個主機的訪問請求之間存在幾秒鍾的等待時間(有時robots.txt文件會指定這個時間) (iii)具有高優先級的頁面將會被優先爬取。

     圖1 中有兩個重要的子模塊,前部分的front隊列集合F以及后部分的back隊列集合B。這兩種隊列均是FIFO隊列。Front隊列實現了對權值相關處理,而back隊列實現了對禮貌策略的相關處理。在一條URL被添加到隊列的過程中,它將會先后穿越front和back隊列。首先,權值計算器會給該URL分配一個介於1和F之間的整數權值,該權值是可能基於爬取歷史記錄而得出的,比如參考該URL所指向的頁面內容在最近幾次爬取之間改變的頻率(關於優先級權值的討論是一個很大的話題,不展開討論)。例如,具有很高更新頻率的文檔將會被賦予一個很高的權值。類似的還有某些明確的特定的應用,比如新聞類的頁面可能總是會被賦予很高的權值。在該URL被賦予了權值i之后,它將會被添加到front隊列集合中的第i個隊列。

     每個back隊列需要遵循下面的幾條不變定律:

    (i)當它處於信息采集過程中,必須保證其隊列是非空的。

    (ii)它只包含來自於同一台主機上的URL。一個輔助表T(圖2)被用來維護主機到back隊列之間的映射關系。每當back隊列為空,要被front隊列重新填充的時,T表必須要進行相應的更新。

image

圖1 URL隊列

從爬取頁面上抽取出來的URL會流向圖表中的頂端。爬取線程會從圖示的底部抽取出待爬取的URL。一個URL在整個過程會穿過負責處理權值的front隊列,以及負責處理爬蟲禮貌策略的back隊列。

主機

Back 隊列

standford.edu

23

microsoft.com

47

acm.org

12

圖2 主機到Back隊列對應關系的輔助表格樣例

     此外,我們需要維護一個堆,堆里存放着的條目對應每一個back隊列,該條目記錄着該隊列所對應的主機可以再次被連接的最早時間te。請求獲取URL的爬蟲線程會抽取出堆頂元素,然后一直等到相應te時間。接下來,它會獲取到該堆頂元素所對應的back隊列j的隊首URL u,進而開始進行URL u的抓取。抓取過程完成后,調用線程會檢查隊列j是否為空。如果為空,它會挑選一個front隊列,然后抽取出其隊首URL v。Front 隊列的選擇方法對於更高優先級的隊列來說可能並不公平(通常是一個隨機的過程),但這樣做是確保高優先級的URL可以更快的流入到back隊列中來。接下來,我們會檢查v,判斷v所對應的主機是否已經存在並且已經存有一些URL。如果是這樣的話,v 會被添加到該隊列里,然后重新返回到front隊列,尋找另一個可以插入到空隊列 j 的URL。這個過程會一直持續,直到隊列 j 再次變為非空。同時,該線程會向堆中插入一條包含最早開始時間 te的新條目,這個時間是根據隊列 j 中最新被提取的URL的相關屬性所決定的(比如上次何時進行的連接或是上次爬取花費的時間),之后會繼續執行這個過程。

     Front隊列的數量以及分配權值和挑選隊列的策略共同組成了我們希望植入系統的優先級屬性。Back隊列的數量決定着我們可以維持多少線程處於運行狀態同時又遵守着禮貌性特征。Mercator的設計者提出一個比較粗糙的建議:可以使用數量三倍於爬蟲線程的back隊列。

在大規模下的信息采集過程中,隨着URL隊列的增長,可能會造成節點的可用內存不足(經過實驗,的確是這樣,這個問題也是很棘手的問題)。一個解決方法是讓大多數URL隊列存儲在磁盤上,只將每個隊列中的一部分保存在內存中,當內存中數據不足時,可以從磁盤中讀取更多的數據。

一些關鍵實現細節(linux,c++):

維護一個最小堆,堆的比較元素為爬取時間te,使用的sys/time.h文件下的timeval變量類型,它在linux環境下,可以精確到微妙,但是不會特別精確。

/*
 * minheap.h
 *
 *  Created on: 2012-2-24
 *      Author: xiaojay
 */

#ifndef MINHEAP_H_
#define MINHEAP_H_
#include <sys/time.h>

struct node {
	timeval te;
	int backpos;
public:
	node() //default constructor
	{
		backpos = -1;
	}
};

class minheap 
{
private:
	int maxheapsize;
	int currentsize;
	node * heap;
	//adjust the head from upon downto bottom
	void siftdown(int currentPos , int m);
	//adjust the heap from bottom to upon
	void siftup(int start);
public:
	//build a min heap
	minheap(int maxheapsize);
	~minheap();
	//insert an element
	void insert(timeval te, int backpos);
	//remove an element
	//position of back queue in urlfrontier.h and node.te returns as reference value
	void removemin(int & backpos , timeval & te);
	inline int size()  {return this->currentsize ;}
};
#endif

#include"minheap.h"
#include<assert.h>
//constructor
minheap::minheap(int maxheapsize)
{
	assert(maxheapsize>0);
	this->maxheapsize = maxheapsize;
	this->currentsize = 0;
	heap = new node [maxheapsize];
}
//destructor
minheap::~minheap()
{
	delete [] heap;
}
//adjust the heap from top to bottom
void minheap::siftdown(int currentPos , int m)
{
	int i=currentPos;
	int j=currentPos*2+1;//i's leftChild
	timeval temp=heap[i].te;
	int temppos = heap[i].backpos;
	while(j<=m)
	{
		if(j<m&&timercmp(&heap[j].te,&heap[j+1].te,>)) j++; // j points to minChild
		if(timercmp(&temp,&heap[j].te,<=)) break;
		else 
		{
			heap[i].te=heap[j].te;
			heap[i].backpos = heap[j].backpos;
			i=j;
			j=2*i+1;
		}
	}
	heap[i].te=temp;
	heap[i].backpos = temppos;
}

//adjust the heap from bottom to top
void minheap::siftup(int start)
{
	int i=start,j=(i-1)/2;
	timeval temp=heap[i].te;
	int temppos = heap[i].backpos;
	while(i>0)
	{
		if(timercmp(&heap[j].te,&temp,>))
		{
			heap[i].te=heap[j].te;
			heap[i].backpos = heap[j].backpos;
			i=j;
			j=(i-1)/2;
		}
		else break;
	}
	heap[i].te=temp;
	heap[i].backpos = temppos;
}
//add a node to heap 
void minheap::insert(timeval te , int backpos)
{
	if(currentsize>=maxheapsize)
	{
		return ;
	}
	heap[currentsize].te=te;
	heap[currentsize].backpos = backpos;
	siftup(currentsize);
	currentsize++;
}
//pass value to backpos and te then remove the node 
void minheap::removemin(int & backpos , timeval & te)
{
	assert(currentsize>=0);
	backpos = heap[0].backpos;
	te = heap[0].te;
	heap[0] = heap[currentsize-1];
	currentsize --;
	siftdown(0, currentsize-1);
}

 

urlFrontier 實現:

/*
 * urlfrontier.h
 *
 *  Created on: 2012-3-2
 *      Author: xiaojay
 */

#ifndef URLFRONTIER_H_
#define URLFRONTIER_H_
#include <queue>
#include <map>
#include <string>
#include "url.h"
#include "minheap.h"
#include "../config.h"
#include <time.h>
#include <sys/time.h>

class urlfrontier
{
private:
	/*frontQueue maintains urls which have higher priority 
	  cound route to back queue more quickly.
	frontQueue[i] contains all urls whose priority is i.*/
	queue<url *> * frontQueue;
	/*
	  backQueue[i] contains urls whose have the same hostname 
	*/
	queue<url *> * backQueue;
	int maxF,maxB;//the max number of frontQueue and backQueue
	map<string,int> hostmap;
	minheap * heap;
	//route urls from frontQueue to backQueue
	bool router();
	int size;
public:
	//constructor
	//parameters: nf->maxF, nb->maxB
	urlfrontier (int nf , int nb);
	//destructor
	~urlfrontier();
	//add a url
	void pushurl(url * u) ;
	//get a url
	url * popurl();
	void displayState();
	int getsize(){return this->size;}
};

#endif /* URLFRONTIER_H_ */

#include "urlfrontier.h"

//constructor
urlfrontier::urlfrontier( int nf , int nb)
{
	this->maxF = nf;
	this->maxB = nb;
	this->size = 0;
	frontQueue = new queue<url *>[maxF];
	backQueue  = new queue<url *>[maxB];
	heap = new minheap(nb);
}

//destructor
urlfrontier::~urlfrontier()
{
	delete [] frontQueue;
	delete [] backQueue;
	delete heap;
}

//add url 
void urlfrontier::pushurl(url * u)
{
	if(!u->isValid()) return ;
	int priority = u->getPriority();
	if(priority>=maxF|| priority<0) return ;
	frontQueue[priority].push(u);
	size++;
	//router();
}

//route urls from frontQueue to backQueue
bool urlfrontier ::router()
{
	int pos = maxF;
	int size = 0;
	/*MAX_URLS_ONCE limits the max number of urls moved ,
	to ignore much expenses*/
	int limit = MAX_URLS_ONCE;
	while(pos>0&&limit>0)
	{
		limit--;
		pos--;
		size = frontQueue[pos].size();
		if (size==0) continue;
		while(!frontQueue[pos].empty()&&size>0)
		{
			size--;
			url * u = frontQueue[pos].front();
			if(hostmap.count(u->getHost()))
			{
				int backpos = hostmap[u->getHost()];
				backQueue[backpos].push(u);
				frontQueue[pos].pop();
			}
			else
			{
				//find empty pos ;
				int posB = 0;
				while(posB<maxB&&!backQueue[posB].empty()) posB++;
				if(posB==maxB) 	
				{
					frontQueue[pos].pop();
					frontQueue[pos].push(u);
					continue;
				}
				//update the hostmap
				hostmap.insert(map<string,int>::value_type(u->getHost(),posB));
				backQueue[posB].push(u);
				//update heap
				timeval now ;
				gettimeofday(&now,NULL);
				heap->insert(now,posB);
				frontQueue[pos].pop();
			}
		}
	}
	return true;
}

//get a url
url * urlfrontier::popurl()
{
	if(heap->size()<=0)
	{
		router();
	}
	if(heap->size()<=0) return NULL;
	int backpos;
	timeval te;
	heap->removemin(backpos,te);
	url * u = backQueue[backpos].front();
	u->setTe(te);
	if(u==NULL) return NULL;
	backQueue[backpos].pop();
	if(backQueue[backpos].empty())
	{
		router();//route urls from frontqueue to backqueue
		hostmap.erase(u->getHost());
	}
	else
	{
		//the time between two request is 5 seconds
		te.tv_sec += 5;
		heap->insert(te,backpos);
	}
	size --;
	return u ;
}

參考資料:

(美)Christopher D. Manning, Prabhakar Raghavan, Hinrich Schütze. Introduction to information retrieval: 信息檢索導論[M].北京:人民郵電出版社,2010


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM