python基礎之多線程和線程池


為什么要引入並發編程

場景1:一個網絡爬蟲,按順序爬取花了1小時,采用並發下載減少到20分鍾
場景2:一個APP應用,優化前每次打開頁面需要3秒,采用異步並發提升到打開每次200毫秒

其實引入並發就是為了提升程序的運行速度。

python中對並發編程的支持

  • 多線程:threading模塊,利用CPU和IO可以同時執行的原理,讓CPU不會干巴巴的等待IO完成
  • 多進程:multiprocessing模塊,利用多核CPU的能力,真正的並行執行任務
  • 異步IO:asyncio模塊,在單線程利用CPU和IO同時執行的原理,實現函數異步執行

同時python又提供了一些模塊來輔助或者簡化並發的運行。

  • 使用Lock對資源進行加鎖,防止沖突訪問
  • 使用Queue實現不同線程、進程之間的數據通信,實現生產者-消費者模型
  • 使用線程池、進程池Pool,簡化線程、進程的任務提交,等待結束,獲取結果

如何選擇

  • CPU密集型:CPU密集型也叫計算密集型,是指I/O在很短的時間就可以完成,CPU需要大量的計算和處理,特點是CPU占用率特別高。典型的示例:壓縮、解壓縮,加密解密,正則表達式搜索等
  • IO密集型:IO密集型指的是系統運作大部分的狀況是CPU在等待I/O,例如一些磁盤、內存、網絡的讀寫,這種狀況,CPU占用率不高,系統IO特別高。 典型的示例:文件處理程序,網絡爬蟲,讀寫數據庫等

python中多進程、多線程、多協程的對比

  • 多線程:
    • 優點:相比進程,更加輕量級,占用資源少
    • 缺點:相比協程,啟動數目有限,占用內存資源,有線程切換開銷
    • 適用於:IO密集型計算,同時運行的任務數目要求不多
  • 多進程
    • 優點:可以利用多核CPU並行運算
    • 缺點:占用資源最多,可啟動數目比線程少
    • 適用於:CPU密集型計算場景
  • 多協程
    • 優點:內存開銷最少,啟動協程數可以非常多
    • 缺點:支持的庫有限,例如不能使用requests模塊,代碼實現復雜
    • 適用於:IO密集型計算、需要超多任務運行,有現成庫支持的場景

python全局解釋器鎖GIL

python速度慢的兩個原因?

相比其他語言,例如:C/C++/java/golang,python確實很慢,在一些特殊場景下,python要比C++慢100~200倍

那么python慢的原因到底是什么?

  1. 動態類型語言,邊解釋邊執行
  2. GIL無法利用多核CPU並發執行

那么GIL是什么?

GIL:全局解釋器鎖,是計算機程序設計語言解釋器用於同步線程的一種機制,它使得任何時刻僅有一個線程在執行,即便在多核心處理器上,使用GIL的解釋器也只允許同一時間執行一個線程。

出現GIL的原因?

python設計初期,為了規避並發問題,解決多線程之間數據完整性和狀態同步問題,因此引入了GIL。

由於python中對象的管理,是使用引用計數器進行的,引用數為0則釋放對象。

比如:有兩個線程A和B都想引用對象obj,並對該對象做撤銷處理,線程A先執行了撤銷,將對象obj做了減一處理,此時發生了多線程的調度切換,線程B也做了obj的撤銷處理,obj此時又減一,這個時候又發生了多線程調度切換,此時對象obj的計數已經為0,此時Python會釋放此對象,這個時候可能會破壞內存。

而多線程在執行期間,線程會釋放GIL,實現CPU和IO的並行執行,因此多線程對於IO密集型的運行效率會有很大的提升。

創建多線程的方法

創建多線程的流程

  1. 先准備一個執行函數,例如:
def my_func(a, b):
    do_something(a, b)
  1. 創建一個線程
import threading
t = threading.Thread(target=my_func, args=(100, 200,))
  1. 啟動線程
t.start()
  1. 等待結束
t.join()

爬蟲示例

測試示例來源於爬取的北京新發地菜價信息,地址如下:http://www.xinfadi.com.cn/priceDetail.html

瀏覽器f12抓包分析,可以看到,價格信息是通過http://www.xinfadi.com.cn/getPriceData.html 這個請求拿到的,請求方法為POST,我們試着拿第一頁的數據信息,代碼如下:

import requests

url = 'http://www.xinfadi.com.cn/getPriceData.html'


def get_resource(url, page=1):
    data = {
        "limit": 20,
        "current": page
    }
    resp = requests.post(url, data=data)
    resp.encoding = 'utf-8'
    price_list = resp.json()['list']
    res_data = [
        (info['prodName'], info['place'], info['avgPrice']) for info in price_list
    ]
    print(res_data)
    return res_data


if __name__ == '__main__':
    res = get_resource(url)
    print(res)

執行結果如下:

[('大白菜', '冀陝遼', '1.15'), ('娃娃菜', '冀', '1.25'), ('小白菜', '', '2.75'), ('圓白菜', '冀', '2.5'), ('圓白菜', '魯', '1.9'), ('紫甘藍', '冀', '0.75'), ('芹菜', '魯', '2.65'), ('西芹', '遼', '2.9'), ('菠菜', '蒙', '6.5'), ('萵筍', '冀', '2.25'), ('團生菜', '冀', '4.5'), ('散葉生菜', '京遼', '4.75'), ('羅馬生菜', '冀', '3.25'), ('油菜', '冀', '2.9'), ('香菜', '冀', '6.0'), ('茴香', '冀', '6.5'), ('韭菜', '粵冀', '2.85'), ('苦菊', '遼', '4.5'), ('油麥菜', '遼', '6.0'), ('黃心菜', '皖', '1.55')]

下面看下單線程爬取五十頁的菜價信息時的用時情況吧:

為了測試方便,這里寫一個統計程序運行時間的裝飾器,最終單線程運行時的代碼為:

import time
import xinfadi_spider
import threading
from functools import wraps


def count_time(func):
    @wraps(func)
    def _wraper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print(f"運行時間: {end - start}")
        return res

    return _wraper


@count_time
def single_thread():
    for page in range(1, 51):
        xinfadi_spider.get_resource(xinfadi_spider.url, page)


if __name__ == '__main__':
    single_thread()

運行時間最終為:

運行時間: 13.166715621948242

下面再看下多線程時的運行代碼:

@count_time
def single_thread():
    for page in range(1, 51):
        xinfadi_spider.get_resource(xinfadi_spider.url, page)
    return


@count_time
def multi_thread():
    t = []
    for page in range(1, 51):
        t.append(
            threading.Thread(target=xinfadi_spider.get_resource, args=(xinfadi_spider.url, page,))
        )
    for thread in t:
        thread.start()
    for thread in t:
        thread.join()


if __name__ == '__main__':
    multi_thread()

最終運行速度提升了10倍左右,結果為:

運行時間: 1.8293204307556152

線程安全

線程安全指的是某個函數、函數庫在多線程環境中被調用時,能正確的處理多個線程的共享變量,使程序功能正確完成

為了保證線程安全,python提供了QueueLock這兩種方式,下面會結合一些案例進行介紹。

Queue隊列

Queue是python專門提供的一種數據類型,其特點就是隊列會通過先進先出,或者先進后出的模式,保證單個數據不會同時被多個線程進行訪問,因此使用Queue可以安全的訪問數據,而不會造成數據共享沖突。

另外使用Queue還能夠實現生產者和消費者模型,進行程序間的解耦,可以用較少的資源解決高並發的一些問題。

Queue的一些常用方法:

  1. 導入模塊: import queue
  2. 創建隊列: q = queue.Queue()
  3. 添加元素: q.put(item)
  4. 獲取元素: q.get()
  5. 查詢元素的個數: q.qsize()
  6. 判斷是否為空: q.empty()
  7. 判斷是否已滿: q.full()

這里結合上面的爬蟲案例,使用Queue來進行並發解析,整體流程如下:

首先改造一下我們之前的爬蟲程序,將解析獲取網頁數據和解析網頁數據分開

import requests


url = 'http://www.xinfadi.com.cn/getPriceData.html'


def get_resource(url, page=1):
    data = {
        "limit": 20,
        "current": page
    }
    resp = requests.post(url, data=data)
    resp.encoding = 'utf-8'
    return resp.json()


def parse_resource(resource_data):
    price_list = resource_data['list']
    res_data = [
        (info['prodName'], info['place'], info['avgPrice']) for info in price_list
    ]
    return res_data


if __name__ == '__main__':
    res = get_resource(url)
    print(parse_resource(res))

然后編寫生產者和消費者

import queue
import threading
import xinfadi_spider


def producer(page_queue: queue.Queue, resource_queue: queue.Queue):
    while True:
        page = page_queue.get()
        resource = xinfadi_spider.get_resource(xinfadi_spider.url, page)
        resource_queue.put(resource)
        print(f"{threading.current_thread().name} 爬取第{page}頁內容, 當前隊列大小為: "
              f"{page_queue.qsize()}")


def consumer(resource_queue: queue.Queue):
    while True:
        resouce = resource_queue.get()
        parse_resource = xinfadi_spider.parse_resource(resouce)
        print(f"{threading.current_thread().name} 解析數據, "
              f"隊列大小為: {resource_queue.qsize()}")



if __name__ == '__main__':
    page_queue = queue.Queue()
    resource_queue = queue.Queue()

    for page in range(1, 51):
        page_queue.put(page)

    for i in range(3):
        t = threading.Thread(name=f'producer-{i}', target=producer, args=(page_queue, resource_queue,))
        t.start()

    for i in range(2):
        t = threading.Thread(name=f'consumer-{i}', target=consumer, args=(resource_queue,))
        t.start()

最終的結果如下:

producer-1 爬取第2頁內容, 當前隊列大小為: 47
producer-0 爬取第1頁內容, 當前隊列大小為: 47
producer-2 爬取第3頁內容, 當前隊列大小為: 45consumer-1 解析數據, 隊列大小為: 2
consumer-1 解析數據, 隊列大小為: 1
consumer-1 解析數據, 隊列大小為: 0

producer-0 爬取第5頁內容, 當前隊列大小為: 44consumer-1 解析數據, 隊列大小為: 0
........
........
........
producer-0 爬取第47頁內容, 當前隊列大小為: 0
consumer-0 解析數據, 隊列大小為: 0
producer-1 爬取第50頁內容, 當前隊列大小為: 0
producer-2 爬取第49頁內容, 當前隊列大小為: 0consumer-0 解析數據, 隊列大小為: 1
consumer-0 解析數據, 隊列大小為: 0

Lock鎖

而使用Lock時,在線程函數執行前,會先加鎖,執行完成后,會釋放鎖,確保每次只有一個線程占有該鎖

下面結合一個經典的銀行取錢的例子,來說一下為什么要引入Lock

import threading


class Account:
    def __init__(self, account, balance):
        # 賬戶
        self.account = account
        # 賬戶余額
        self.balance = balance


def draw_money(account, money):
    if account.balance >= money:
        print(f"{threading.current_thread().name} 取錢成功,取出金額為: {money}")
        account.balance -= money
        print(f"當前賬戶余額為: {account.balance}")
    else:
        print(f"{threading.current_thread().name} 取錢失敗,余額不足!")


if __name__ == '__main__':
    account = Account("tom", 1000)
    threading.Thread(name='A', target=draw_money, args=(account, 800,)).start()
    threading.Thread(name='B', target=draw_money, args=(account, 800,)).start()

正常的結果輸出應該為下面:

A 取錢成功,取出金額為: 800
當前賬戶余額為: 200
B 取錢失敗,余額不足!

創建了兩個線程A和B,那么當A取完錢之后,賬戶余額就變成了200,因此當B再去執行時,賬戶余額不足,因此會提示余額不足,但是當我們多運行幾次后,你會發現運行結果會出現下面的情況:

A 取錢成功,取出金額為: 800
B 取錢成功,取出金額為: 800
當前賬戶余額為: 200
當前賬戶余額為: -600

出現這種情況的原因是因為run()方法不具有線程的安全性,在線程去修改account賬戶余額時,恰好發生了線程切換,即另一個線程B去修改了account的賬戶,因此就出現了上面的結果。

當我們在代碼里,更改余額的邏輯前面加一個阻塞time.sleep(0.5)之后

time.sleep(0.5)
account.balance -= money

你會看到每次的運行結果總會是錯誤的

A 取錢成功,取出金額為: 800
B 取錢成功,取出金額為: 800
當前賬戶余額為: 200
當前賬戶余額為: -600

那么python為了解決這個問題,引入了互斥鎖Lock, 使用方法如下:

  1. 創建線程鎖: lock = threading.Lock()
  2. 創建遞歸鎖: lock = threading.RLock()
  3. 加鎖: lock,acquire()
  4. 釋放鎖: locak.release()

對應的代碼寫法有兩種,一種是通過try...finally...方式,另一種是通過with方式,如下:

import threading

lock = threading.Lock()

lock.acquire()
try:
    do something
finally:
    lock.release()

import threading

lock = threading.Lock()

with lock:
    do something

這里直接使用with的方式(使用起來更簡潔),結合上面的示例就是:

import threading
import time

class Account:
    def __init__(self, account, balance):
        # 賬戶
        self.account = account
        # 賬戶余額
        self.balance = balance
        # 定義互斥鎖
        self.lock = threading.Lock()


def draw_money(account, money):
    with account.lock:
        if account.balance >= money:
            print(f"{threading.current_thread().name} 取錢成功,取出金額為: {money}")
            time.sleep(0.5)
            account.balance -= money
            print(f"當前賬戶余額為: {account.balance}")
        else:
            print(f"{threading.current_thread().name} 取錢失敗,余額不足!")


if __name__ == '__main__':
    account = Account("tom", 1000)
    threading.Thread(name='A', target=draw_money, args=(account, 800,)).start()
    threading.Thread(name='B', target=draw_money, args=(account, 800,)).start()

無論在邏輯里面添加任何阻塞,最終的結果都是正確的

A 取錢成功,取出金額為: 800
當前賬戶余額為: 200
B 取錢失敗,余額不足!

線程池

首先了解一下線程的生命周期,如下圖所示:

從上圖可以看到新建線程時,系統需要分配資源,終止線程系統需要回收資源,因此這就會產生一定新建和終止的開銷,如果可以重用線程,那么就可以減少系統開銷,所以就有了線程池,那么使用線程池有哪些優勢呢?

  1. 提升性能,減少了大量的新建、終止線程的開銷,重用線程資源
  2. 適用於處理突發性大量請求或需要大量線程來完成任務,但實際任務處理時間較短的場景
  3. 能有效避免系統因為創建線程過多,導致系統負荷較高而變慢的問題
  4. 使用線程池,比單獨使用線程要更加簡潔

使用方法如下:

  1. map函數方式

map的結果和入參順序是固定的

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
    # func 是目標函數
    # args_list 是一個參數列表
    results = pool.map(func, args_list)
    # 獲取執行的返回結果
    for result in results:
        print(result)
  1. future模式

as_completed順序是不固定的

from concurrent.futures import ThreadPoolExecutor, as_completed


with ThreadPoolExecutor() as pool:
    # arg 是指一個參數
    futures = [pool.submit(func, arg) for arg in args_list]
    for future in futures:
        print(future.result())
    for future in as_completed(futures):
        print(future.result())

下面結合上面的爬蟲案例進行改造,首先是通過submit方式來看下

import xinfadi_spider
from concurrent.futures import ThreadPoolExecutor


# 新建一個線程來獲取所有url資源
with ThreadPoolExecutor() as p1:
    futures = {
        page: p1.submit(xinfadi_spider.get_resource, xinfadi_spider.url, page)
        for page in range(1, 51)
    }
    for k, v in futures.items():
        print(k, v.result())

with ThreadPoolExecutor() as p2:
    futures_parse = {}
    for resource in futures.values():
        res = p2.submit(xinfadi_spider.parse_resource, resource.result())
        futures_parse[res] = resource
    for k, v in futures_parse.items():
        print(res.result())

注意:當使用submit時,返回的是一個future對象,可以通過result()獲取返回結果,

而使用map提交任務時,相當於啟動了len(iterlables)個線程來並發的去執行func函數

with ThreadPoolExecutor() as p3:
    res = p3.map(xinfadi_spider.get_resource, [xinfadi_spider.url] * 50, [i for i in range(1, 51)])

with ThreadPoolExecutor() as p4:
    p4.map(xinfadi_spider.parse_resource, [r for r in res])

需要注意的是,使用map時,傳入多個參數時,需要保證傳入的變量是一個可迭代的對象,例如數組、元祖等,並且需要保證參數的個數是一致的。

如果不使用with關鍵字來創建線程池時,例如直接通過pool = ThreadPoolExecutor()創建線程池時,需要在結束時使用pool.shutdown()來關閉線程池

多進程

多線程和協程本質上還是在單核上進行,而多進程是真正意義上的並行,利用了多進程在多核CPU上並行執行。

由於多進程和多線程寫法幾乎一樣,所以這里不在做過多的講解,只列出一些創建方法和使用方法。

  1. 導入模塊
# 多進程
from multiprocessing import Process

# 多線程
from threading import Thread
  1. 新建、啟動、等待結束
# 多進程
p = Process(target=func, args=(1,))
p.start()
p.join()

# 多線程
t = Thread(target=func, args=(1,))
t.start()
t.join()
  1. 數據通信
# 多進程
from multiprocessing import Queue
q = Queue()
q.put([1,2,3])
item = q.get()

# 多線程
import queue
q = Queue()
q.put([1,2,3])
item = q.get()
  1. 線程安全加鎖
# 多進程
from multiprocessing import Lock
lock = Lock()
with lock:
  do_something()
  
# 多線程
from threading import Lock
lock = Lock()
with lock:
  do_something()
# 多進程
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
    # 方法一
    res = pool.map(func, *iterables)
    # 方法二
    res = pool.submit(func, arg)
    result = res.result()
    
# 多線程
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
    # 方法一
    res = pool.map(func, *iterables)
    # 方法二
    res = pool.submit(func, arg)
    result = res.result()

協程

關於協程這部分,需要很多的內容去描述,因此放到下一篇進行學習和介紹。

歡迎各位朋友關注我的公眾號,來一起學習進步哦
images


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM