python爬蟲之多線程、多進程+代碼示例


python爬蟲之多線程、多進程

使用多進程、多線程編寫爬蟲的代碼能有效的提高爬蟲爬取目標網站的效率。

一、什么是進程和線程

引用廖雪峰的官方網站關於進程和線程的講解:

進程:對於操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開一個記事本就啟動了一個記事本進程,打開兩個記事本就啟動了兩個記事本進程,打開一個Word就啟動了一個Word進程。

線程:有些進程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)。

每個進程至少要做一件事,所以,一個進程至少有一個線程。

二、多進程

實現多進程的四種方式

os.fork()

python 的 os 模塊封裝了常見的系統調用,其中,多進程的調用就是 fork() 函數。具體示例代碼如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
"""
fork()
1.只有在Unix系統中有效,Windows系統中無效
2.fork函數調用一次,返回兩次:在父進程中返回值為子進程id,在子進程中返回值為0
"""
import os

pid = os.fork()
if pid == 0:
    print("執行子進程,子進程pid={pid},父進程ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
    print("執行父進程,子進程pid={pid},父進程ppid={ppid}".format(pid=pid, ppid=os.getpid()))

# 執行父進程,子進程pid=611,父進程ppid=610
# 執行子進程,子進程pid=611,父進程ppid=610

Process 類

通過 Multiprocessing 模塊中的 Process 類,創建Process對象。

Process類的構造方法:

init(self, group=None, targent=None, name=None, args=(), kwargs={})

參數 說明
group 進程所屬組,基本不用。
targent 表示調用對象,一般為函數。
args 表示調用對象參數元祖。
name 進程別名。
kwargs 表示調用對象的字典。

具體示例代碼如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process


def run_process(name):
    print(name)


if __name__ == "__main__":
    p = Process(target=run_process, args=("test",))
    p.start()
    p.join()
    print("子進程結束")

# test
# 子進程結束

繼承 Process 類

通過繼承Process類,重寫 run 方法。使用 .start() 方法,會自動調用 run 方法。具體示例代碼如下:

from multiprocessing import Process


class NewProcess(Process):
    def __init__(self, n):
        super(NewProcess, self).__init__()
        self.n = n

    def run(self):
        print(self.n)


if __name__ == "__main__":
    test = "test"
    p = NewProcess(test)
    p.start()
    p.join()
    print("子進程結束")

# test
# 子進程結束

進程池 Pool 類

Pool 類可以提供指定數量(一般為CPU的核數)的進程供用戶調用,當有新的請求提交的 Pool 中時,如果池中還沒有滿,就會創建一個新的進程來執行這些請求。如果池滿,請求就會告知先等待。直到池中有進程結束,才會創建新的進程來執行這些請求。

注意:進程池中的進程是不能共享隊列和數據的,而 Process 生成的子進程可以共享隊列。

Pool 類中的常用方法:

函數 函數原型 說明
apply() apply(func[, args=()[, kwds={}]]) 該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以后不再出現)。
apply_async() apply_async(func[, args()[, kwds{}[, callback=None]]]) 與apply用法一樣,但它是非阻塞且支持結果返回進行回調。
map() map(func, utterable[, chunksize=None]) Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。
close() 關閉進程池(Pool),使其不能再添加新的Process。
terminate() 結束工作進程,不再處理未處理的任務。
join() 主進程阻塞等待子進程的退出,join方法必須在close或terminate之后使用。

具體代碼如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import time
from multiprocessing import Pool


def run(num):
    time.sleep(1)
    return num * num


if __name__ == "__main__":
    testList = [1, 2, 3, 4, 5, 6, 7]
    print('單進程執行')  # 順序執行
    t1 = time.time()
    for i in testList:
        run(i)
    t2 = time.time()
    print('順序執行的時間為:', int(t2 - t1))

    print('多進程 map 執行')  # 並行執行
    p = Pool(4)  # 創建擁有4個進程數量的進程池
    result = p.map(run, testList)
    p.close()  # 關閉進程池,不再接受新的任務
    p.join()  # 主進程阻塞等待子進程的退出
    t3 = time.time()
    print('執行的時間為:', int(t3 - t2))

    print(result)
    
# 單進程執行
# 順序執行的時間為: 7
# 多進程 map 執行
# 執行的時間為: 2
# [1, 4, 9, 16, 25, 36, 49]

進程通信

Queue()

隊列:先進先出,按照順序

通信原理:在內存中建立隊列數據結構模型。多個進程都可以通過隊列存入內容,取出內容的順序和存入內容的順序保存一致。

方法 功能 參數
q = Queue(maxsize = 0) 創建隊列消息,並返回隊列對象。 表示最多存儲多少消息。默認表示根據內存分配存儲。
q.put(data, [block, timeout]) 向隊列存儲消息。 Data:要存入的數據。block:默認隊列滿時會堵塞,設置False則非堵塞。timeout:超時時間。
data = q.get([block, timeout]) 獲取隊列消息。 block:默認隊列空時會堵塞,設置False則非堵塞。timeout:超時時間。
q.full() 判斷隊列是否為滿。
q.empty() 判斷隊列是否為空。
q.size() 判斷隊列中的消息數量。
q.close() 關閉隊列。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Queue


def foo(data):
    s = data.get()  # 管子的另一端放在子進程這里,子進程接收到了數據
    if s not in "":
        print('子進程已收到數據...')
        print(s)  # 子進程打印出了數據內容...


if __name__ == '__main__':  # 要加這行...

    q = Queue()  # 創建進程通信的Queue,你可以理解為我拿了個管子來...
    p = Process(target=foo, args=(q,))  # 創建子進程
    print('主進程准備發送數據...')
    q.put("數據接收成功")  # 將管子的一端放在主進程這里,主進程往管子里丟入數據↑
    p.start()  # 啟子子進程

    p.join()
    
# 主進程准備發送數據...
# 子進程已收到數據...
# 數據接收成功

Pipe()

通信原理:在內存中開辟管道空間,生成管道操作對象,多個進程使用“同一個”管道對象進行操作即可實現通信。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])	# 向管道中寫入內容
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 從管道讀取信息
    p.join()
    
    # prints "[42, None, 'hello']"

manager()

進程的 manager 方法可以共享數據,比如共享列表,元祖,字典,鎖,字符。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing


def f(m_list):
    m_list.append("f")


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    m_list = manager.list([1, 2, 3])
    p = multiprocessing.Process(target=f, args=(m_list, ))
    p.start()
    p.join()
    print(m_list)
    
# [1, 2, 3, 'f']

三、多線程

線程在程序中是獨立的、並非的執行流。與分隔的進程相比線程之間的隔離程度要小,它們共享內存,文件句柄和其它進程應有的狀態。多線程之間共享全局變量

創建多線程多兩種方式

threading模塊Thread類

具體代碼如下:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time


def run(n):
    print("task", n)
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)


if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s

自定義線程

繼承threading.Thread類自定義線程類。其本質是重構Thread類中的run方法。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from threading import Thread
import time


class MyThread(Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


if __name__ == '__main__':
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()
    

# task t1
# task t2
# 1s
# 1s
# 0s
# 0s

守護線程

setDaemon(True)把所有的子線程都變成了主線程的守護線程,因此當主進程結束后,子線程也會隨之結束。所以當主線程結束后,整個程序就退出了。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time


def run(n):
    print("task", n)
    time.sleep(1)   # 此時子線程停1s
    print('2')
    time.sleep(1)
    print('1')


if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   # 把子進程設置為守護線程,必須在start()之前設置
    t.start()
    print("end")
    
# task t1
# end

想要守護線程執行結束后,主進程再結束,可以使用 join 方法,讓主線程等待子線程執行完畢。

Lock

多線程和多進程最大的不同在於,多進程中,同一個變量,各自都有一份拷貝存與每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading

value = 0
lock = threading.Lock()


def change_it(n):
    # 先存后取,結果應該為0:
    global value
    value = value + n
    value = value - n


# 未加鎖(值不確定)
def run_thread(n):
    for i in range(2000000):
        change_it(n)


# 加鎖
# def run_thread(n):
#     for i in range(2000000):
#         lock.acquire()
#         try:
#             change_it(n)
#         finally:
#             lock.release()


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)

# 29

由於鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以不會造成修改的沖突。當多個線程同時執行 lock.acquire() 時,只有一個線程能成功獲取鎖,然后繼續執行代碼,其它線程就繼續等待直到獲得鎖為止。

獲得鎖的線程用完一定要釋放鎖,否則那些等待鎖的線程將會永遠的等待下去,成為死線程。所以用 try...finally 來確保鎖一定會被釋放。

鎖的好處就是確保某段關鍵代碼只能由一個線程從頭到尾完整的執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率大大的下降了。其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。

信號量(BoundedSemaphore類)

Lock同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程去更改數據。

import threading
import time

def run(n, semaphore):
    semaphore.acquire()   #加鎖
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #釋放

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個線程同時運行
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')

GIL鎖

在非 python 環境中,單核情況下,同時只能有一個任務執行。多核可以同時支持多個線程同時執行。但是在 python 中,無論有多少核,同只能執行一個線程。究其原因,這就是GIL的存在導致的。

GIL全稱Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,為了數據安全所做的決定。某個線程想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,並且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不允許進入CPU執行。GIL只有在cpython中才有,因為cpython調用的是c語言的原生線程,所以他不能直接操作cpu,只能利用GIL保證同一時間只能有一個線程拿到數據,而在pypy和jpython中是沒有GIL的。

python針對不同類型的代碼執行效率也是不同的。

1、cpu密集型代碼(各種循環處理、計數等),在這種情況下,由於計算機工作多,ticks計數很快就會達到閾值。然后觸發GIL的釋放與再競爭(多個線程來回切換是需要消耗資源的),所以python下的多線程對cpu密集型代並不友好。

2、IO密集型代碼(文件處理,網絡爬蟲等涉及文件讀寫的操作),多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的浪費,而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費CPU的資源,從而能提升程序執行效率)。所以python的多線程對IO密集型代碼比較友好。

使用建議

python下想要充分利用多核CPU,就使用多進程。因為每個進程都有各子獨立的GIL,互不干擾,這樣就可以真正意義上的並行執行,在python中,多進程的執行效率優於多線程(僅僅針對多核CPU而言)。

四、爬取豆瓣電影TOP250

采取三種方式。爬取前250名電影。

(1)所爬取的網頁鏈接:https://movie.douban.com/top250?start=0&filter=

(2)通過分析網頁,發現第一頁的url start=0,第二頁的url start=25,第三頁的url start=50。

(3)主要爬取電影名跟評分,用來進行比對,所以數據方面就不過多的提取和保存,只簡單的打印出來。

多進程爬取

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
from multiprocessing import Process, Queue

import time
from lxml import etree
import requests


class DouBanSpider(Process):
    def __init__(self, q, url_list, lock):
        # 重寫寫父類的__init__方法
        super(DouBanSpider, self).__init__()
        self.url_list = url_list
        self.q = q
        self.lock = lock
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': 'https://movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        用來發送請求的方法
        :return: 返回網頁源碼
        '''
        # 請求出錯時,重復請求3次,
        i = 0
        while i <= 3:
            try:
                print(u"[INFO]請求url:" + url)
                return requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print(u'[INFO] %s%s' % (e, url))
                i += 1

    def parse_page(self):
        '''
        解析網站源碼,並采用xpath提取 電影名稱和平分放到隊列中
        :return:
        '''
        time.sleep(0.1)
        while 1:
            try:

                url = self.url_list.pop()
            except IndexError as e:
                break
            self.lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            #  獲取到一頁的電影數據
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # 電影名稱
                title = move.xpath('.//a/span/text()')[0]
                # 評分
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]

                # 將每一部電影的名稱跟評分加入到隊列
                self.q.put(score + "\t" + title)
            self.lock.release()


class AllUrlSpider(Process):
    def __init__(self, url_lis):
        super(AllUrlSpider, self).__init__()
        self.url_list = url_lis

    def run(self):
        base_url = 'https://movie.douban.com/top250?start='
        # 構造所有url
        for num in range(225, -1, -25):
            self.url_list.append(base_url + str(num))
            print("獲得URL:{}".format(base_url + str(num)))


def main():
    # 創建一個隊列用來保存進程獲取到的數據
    q = Queue()
    lock = multiprocessing.Lock()

    manager = multiprocessing.Manager()
    url_list = manager.list()
    a = AllUrlSpider(url_list)

    p = DouBanSpider(q, url_list, lock)
    b = DouBanSpider(q, url_list, lock)
    c = DouBanSpider(q, url_list, lock)

    a.start()
    p.start()
    b.start()
    c.start()

    a.join()
    p.join()
    b.join()
    c.join()

    while not q.empty():
        print(q.get())


if __name__ == "__main__":
    start = time.time()
    main()
    print('[info]耗時:%s' % (time.time() - start))

多進程爬取耗時7.15秒,部分結果如下圖所示:

多線程爬取

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from queue import Queue
from threading import Thread
import threading
import time

from lxml import etree
import requests

url_list = []
lock = threading.Lock()


class DouBanSpider(Thread):
    def __init__(self, q) :
        # 重寫寫父類的__init__方法
        super(DouBanSpider, self).__init__()
        self.q = q
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': 'https://movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        用來發送請求的方法
        :return: 返回網頁源碼
        '''
        # 請求出錯時,重復請求3次,
        i = 0
        while i <= 3:
            try:
                print
                u"[INFO]請求url:" + url
                html = requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print
                u'[INFO] %s%s' % (e, url)
                i += 1
            else:
                return html

    def parse_page(self):
        '''
        解析網站源碼,並采用xpath提取 電影名稱和平分放到隊列中
        :return:
        '''
        while 1:
            try:

                url = url_list.pop()
            except IndexError as e:
                break
            lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            #  獲取到一頁的電影數據
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # 電影名稱
                title = move.xpath('.//a/span/text()')[0]
                # 評分
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]

                # 將每一部電影的名稱跟評分加入到隊列
                self.q.put(score + "\t" + title)
            lock.release()


class AllUrlSpider(Thread):
    def run(self):
        base_url = 'https://movie.douban.com/top250?start='
        # 構造所有url
        for num in range(225, -1, -25):
            url_list.append(base_url + str(num))
            print("獲得URL:{}".format(base_url + str(num)))


def main():
    # 創建一個隊列用來保存進程獲取到的數據
    q = Queue()
    a = AllUrlSpider()
    a.start()

    # 保存線程
    Thread_list = []
    # 創建並啟動線程
    for i in range(5):
        p = DouBanSpider(q)
        p.start()
        Thread_list.append(p)

    a.join()
    # 讓主線程等待子線程執行完成
    for i in Thread_list:
        i.join()

    while not q.empty():
        print(q.get())


if __name__ == "__main__":
    start = time.time()
    main()
    print('[info]耗時:%s' % (time.time() - start))

多線程爬取耗時5秒,部分結果如下圖所示:

耗時跟網絡的好壞也是有一定的關系,每次測出的數據結果也不一樣。但理論上來講,線程在I/O密集的操作性是要高於進程的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM