python爬蟲入門八:多進程/多線程


什么是多線程/多進程

引用蟲師的解釋:

計算機程序只不過是磁盤中可執行的,二進制(或其它類型)的數據。它們只有在被讀取到內存中,被操作系統調用的時候才開始它們的生命期。

進程(有時被稱為重量級進程)是程序的一次執行。每個進程都有自己的地址空間,內存,數據棧以及其它記錄其運行軌跡的輔助數據。操作系統管理在其上運行的所有進程,並為這些進程公平地分配時間。

線程(有時被稱為輕量級進程)跟進程有些相似,不同的是,所有的線程運行在同一個進程中,共享相同的運行環境。我們可以想像成是在主進程或“主線程”中並行運行的“迷你進程”。

 

為什么需要多線程/多進程

我們直接編寫的爬蟲程序是單線程的,在數據需求量不大時它能夠滿足我們的需求。

但如果數據量很大,比如要通過訪問數百數千個url去爬取數據,單線程必須等待當前url訪問完畢並且數據提取保存完成后才可以對下一個url進行操作,一次只能對一個url進行操作;

我們使用多線程/多進程的話,就可以實現對多個url同時進行操作。這樣就能大大縮減了爬蟲運行時間。

 

實現多線程/多進程

多線程

python提供了兩組多線程接口,一是thread模塊_thread,提供低等級接口;二是threading模塊,在thread模塊基礎上進行封裝,提供更容易使用的基於對象的接口,可以繼承Thread對象來實現多線程。

同時,還有其他線程相關的對象,如Timer、Lock等。

在這里,我們使用threading模塊實現多線程。

1. 添加線程

threading.Thread(target, args)

使用threading.Thread()新建一個線程,target是需要執行的函數,args是需要傳入該函數的參數,args接受一個tuple,即使只有一個參數也需要寫成(x,)形式

import threading

print(threading.active_count()) # 顯示當前激活的線程數
print(threading.enumerate()) # 顯示當前激活的線程
print(threading.current_thread()) # 當前運行的線程 


def thread_job():
    print('This is a thread of %s' % threading.current_thread())

def main():
    thread = threading.Thread(target=thread_job,) # 添加一個線程
    thread.start() # 開始該線程

if __name__ == '__main__':
    main()

2. 線程阻塞:join

join()的作用是調用該線程時,等待該線程完成后再繼續往下運行。

join通常用於主線程與子線程之間,主線程等待子線程運行完畢后再繼續執行,避免子程序和主程序同時運行,子程序還沒有運行完的時候主程序就已經運行結束。

import threading
import time

# 定義一個fun,傳入線程
def T1_job():
    print('T1 start\n')
    for i in range(10):
        time.sleep(0.1)
    print('T1 finish\n')
        
def T2_job():
    print('T2 start\n')
    print('T2 finish\n')
        
def main():
    thread1 = threading.Thread(target=T1_job, name='T1') # 添加線程,准備執行thread_job,命名T1
    thread2 = threading.Thread(target=T2_job, name='T2')
    
    thread1.start() # 執行該線程,沒有添加join的時候,同步執行main和thread_job
    thread2.start()
    
    thread1.join() # 等待thread1完成后才進行下一步-主程序
    thread2.join() # 等待thread2完成后才進行下一步-主程序
    print('all done')

if __name__ == '__main__':
    main()

3. 信息傳遞:Queue隊列

Queue是python標准庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列。

Queue是一種先進先出的數據結構,一般來說讀數據都從Queue頭讀,寫數據都從Queue尾寫入。

import threading
from queue import Queue

def job(l, q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l) # 線程中,return獲取的值無法提取,需要放入q中

def multithreading():
    q = Queue() # 隊列
    threads = [] # 全部線程
    data = [[1, 2, 3], [3, 4, 5], [4,4,4], [5,5,5]]
    for i in range(4):
        # 4個線程來執行job函數
        t = threading.Thread(target=job, args=(data[i], q))
        t.start()
        threads.append(t) # 當前線程加入全部線程中
        
    # 對主線程中的每一個線程都執行join()
    for thread in threads:
        thread.join()
   
    results = [] # 保存結果
    for _ in range(4):
        results.append(q.get()) # 從q中拿出值,每次只能按順序拿出一個值
    print(results)
    
if __name__ == '__main__':
    multithreading()

# [[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]

4. 線程鎖:Lock

lock在不同線程使用同一共享內存時,能夠確保線程之間互不影響。

使用lock的方法是:在每個線程執行運算修改共享內存之前執行lock.acquire()將共享內存上鎖, 確保當前線程執行時,內存不會被其他線程訪問;

執行運算完畢后使用lock.release()將鎖打開, 保證其他的線程可以使用該共享內存。

lock.acquire()和lock.release()必須成對出現

# lock鎖,當前線程運行完成后才進行下一進程
import threading

def job1():
    global A, lock
    lock.acquire() # 打開鎖
    for i in range(10):
        A += 1
        time.sleep(0.2)
        print('job1', A)
    lock.release() # 關閉鎖
    
def job2():
    global A, lock
    lock.acquire() # 打開鎖
    for i in range(10):
        A += 10
        time.sleep(0.2)
        print('job2', A)
    lock.release() # 關閉鎖
    
if __name__ == '__main__':
    lock = threading.Lock() # lock鎖
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()

將上述代碼中的lock.acquire()和lock.release()四行代碼注釋后運行,就是不加鎖的情況,這時候輸出結果都是混亂的。而加鎖后,輸出結果正常。

5. 線程池

線程池有幾種方法可以實現,這里我們使用multiprocessing.dummy庫。

from multiprocessing.dummy import Pool as ThreadPool # 線程池
import threading

def job(i):
    print(i, '\n', threading.current_thread())
    
if __name__ == '__main__':
    pool = ThreadPool(4) # 創建一個包含4個線程的線程池
    pool.map(job, range(12))
    pool.close() # 關閉線程池的寫入
    pool.join() # 阻塞,保證子線程運行完畢后再繼續主進程 

 

多進程

多進程multiprocessing和多線程threading類似,都是用在python中進行並行計算的,而多進程則是為了彌補python在多線程中的劣勢而出現的。

multiprocessing是使用計算機的多核進行運算,它可以避免多線程中GIL的影響。

python使用multiprocessing模塊實現多進程,用法和threading基本一致。

1. 添加進程

multiprocessing.Process(target, args)

使用multiprocessing.Process新建一個進程,target是需要執行的函數,args是需要傳入該函數的參數,args接受一個tuple,即使只有一個參數也需要寫成(x,)形式

import multiprocessing as mp

def job(a,d):
    print('aaaaa')

if __name__=='__main__':
    p1 = mp.Process(target=job,args=(1,2)) # 添加一個進程
    p1.start()
    p1.join()

2. 信息傳遞:Queue隊列

多進程中的Queue使用同多線程一致,同樣為先進先出

多進程可以直接從multiprocessing.Queue()導入Queue隊列。

import multiprocessing as mp

def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    # 將值放入隊列

if __name__=='__main__':
    q = mp.Queue() # Queue隊列
    p1 = mp.Process(target=job,args=(q,))
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get() # 從隊列中取出值
    res2 = q.get() # 從隊列中取出值
    print(res1, res2)

3. 進程池

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool() # 定義一個進程池
    res = pool.map(job, range(100))
    print(res)

if __name__=='__main__':
    multicore()

 關於進程池的更多信息請跳轉至:

 4. 共享內存

一般的變量在進程之間是沒法進行通訊的,multiprocessing 給我們提供了 Value 和 Array 模塊,他們可以在不通的進程中共同使用。

Value()和Array()都接受兩個參數,第一個為數據類型,第二個是傳入的數。

Value()可以接受傳入單個數值,Array()可以接受傳入一個一維數組

import multiprocessing as mp

value1 = mp.Value('i', 0) # value接受單個數值,i表示一個帶符號的整型
array = mp.Array('i', [1, 2, 3, 4]) # Array接受一個一維數組

array2 = mp.Array('i', [[1,2], [2,3]]) # 傳入一個二維數組錯誤,傳入參數非一維數組

數據類型如下:

Type code
C Type Python Type Minimum size in bytes
'b' signed char int 1
'B' unsigned char int 1
'u' py_UNICODE Unicode character 2
'h' signed short int 2
'H' unsigned short int 2
'i' signed int int 2
'I' unsigned int int 2
'l' signed long int 4
'L' unsigned long int 4
'q' signed long long int 8
'Q' unsigned long long int 8
'f' float float 4
'd'
double
float 
8

5. 進程鎖

進程鎖同線程鎖使用方法一致,lock在不同進程使用同一共享內存時,能夠確保進程之間互不影響。

使用lock的方法是:在每個進程執行運算修改共享內存之前執行lock.acquire()將共享內存上鎖, 確保當前進程執行時,內存不會被其他進程訪問;

執行運算完畢后使用lock.release()將鎖打開, 保證其他的進程可以使用該共享內存。

lock.acquire()和lock.release()必須成對出現。 

import multiprocessing as mp

def job(v, num, l):
    l.acquire() # 鎖住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 獲取共享內存
        print(v.value)
    l.release() # 釋放

def multicore():
    l = mp.Lock() # 定義一個進程鎖
    v = mp.Value('i', 0) # 定義共享內存
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

 

如何選擇多線程/多進程

1. 結論

CPU密集型代碼(各種循環處理、計算等等):使用多進程

IO密集型代碼(文件處理、網絡爬蟲等):使用多線程

2. 解釋

多線程和多進程的理解可以類比於公路。

假設當前公路均為單行道,並且出於安全考慮,一個車道只能同時行駛一輛汽車,一條公路只有一名駕駛員。只有一名指揮者進行集中調度,駕駛員獲取到了指揮者的調度信息才會駕駛。

單線程是只有一條公路而且是單車道,只能同時行駛一輛汽車;

多線程是只有一條公路,但是是多車道,可以同時行駛多輛汽車;

多進程是有很多條公路,每條公路可能是單車道也可能是多車道,同樣可以同時行駛多輛汽車。

 

因為GIL的存在,python中的多線程其實在同一時間只能運行一個線程,就像一名駕駛員只能同時駕駛一輛汽車。四線程類比於一條四車道的公路,但是駕駛員可以從駕駛車道A上的汽車切換至駕駛車道B上的汽車,駕駛員切換的速度夠快的話,看起來就像是這條公路上的四輛汽車都在同時行駛。指揮者發布的命令只需要跨越車道就能傳遞給駕駛員,命令傳輸的時間損耗相對較小。所以對於多線程,我們希望指揮者可以比較頻繁發布命令,駕駛員獲取到命令后能夠很快就完成然后切換到下一個車道繼續執行命令,這樣看起來就像是駕駛員同時駕駛四輛汽車了。所以對於IO密集型代碼,推薦使用多線程。

而對於多進程來說,每條公路都有一名駕駛員,四線程類比於四條公路,則四名駕駛員可以同時駕駛四輛汽車。但指揮者發布的命令需要跨越公路才能傳遞給駕駛員,命令傳輸的時間損耗相對較大。所以對於多進程,我們希望指揮者發布一次命令后駕駛員可以執行較長時間,這樣就不必把時間過多花費在信息傳輸上。所以對於CPU密集型代碼,推薦使用多進程。 

 

參考資料

1. python的多線程中的join的作用

2. python隊列Queue

3. Python多線程(2)——線程同步機制

4. 莫煩PYTHON-Threading多線程

5. Python 多進程鎖 多進程共享內存

6. python學習筆記——多進程中共享內存Value & Array

7. 莫煩PYTHON-multiprocessing多進程

8. python 之 多進程

9. Python多進程 

10. Python 使用multiprocessing 特別耗內存

11. 廖雪峰-進程和線程 

12. python 多線程,詳細教程,線程同步,線程加鎖,ThreadPoolExecutor

13. 多進程 multiprocessing 多線程Threading 線程池和進程池concurrent.futures


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM