Python 多線程抓取網頁


   最近,一直在做網絡爬蟲相關的東西。 看了一下開源C++寫的larbin爬蟲,仔細閱讀了里面的設計思想和一些關鍵技術的實現。

1、larbin的URL去重用的很高效的bloom filter算法;
2、DNS處理,使用的adns異步的開源組件;
3、對於url隊列的處理,則是用部分緩存到內存,部分寫入文件的策略。
4、larbin對文件的相關操作做了很多工作
5、在larbin里有連接池,通過創建套接字,向目標站點發送HTTP協議中GET方法,獲取內容,再解析header之類的東西
6、大量描述字,通過poll方法進行I/O復用,很高效
7、larbin可配置性很強
8、作者所使用的大量數據結構都是自己從最底層寫起的,基本沒用STL之類的東西
......
還有很多,以后有時間在好好寫篇文章,總結下。

   這兩天,用python寫了個多線程下載頁面的程序,對於I/O密集的應用而言,多線程顯然是個很好的解決方案。剛剛寫過的線程池,也正好可以利用上了。其實用python爬取頁面非常簡單,有個urllib2的模塊,使用起來很方便,基本兩三行代碼就可以搞定。雖然使用第三方模塊,可以很方便的解決問題,但是對個人的技術積累而言沒有什么好處,因為關鍵的算法都是別人實現的,而不是你自己實現的,很多細節的東西,你根本就無法了解。 我們做技術的,不能一味的只是用別人寫好的模塊或是api,要自己動手實現,才能讓自己學習得更多。

  我決定從socket寫起,也是去封裝GET協議,解析header,而且還可以把DNS的解析過程單獨處理,例如DNS緩存一下,所以這樣自己寫的話,可控性更強,更有利於擴展。對於timeout的處理,我用的全局的5秒鍾的超時處理,對於重定位(301or302)的處理是,最多重定位3次,因為之前測試過程中,發現很多站點的重定位又定位到自己,這樣就無限循環了,所以設置了上限。具體原理,比較簡單,直接看代碼就好了。

   自己寫完之后,與urllib2進行了下性能對比,自己寫的效率還是比較高的,而且urllib2的錯誤率稍高一些,不知道為什么。網上有人說urllib2在多線程背景下有些小問題,具體我也不是特別清楚。

先貼代碼:

fetchPage.py  使用Http協議的Get方法,進行頁面下載,並存儲為文件

'''
Created on 2012-3-13
Get Page using GET method
Default using HTTP Protocol , http port 80
@author: xiaojay
'''
import socket
import statistics
import datetime
import threading

socket.setdefaulttimeout(statistics.timeout)

class Error404(Exception):
    '''Can not find the page.'''
    pass

class ErrorOther(Exception):
    '''Some other exception'''
    def __init__(self,code):
        #print 'Code :',code
        pass
class ErrorTryTooManyTimes(Exception):
    '''try too many times'''
    pass

def downPage(hostname ,filename , trytimes=0):
    try :
        #To avoid too many tries .Try times can not be more than max_try_times
        if trytimes >= statistics.max_try_times : 
            raise ErrorTryTooManyTimes
    except ErrorTryTooManyTimes :
        return statistics.RESULTTRYTOOMANY,hostname+filename
    try:
        s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 
        #DNS cache
        if statistics.DNSCache.has_key(hostname):
            addr = statistics.DNSCache[hostname]
        else:
            addr = socket.gethostbyname(hostname)
            statistics.DNSCache[hostname] = addr
        #connect to http server ,default port 80
        s.connect((addr,80))
        msg  = 'GET '+filename+' HTTP/1.0\r\n'
        msg += 'Host: '+hostname+'\r\n'
        msg += 'User-Agent:xiaojay\r\n\r\n'
        code = '' 
        f = None
        s.sendall(msg)
        first = True
        while True:
            msg = s.recv(40960)
            if not len(msg):
                if f!=None:
                    f.flush()
                    f.close()
                break
            # Head information must be in the first recv buffer
            if first:
                first = False                
                headpos = msg.index("\r\n\r\n")
                code,other = dealwithHead(msg[:headpos])
                if code=='200':
                    #statistics.fetched_url += 1
                    f = open('pages/'+str(abs(hash(hostname+filename))),'w')
                    f.writelines(msg[headpos+4:])
                elif code=='301' or code=='302':
                    #if code is 301 or 302 , try down again using redirect location
                    if other.startswith("http") :                
                        hname, fname = parse(other)
                        downPage(hname,fname,trytimes+1)#try again
                    else :
                        downPage(hostname,other,trytimes+1)
                elif code=='404':
                    raise Error404
                else : 
                    raise ErrorOther(code)
            else:
                if f!=None :f.writelines(msg)
        s.shutdown(socket.SHUT_RDWR)
        s.close()
        return statistics.RESULTFETCHED,hostname+filename
    except Error404 :
        return statistics.RESULTCANNOTFIND,hostname+filename
    except ErrorOther:
        return statistics.RESULTOTHER,hostname+filename
    except socket.timeout:
        return statistics.RESULTTIMEOUT,hostname+filename
    except Exception, e:
        return statistics.RESULTOTHER,hostname+filename
def dealwithHead(head):
    '''deal with HTTP HEAD'''
    lines = head.splitlines()
    fstline = lines[0]
    code =fstline.split()[1]
    if code == '404' : return (code,None)
    if code == '200' : return (code,None)
    if code == '301' or code == '302' : 
        for line in lines[1:]:
            p = line.index(':')
            key = line[:p]
            if key=='Location' :
                return (code,line[p+2:])
    return (code,None)
    
def parse(url):
    '''Parse a url to hostname+filename'''
    try:
        u = url.strip().strip('\n').strip('\r').strip('\t')
        if u.startswith('http://') :
            u = u[7:]
        elif u.startswith('https://'):
            u = u[8:]
        if u.find(':80')>0 :
            p = u.index(':80')
            p2 = p + 3
        else:
            if u.find('/')>0:
                p = u.index('/') 
                p2 = p
            else:
                p = len(u)
                p2 = -1
        hostname = u[:p]
        if p2>0 :
            filename = u[p2:]
        else : filename = '/'
        return hostname, filename
    except Exception ,e:
        print "Parse wrong : " , url
        print e

def PrintDNSCache():
    '''print DNS dict'''
    n = 1
    for hostname in statistics.DNSCache.keys():
        print n,'\t',hostname, '\t',statistics.DNSCache[hostname]
        n+=1

def dealwithResult(res,url):
    '''Deal with the result of downPage'''
    statistics.total_url+=1
    if res==statistics.RESULTFETCHED :
        statistics.fetched_url+=1
        print statistics.total_url , '\t fetched :', url
    if res==statistics.RESULTCANNOTFIND :
        statistics.failed_url+=1
        print "Error 404 at : ", url
    if res==statistics.RESULTOTHER :
        statistics.other_url +=1
        print "Error Undefined at : ", url
    if res==statistics.RESULTTIMEOUT :
        statistics.timeout_url +=1
        print "Timeout ",url
    if res==statistics.RESULTTRYTOOMANY:
        statistics.trytoomany_url+=1
        print e ,"Try too many times at", url

if __name__=='__main__':    
    print  'Get Page using GET method'
    

下面,我將利用上一篇的線程池作為輔助,實現多線程下的並行爬取,並用上面自己寫的下載頁面的方法和urllib2進行一下性能對比。

'''
Created on 2012-3-16
@author: xiaojay
'''
import fetchPage
import threadpool
import datetime
import statistics
import urllib2


'''one thread'''
def usingOneThread(limit):
    urlset = open("input.txt","r")
    start = datetime.datetime.now()
    for u in urlset:
        if limit <= 0 : break
        limit-=1
        hostname , filename = parse(u)
        res= fetchPage.downPage(hostname,filename,0)
        fetchPage.dealwithResult(res)
    end = datetime.datetime.now()
    print "Start at :\t" , start
    print "End at :\t" , end
    print "Total Cost :\t" , end - start
    print 'Total fetched :', statistics.fetched_url
    

'''threadpoll and GET method'''
def callbackfunc(request,result):
    fetchPage.dealwithResult(result[0],result[1])

def usingThreadpool(limit,num_thread):
    urlset = open("input.txt","r")
    start = datetime.datetime.now()
    main = threadpool.ThreadPool(num_thread)
    for url in urlset :
        try :
            hostname , filename = fetchPage.parse(url)
            req = threadpool.WorkRequest(fetchPage.downPage,args=[hostname,filename],kwds={},callback=callbackfunc)
            main.putRequest(req)
        except Exception:
            print Exception.message        
    while True:
        try:
            main.poll()
            if statistics.total_url >= limit : break
        except threadpool.NoResultsPending:
            print "no pending results"
            break
        except Exception ,e:
            print e
    end = datetime.datetime.now()
    print "Start at :\t" , start    
    print "End at :\t" , end
    print "Total Cost :\t" , end - start
    print 'Total url :',statistics.total_url
    print 'Total fetched :', statistics.fetched_url
    print 'Lost url :', statistics.total_url - statistics.fetched_url
    print 'Error 404 :' ,statistics.failed_url
    print 'Error timeout :',statistics.timeout_url
    print 'Error Try too many times ' ,statistics.trytoomany_url
    print 'Error Other faults ',statistics.other_url
    main.stop()

'''threadpool and urllib2 '''
def downPageUsingUrlib2(url):
    try:
        req = urllib2.Request(url)
        fd = urllib2.urlopen(req)
        f = open("pages3/"+str(abs(hash(url))),'w')
        f.write(fd.read())
        f.flush()
        f.close()
        return url ,'success'
    except Exception:
        return url , None
    
def writeFile(request,result):
    statistics.total_url += 1
    if result[1]!=None :
        statistics.fetched_url += 1
        print statistics.total_url,'\tfetched :', result[0],
    else:
        statistics.failed_url += 1
        print statistics.total_url,'\tLost :',result[0],

def usingThreadpoolUrllib2(limit,num_thread):
    urlset = open("input.txt","r")
    start = datetime.datetime.now()   
    main = threadpool.ThreadPool(num_thread)    
    
    for url in urlset :
        try :
            req = threadpool.WorkRequest(downPageUsingUrlib2,args=[url],kwds={},callback=writeFile)
            main.putRequest(req)
        except Exception ,e:
            print e
        
    while True:
        try:
            main.poll()
            if statistics.total_url  >= limit : break
        except threadpool.NoResultsPending:
            print "no pending results"
            break
        except Exception ,e:
            print e 
    end = datetime.datetime.now()    
    print "Start at :\t" , start 
    print "End at :\t" , end
    print "Total Cost :\t" , end - start
    print 'Total url :',statistics.total_url
    print 'Total fetched :', statistics.fetched_url
    print 'Lost url :', statistics.total_url - statistics.fetched_url
    main.stop()

if __name__ =='__main__':
    '''too slow'''
    #usingOneThread(100)
    '''use Get method'''
    #usingThreadpool(3000,50)
    '''use urllib2'''
    usingThreadpoolUrllib2(3000,50)

 

實驗分析:

實驗數據:larbin抓取下來的3000條url,經過Mercator隊列模型(我用c++實現的,以后有機會發個blog)處理后的url集合,具有隨機和代表性。使用50個線程的線程池。
實驗環境:ubuntu10.04,網絡較好,python2.6
存儲:小文件,每個頁面,一個文件進行存儲
PS:由於學校上網是按流量收費的,做網絡爬蟲,灰常費流量啊!!!過幾天,可能會做個大規模url下載的實驗,用個幾十萬的url試試。

實驗結果:

使用urllib2 ,usingThreadpoolUrllib2(3000,50)

Start at :    2012-03-16 22:18:20.956054
End at :    2012-03-16 22:22:15.203018
Total Cost :    0:03:54.246964
Total url : 3001
Total fetched : 2442
Lost url : 559
下載頁面的物理存儲大小:84088kb

使用自己的getPageUsingGet ,usingThreadpool(3000,50)

Start at :    2012-03-16 22:23:40.206730
End at :    2012-03-16 22:26:26.843563
Total Cost :    0:02:46.636833
Total url : 3002
Total fetched : 2484
Lost url : 518
Error 404 : 94
Error timeout : 312
Error Try too many times  0
Error Other faults  112
下載頁面的物理存儲大小:87168kb

小結: 自己寫的下載頁面程序,效率還是很不錯的,而且丟失的頁面也較少。但其實自己考慮一下,還是有很多地方可以優化的,比如文件過於分散,過多的小文件創建和釋放定會產生不小的性能開銷,而且程序里用的是hash命名,也會產生很多的計算,如果有好的策略,其實這些開銷都是可以省略的。另外DNS,也可以不使用python自帶的DNS解析,因為默認的DNS解析都是同步的操作,而DNS解析一般比較耗時,可以采取多線程的異步的方式進行,再加以適當的DNS緩存很大程度上可以提高效率。不僅如此,在實際的頁面抓取過程中,會有大量的url ,不可能一次性把它們存入內存,而應該按照一定的策略或是算法進行合理的分配。 總之,采集頁面要做的東西以及可以優化的東西,還有很多很多。

附件下載:程序代碼(水平有限,僅供參考)

略改進版:minicrawler(里面的beautifulSoup需要改成3.x版本的)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM