淺析Python多線程


學習Python多線程的資料很多,吐槽Python多線程的博客也不少。本文主要介紹Python多線程實際應用,且假設讀者已經了解多線程的基本概念。如果讀者對進程線程概念不甚了解,可參見知名博主 阮一峰 轉譯的一篇博客:《進程與線程的一個簡單解釋》

1 線程的基本操作

Python中多線程主要有兩個模塊,_thread和threading模塊。前者更底層,后者更常用,能滿足絕大部分編程需求,今天主要圍繞threading模塊展開介紹。啟動一個線程需要用threading模塊中的Thread。

線程的啟動需要先創建Thread對象,然后調用該對象的start()方法,參見下例:

import time
import threading

def func(n):
    while n > 0:
        print("線程name:", threading.current_thread().name, "參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(5,))
t.start()
print("主線程:", threading.current_thread().name)
# 運行結果:
# 線程name: Thread-1 參數n: 5
# 主線程: MainThread
# 線程name: Thread-1 參數n: 4
# 線程name: Thread-1 參數n: 3
# 線程name: Thread-1 參數n: 2
# 線程name: Thread-1 參數n: 1

上例中,threading.current_thread().name 是獲取當前線程的name屬性。

Thread中,形參target傳入函數名,args傳入函數對應的參數,參數必須是可迭代對象,如果是元組且只有一個參數必須寫成(參數,)的形式,逗號不能省略

一旦啟動一個線程,該線程將由操作系統來全權管理,獨立執行直到目標函數返回。一般情況下,線程的操作有以下幾種:

t.is_alive()    # 查詢線程對象的狀態,返回布爾值
t.join()        # 將線程加入到當前線程,並等待其終止

t = Thread(target=countdown, args=(10,), daemon=True)  # 后台線程
t.start()  

查看線程狀態示例:

import time
import threading

def func(n):
    while n > 0:
        print("線程name:", threading.current_thread().name, "參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(2,))
t.start()
print("主線程:", threading.current_thread().name)

if t.is_alive():
    print("活着的")
else:
    print("未存活")
print("主線程結束")

讓主線程等待其他線程,就是主線程會在join()處一直等待所有線程都結束之后,再繼續運行。參見下例:

import time
import threading

def func(n):
    while n > 0:
        print("線程name:", threading.current_thread().name, "參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(2,))
t.start()
t.join()
print("主線程:", threading.current_thread().name)
print("主線程結束")
# 運行結果:
# 線程name: Thread-1 參數n: 2
# 線程name: Thread-1 參數n: 1
# 主線程: MainThread
# 主線程結束

后台線程參見下例:

import time
import threading

def func(n):
    while n > 0:
        print("參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(10, ), daemon=True)
t.start()
time.sleep(3)
print("主線程結束")

# 參數n: 10
# 參數n: 9
# 參數n: 8
# 參數n: 7
# 主線程結束

 

后台線程無法等待,但主線程終止時后台線程自動銷毀。 如果要對線程進行高級操作,如發送信號終止線程,都需要自己實現。下例通過輪詢控制線程退出

import time
from threading import Thread

class StopThread:
    def __init__(self):
        self._flag = True

    def terminate(self):
        self._flag = False

    def run(self, n):
        while self._flag and n > 0:
            print('num>>:', n)
            n -= 1
            time.sleep(1)

obj = StopThread()
t = Thread(target=obj.run, args=(11,))
t.start()

time.sleep(5)    # 表示do something

obj.terminate()  # 終止線程
t.join()
print("主線程結束")

上例通過類中的_flag控制線程的終止,當主線程執行5秒之后,主動將_flag賦值為False終止線程。通過輪詢終止線程存在一個問題,如果while self._flag and n > 0:這句后,某次循環一直阻塞在I/O操作上,根本不會進行下一次循環,自然就無法終止。這該怎么辦呢?留一個思考題。

多線程還可以通過繼承Thread實現,如下:

import time
from threading import Thread

class A(Thread):
    def __init__(self,):
        super().__init__()

    def run(self):
        print("run1..", )
        time.sleep(5)
        print("run2..")

obj = A()
obj.start()
print("主線程結束")

2 線程鎖和一個怪象

當我們用多個線程同時修改同一份數據時,怎么保證最終結果是我們期許的呢?舉個例子,當前有一個全局變量a=0,如果有10個線程同時對其加1,這就出現了線程間的競爭,到底應該聽誰的呢?這時候,應該用線程鎖來解決。也就是當某一個線程A對該數據操作時,對該數據加鎖,其他線程只能等着,等待A操作完之后釋放了該鎖,其他線程才能操作該數據,一旦某個線程獲得操作數據的權限,立即又加上鎖。如此便能保證數據的安全准確。奇怪的是,在Python3中,即使不加鎖,好像也不會發生數據出錯的情況。或許這個例子不是很好,也或許是Python3中自動加了鎖。希望有知道的讀者賜教一下。這個奇怪的現象就是下例了:

from threading import Thread
import time

def add_one(a):
    time.sleep(1)
    print("in thread a:", a)
    a[1] += 1

if __name__ == '__main__':
    array = [0, 1, 4]
    thread_obj_list = []

    for i in range(50):
        t = Thread(target=add_one, args=(array,))
        t.start()
        thread_obj_list.append(t)

    for j in thread_obj_list:
        j.join()

    print("array result::", array)
    # array result:: [0, 51, 4]  

我們看到,最后array的第二個元素是51,並沒有出錯,這真是令人費解。好了,言歸正傳,來看看線程鎖的幾個方法吧:

lock = threading.Lock()     # Lock對象
lock.acquire()              # 鎖定
lock.release()              # 解鎖

Lock有“鎖定”或“解鎖”兩種狀態之一。它是在解鎖狀態下創建的。它有兩個基本方法,acquire() 和 release()
當狀態為解鎖時,acquire()將狀態更改為鎖定並立即返回。當狀態被鎖定時,acquire()塊直到對另一個協程中的release()的調用將其改變為解鎖,然后acquire()調用將其重置為鎖定並返回。
release()方法只應在鎖定狀態下調用;它將狀態更改為已解鎖並立即返回。如果嘗試釋放已解鎖的鎖,則會引發 RuntimeError。

下面是一個具體的使用例子:

from threading import Thread
import time
import threading

lock = threading.Lock()

def add_one(a):
    time.sleep(1)
    lock.acquire()
    a[1] += 1
    lock.release()

if __name__ == '__main__':
    array = [0, 1, 4]
    thread_obj_list = []

    for i in range(50):
        t = Thread(target=add_one, args=(array,))
        t.start()
        thread_obj_list.append(t)

    for j in thread_obj_list:
        j.join()

    print("array result::", array)
    # array result:: [0, 51, 4]  

acquire()和release()方法成對出現。但是這樣手動釋放有時候可能會遺忘,這時候可以考慮用上下文管理協議。關於上下文管理協議,可參見作者的這篇文章【Python上下文管理器】。

Lock對象支持with語句:

def add_one(a):
    time.sleep(1)
    with lock:
        a[1] += 1 

3 遞歸鎖 

可重入鎖(又稱遞歸鎖,RLock),就是大鎖中包含子鎖的情況下使用。在這種情況下,再用Lock時,就會出現死鎖現象,此時應該用threading.RLock()對象了,用法同Lock,參見下例:

from threading import Thread
import time
import threading

lock = threading.RLock()

def add_one(a):
    lock.acquire()
    a[1] += 1
    lock.release()

def add_two(b):
    time.sleep(1)
    lock.acquire()
    b[1] += 2
    add_one(b)
    lock.release()

if __name__ == '__main__':
    array = [0, 1, 4]
    thread_obj_list = []

    for i in range(50):
        t = Thread(target=add_two, args=(array,))
        t.start()
        thread_obj_list.append(t)

    for j in thread_obj_list:
        j.join()

    print("array result::", array)
    # array result:: [0, 151, 4]  

上例讀者可以試試Lock(),看看什么效果。RLock()還支持上下文管理協議,上例中的兩個函數可以改成這樣:

def add_one(a):
    with rlock:
        a[1] += 1

def add_two(b):
    time.sleep(1)
    with rlock:
        b[1] += 2
        add_one(b)

4 GIL

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程序設計語言解釋器用於同步線程的一種機制,它使得任何時刻僅有一個線程在執行。所以很多人說Python的線程是假線程,並能利用多核,並不能真正並行。之所以感覺到線程並行,是因為線程上下文不斷切換的緣故。Python 3.2開始使用新的GIL。新的GIL實現中用一個固定的超時時間來指示當前的線程放棄全局鎖。在當前線程保持這個鎖,且其他線程請求這個鎖時,當前線程就會在5毫秒后被強制釋放該鎖。關於全局鎖,強調三點:

(1)GIL的存在,同一時刻只能有一個線程在運行。

(2)GIL是CPython的特性,Jython,pypy等並無GIL。

(3)Cpython的多線程適用於I/O密集型問題,計算密集型問題可使用多進程編程。  

5 判斷線程狀態

在多線程編程中,有時候某個線程依賴另一個線程的狀態,需要使用threading庫中的Event對象。 Event對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。可將線程設置等待Event對象, 直到有其他線程將Event對象設置為真,這些等待Event對象的線程將開始執行。Event()對象的常用方法:

event = threading.Event()   # 創建threading.Event()對象

event.is_set()   # 獲取event的設置值,默認為False
event.set()      # 設置event的值為True
event.clear()    # 設置event的值為False
event.wait()     # 等到event的值被設為True就執行

下面通過“交通信號燈”問題示范event的使用:

import threading
import time

def traffic_light(event):
    count = 0
    event.set()
    while True:
        # 如果計數器[0, 5)之間, 紅燈,event=False
        if 0 <= count < 5:
            event.clear()
            print("light is Red")
        # 如果計數器[5, 10)之間, 綠燈,event=True
        elif 5 <= count < 10:
            event.set()
            print("light is Green")
        # 如果計數器大於10,紅燈,將event設置為False,計數器置為0
        else:
            event.clear()
            count = 0
        time.sleep(1)
        count += 1

def car(name, event):
    while True:
        if not event.is_set():
            # event為False, 表示紅燈, 車只能等待
            print("RED, the %s is waiting..." % name)
            # 此處會阻塞住,直到event被設置為True在執行
            event.wait()
            print("Green, The %s going...." % name)

e = threading.Event()
light = threading.Thread(target=traffic_light, args=(e,))
light.start()
car1 = threading.Thread(target=car, args=("Tesla", e, ))
car1.start()

交通信號燈有紅燈和綠燈兩種狀態,每5秒切換一次狀態,而car()函數中,只要燈變綠就放car通行。運行試試看。

event對象的一個重要特點是當它被設置為真時會喚醒所有等待它的線程。如果你只想喚醒單個或者一定數目的線程,最好是使用信號量或者 Condition 對象來替代。

Condition對象

 condition對象總是與鎖關聯,可以手動傳入鎖對象,也可以不傳入使用默認值。當有多個線程需要等待某個變量改變時,才開始執行。這種情況可以用condition對象實現。condition對象的主要方法有:

condition = threading.Condition(lock=None)   # 創建Condition對象  參數可以不傳

condition.acquire()    # 加鎖
condition.release()    # 解鎖

condition.wait(timeout=None)                 # 阻塞,直到有調用notify(),或者notify_all()時再觸發
condition.wait_for(predicate, timeout=None)  # 阻塞,等待predicate條件為真時執行

condition.notify(n=1)        # 通知n個wait()的線程執行, n默認為1
condition.notify_all()       # 通知所有wait着的線程執行

with condition:              # 支持with語法,不必每次手動調用acquire()/release() 

看一個例子不是很優雅的例子:

import threading
import time

condition = threading.Condition()    # 創建condition對象

def func():
    condition.acquire()    # 如果沒有with語句,必寫這句,否者報錯
    condition.wait()       # 阻塞,等待其他線程調用notify()
    print("in func..")
    condition.release()    # 與acquire()成對出現

# 啟10個線程
for i in range(10):
    t = threading.Thread(target=func, args=())
    t.start()

time.sleep(5)

condition.acquire()
condition.notify(2)        # 通知兩個線程執行
condition.release()

# in func..
# in func..
# 其他8個線程會繼續等待... 

上例中,我們看到啟動的10個線程會等待5秒鍾並且調用了notify(2)之后,才會通知兩個線程繼續運行。且這兩個線程執行完畢之后,其他8個線程仍然會阻塞在condition.wait() 處。

頻繁寫acquire() / release()很繁瑣,下面是優雅的寫法:

import threading
import time

condition = threading.Condition()    # 創建condition對象

def func(n):
    with condition:            # with更優雅
        condition.wait()       # 阻塞,等待其他線程調用notify()
        print("in func..", n)


# 啟10個線程
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()

time.sleep(5)

with condition:
    condition.notify_all()        # 通知所有線程執行 

運行下,是不是等待5秒之后,所有線程都繼續執行了?

7 信號量

信號量通常用於防范容量有限的資源,例如數據庫服務器。一般而言信號量可以控制釋放固定量的線程。比如啟動100個線程,信號量的控制值設為5,那么前5個線程拿到信號量之后,其余線程只能阻塞,等到這5個線程釋放信號量鎖之后才能去拿鎖。參見下例:

import threading
import time

def func(n):
    # semaphore.acquire()
    with semaphore:
        time.sleep(2)
        print("Thread::", n)
    # semaphore.release()

semaphore = threading.BoundedSemaphore(5)   # 信號量, 每次釋放5個線程

thread_list = []
for i in range(23):
    t = threading.Thread(target=func, args=(i,))
    thread_list.append(t)
    t.start()

for j in thread_list:
    j.join()

print("all threads done") 

上例中,可以看到線程是每5個一組進行釋放的。 

8 Barrier對象

Barriers字面意思是“屏障”,是Python線程(或進程)同步原語。每個線程中都調用wait()方法,當其中一個線程執行到wait方法處會立阻塞;一直等到所有線程都執行到wait方法處,所有線程再繼續執行。參見下例:

import time
import threading

bar = threading.Barrier(3)  # 創建barrier對象,指定滿足3個線程

def worker1():
    print("worker1")
    time.sleep(1)
    bar.wait()
    print("worker1 end")

def worker2():
    print("worker2")
    time.sleep(2)
    bar.wait()
    print("worker2 end")

def worker3():
    print("worker3")
    time.sleep(5)
    bar.wait()
    print("worker3 end")


thread_list = []
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t3 = threading.Thread(target=worker3)
thread_list.append(t1)
thread_list.append(t2)
thread_list.append(t3)

for t in thread_list:
    t.start()

# 每個線程中都調用了wait()方法,在所有(此處設置為3)線程調用wait方法之前是阻塞的。
# 也就是說,只有等到3個線程都執行到了wait方法這句時,所有線程才繼續執行。  

上例中,可以看到,所有線程會先各自運行wait()方法之前的代碼,到wait()處阻塞。等待最后一個線程執行到wait()處,也就是5秒之后,所有線程恢復執行。

9 線程間通信

兩個或多個線程之間相互發送數據最安全的方式可能就是使用 queue 庫中的隊列了。創建一個線程共享的 Queue 對象,線程通過使用 put()和 get()操作來向隊列中添加或者刪除元素。Queue對象已經內置了鎖機制,編程時不必手動操作鎖。下例producer()函數代表包子鋪,生產包子放入隊列中;consumer()函數代表吃包子的人,不斷從隊列中取出包子吃掉;以此演示線程間通過隊列通信。

from queue import Queue
import threading
import time

q = Queue(10)

def producer():
    n = 0
    while True:
        q.put("包子%s" % n)
        print("包子鋪生產 包子%s" % n)
        n += 1
        time.sleep(2)

def consumer():
    while True:
        r = q.get()
        print("bucker 吃掉 %s" % r)
        time.sleep(1)

t1 = threading.Thread(target=producer)
t1.start()
t2 = threading.Thread(target=consumer)
t2.start()  

形如上例的編程模型,又叫生產者-消費者模型。它降低了程序之前的耦合,使得隊列的上游只關注生產數據,隊列的下游只關注消費數據。在票務系統,或者資源有限的情況中可用此模型。補充兩點:

(1)get() 和 put() 方法都支持非阻塞方式和設定超時。
(2)q.qsize() , q.full() , q.empty() 等可以獲取一個隊列的當前大小和狀態。但它們不是線程安全的,盡量別用

10 線程池

Python3.2開始,增加了標准庫concurrent.futures,該庫中的ThreadPoolExecutor是自帶的線程池。簡單使用:

from concurrent.futures import ThreadPoolExecutor
import time

def tell(i):
    print("this is tread {}.".format(i))
    time.sleep(1)

if __name__ == '__main__':
    future = ThreadPoolExecutor(10)
    a = "ddd"
    for i in range(100):
        future.submit(tell, (i,))   # 添加一個線程到線程池
    future.shutdown(wait=True)      # 此函數用於釋放異步執行操作后的系統資源。

其中,submit()方法第一個參數為函數名,第二個為函數的參數。shutdown(wait=True)用於釋放異步執行操作后的系統資源。ThreadPoolExecutor還有一個優點就是:任務提交者更方便的從被調用函數中獲取返回值。參見下例:

import concurrent.futures
import requests

URLS = ['http://www.cnblogs.com/zingp/p/5878330.html',
        'http://www.cnblogs.com/zingp/',
        'https://docs.python.org/']

# 爬取網頁內容
def load_url(url, timeout):
    with requests.get(url, timeout=timeout) as conn:
        return conn.text

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 創建future對象和對應的url的字典
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as err:
            print('url:%s -- err: %s' % (url, err))
        else:
            print(url, len(data))
            
# http://www.cnblogs.com/zingp/ 12391
# http://www.cnblogs.com/zingp/p/5878330.html 90029
# https://docs.python.org/ 9980   

上例創建一個大小為3的線程池,用了不少with語句,並用future.result() 獲取函數返回值。最終,我們看到爬取了三個網頁,並獲得網頁內容。future.result()操作會阻塞,直到對應的函數執行完成並返回一個結果。

此外,ThreadPoolExecutor還提供了異步回調的功能,大大簡化了多線程編程中處理線程返回結果的難度,參見下例:

from concurrent.futures import ThreadPoolExecutor
import time
import threading

def tell(i):
    print("this is tread {}.".format(i))
    time.sleep(1)
    return [i, threading.get_ident()]   # 必須有返回,通過.result()拿到返回值

def callback(obj): 
    # obj 相當於傳過來的future獨享,且回調函數必須有這個參數
    result = obj.result()    # 線程函數的返回值
    print(result)

if __name__ == '__main__':
    future = ThreadPoolExecutor(10)
    a = "ddd"
    for i in range(100):
        # 線程運行結束后將future對象傳給回調函數callback(obj)
        future.submit(tell, i,).add_done_callback(callback)   
    future.shutdown(wait=True)      # 此函數用於釋放異步執行操作后的系統資源。

Python3.2以前並沒有自帶線程池,那時往往采用自定義線程池。下面一個就是自定義線程池的例子,看看是否能夠看得懂:

import queue
import threading
import contextlib

StopEvent = object()

class ThreadPool(object):
    """定義一個線程池類。"""

    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務。
        :param func: 任務函數;
        :param args: 任務函數所需參數;
        :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;
                            2、任務函數返回值(默認為None,即:不執行回調函數);
        :return: 如果線程池已經終止,則返回True否則None。
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        創建一個線程。
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數。
        """
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完所有的任務后,所有線程停止。
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        無論是否還有任務,終止線程。
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數。
        """
        state_list.append(worker_thread)
        try:
            # 遇到yield就返回回去執行with中的語句,執行完了回來。
            yield
        finally:
            state_list.remove(worker_thread)  

創建大的線程池的一個可能需要關注的問題是內存的使用。 例如,如果你在OS X系統上面創建2000個線程,系統顯示Python進程使用了超過9GB的虛擬內存。 不過,這個計算通常是有誤差的。當創建一個線程時,操作系統會預留一個虛擬內存區域來 放置線程的執行棧(通常是8MB大小)。但是這個內存只有一小片段被實際映射到真實內存中。 因此,Python進程使用到的真實內存其實很小 (比如,對於2000個線程來講,只使用到了70MB的真實內存,而不是9GB)。如果擔心虛擬內存大小,可以使用 threading.stack_size() 函數來降低它。

import threading
threading.stack_size(65536)

如果加上這條語句並再次運行前面的創建2000個線程試驗, 會發現Python進程只使用到了大概210MB的虛擬內存,而真實內存使用量沒有變。 注意線程棧大小必須至少為32768字節,通常是系統內存頁大小(4096、8192等)的整數倍。

11 補充幾個概念

同步的定義是:在發出一個功能調用時,在沒有得到結果之前,該調用就不返回,同時其它線程也不能調用這個方法。按照這個定義,其實絕大多數函數都是同步調用。
但是通常說進程、線程同步,往往特指多進程、線程編程時,多個進程、線程之間協同步調,按預定的先后次序進行運行。比如線程A和線程B一起配合,A執行到一定程度依賴B的某個結果,於是停下來示意B運行,B開始執行,執行完將結果返回給A,A接着執行。這里的“同”應該理解為協同、協助、互相配合。
在多線程編程里面,一些敏感數據不允許被多個線程同時訪問,此時就使用同步訪問技術,保證數據在任何時刻,最多有一個線程訪問,以保證數據的完整性。

原語:前文提及原語,很多同學可能不了解這個名詞的意思。內核或微核提供核外調用的過程或函數稱為原語(primitive)。操作系統用語范疇。是由若干多機器指令構成的完成某種特定功能的一段程序,具有不可分割性。即原語的執行必須是連續的,在執行過程中不允許被中斷。不同層次之間對話的語言稱為原語,即不同層之間通過原語來實現信息交換。

 

12 小結與討論

(1)Python多線程編程常用threading模塊。啟動一個多線程需要創建一個Thread對象,調用star()方法啟動線程。注意is_alive() /join()方法和daemon參數的使用。
(2)python多線程鎖有Lock / Rlock, 全局鎖GIL。GIL是CPython特性,同一時刻只能運行一個線程,不能利用多核資源。
(3)線程同步原語有Event / Condition / Semaphore / Barrier。Event用於常用語通知全部線程,condition和Semapher常用於通知一定數量的線程, Barrier用於多個線程必須完成某些步驟再一起執行。
(4)Lock / Rlock / Event / Condition / Semaphore 支持上下文管理協議(with語句,好用)。
(5)線程間通信可以用queue模塊中的Queue隊列,get()和put()已加鎖,是線程安全的。qsize()/full()/empty() 等可以獲取一個隊列的當前大小和狀態, 不是線程安全的,盡量別用。
(6)concurrent.futures中的ThreadPoolExecutor是Python3.2之后自帶的線程池模塊,十分好用,支持with語句,通過future.result()獲取線程返回值。
(7)Python多線程適用於I/O密集型問題,CPU密集型問題可以用C代碼優化底層算法提升性能,需注意一個寫的不好的C語言擴展會導致這個問題更加嚴重;也可以用pypy或者多進程。

以上是本篇全部內容,歡迎讀者批評指正。

參考資料:

threading 官方文檔

concurrent.futures 官方文檔

Python3-cookbook 中文文檔

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM