線程間通信
1.Queue
使用線程隊列有一個要注意的問題是,向隊列中添加數據項時並不會復制此數據項,線程間通信實際上是在線程間傳遞對象引用。如果你擔心對象的共享狀態,那你最好只傳遞不可修改的數據結構(如:整型、字符串或者元組)或者一個對象的深拷貝。
Queue 對象提供一些在當前上下文很有用的附加特性。比如在創建 Queue 對象時提供可選的 size 參數來限制可以添加到隊列中的元素數量。對於“生產者”與“消費者”速度有差異的情況,為隊列中的元素數量添加上限是有意義的。比如,一個“生產者”產生項目的速度比“消費者”“消費”的速度快,那么使用固定大小的隊列就可以在隊列已滿的時候阻塞隊列,以免未預期的連鎖效應擴散整個程序造成死鎖或者程序運行失常。在通信的線程之間進行“流量控制”是一個看起來容易實現起來困難的問題。如果你發現自己曾經試圖通過擺弄隊列大小來解決一個問題,這也許就標志着你的程序可能存在脆弱設計或者固有的可伸縮問題。 get() 和 put() 方法都支持非阻塞方式和設定超時。
import queue
q = queue.Queue()
try:
data = q.get(block=False)
except queue.Empty:
...
try:
q.put(item, block=False)
except queue.Full:
...
try:
data = q.get(timeout=5.0)
except queue.Empty:
...
def producer(q):
...
try:
q.put(item, block=False)
except queue.Full:
log.warning('queued item %r discarded!', item)
_running = True
def consumer(q):
while _running:
try:
item = q.get(timeout=5.0)
# Process item
...
except queue.Empty:
pass
最后,有 q.qsize() , q.full() , q.empty() 等實用方法可以獲取一個隊列的當前大小和狀態。但要注意,這些方法都不是線程安全的。可能你對一個隊列使用empty() 判斷出這個隊列為空,但同時另外一個線程可能已經向這個隊列中插入一個數據項。所以,你最好不要在你的代碼中使用這些方法。
為了避免出現死鎖的情況,使用鎖機制的程序應該設定為每個線程一次只允許獲取一個鎖。如果不能這樣做的話,你就需要更高級的死鎖避免機制。在 threading 庫中還提供了其他的同步原語,比如 RLock 和 Semaphore 對象。
Queue提供的方法:
task_done()
意味着之前入隊的一個任務已經完成。由隊列的消費者線程調用。每一個get()調用得到一個任務,接下來的task_done()調用告訴隊列該任務已經處理完畢。
如果當前一個join()正在阻塞,它將在隊列中的所有任務都處理完時恢復執行(即每一個由put()調用入隊的任務都有一個對應的task_done()調用)。
join()
阻塞調用線程,直到隊列中的所有任務被處理掉。
只要有數據被加入隊列,未完成的任務數就會增加。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。
put(item[, block[, timeout]])
將item放入隊列中。
1.如果可選的參數block為True且timeout為空對象(默認的情況,阻塞調用,無超時)。
2.如果timeout是個正整數,阻塞調用進程最多timeout秒,如果一直無空空間可用,拋出Full異常(帶超時的阻塞調用)。
3.如果block為False,如果有空閑空間可用將數據放入隊列,否則立即拋出Full異常
其非阻塞版本為put_nowait等同於put(item, False)
get([block[, timeout]])
從隊列中移除並返回一個數據。block跟timeout參數同put方法
其非阻塞方法為get_nowait()相當與get(False)
empty()
如果隊列為空,返回True,反之返回False
同步機制
Event
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用 threading 庫中的 Event 對象。Event 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在初始情況下,event 對象中的信號標志被設置假。如果有線程等待一個 event 對象,而這個 event 對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個 event 對象的信號標志設置為真,它將喚醒所有等待個 event 對象的線程。如果一個線程等待一個已經被設置為真的 event 對象,那么它將忽略這個事件,繼續執行。
from threading import Thread, Event
import time
def countdown(n, start_evt):
print('countdown is starting...')
start_evt.set()
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
start_evt = Event() # 可通過Event 判斷線程的是否已運行
t = Thread(target=countdown, args=(10, start_evt))
t.start()
print('launching countdown...')
start_evt.wait() # 等待countdown執行
# event 對象的一個重要特點是當它被設置為真時會喚醒所有等待它的線程
print('countdown is running...')
Semaphore(信號量)
在多線程編程中,為了防止不同的線程同時對一個公用的資源(比如全部變量)進行修改,需要進行同時訪問的數量(通常是1)的限制。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。
from threading import Semaphore, Lock, RLock, Condition, Event, Thread
import time
# 信號量
sema = Semaphore(3) #限制同時能訪問資源的數量為3
def foo(tid):
with sema:
print('{} acquire sema'.format(tid))
time.sleep(1)
print('{} release sema'.format(tid))
threads = []
for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Lock(鎖)
互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
#創建鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([timeout])
#釋放
mutex.release()
RLock(可重入鎖)
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
import threading
import time
class MyThread(threading.Thread):
def run(self):
global num
time.sleep(1)
if mutex.acquire(1):
num = num+1
msg = self.name+' set num to '+str(num)
print msg
mutex.acquire()
mutex.release()
mutex.release()
num = 0
mutex = threading.RLock()
def test():
for i in range(5):
t = MyThread()
t.start()
if __name__ == '__main__':
test()
Condition(條件變量)
Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,然后判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件后,通過notify方法通知其他線程,其他處於wait狀態的線程接到通知后會重新判斷條件。不斷的重復這一過程,從而解決復雜的同步問題。
可以認為Condition對象維護了一個鎖(Lock/RLock)和一個waiting池。線程通過acquire獲得Condition對象,當調用wait方法時,線程會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個線程。當調用notify方法時,Condition對象會從waiting池中挑選一個線程,通知其調用acquire方法嘗試取到鎖。
Condition對象的構造函數可以接受一個Lock/RLock對象作為參數,如果沒有指定,則Condition對象會在內部自行創建一個RLock。
除了notify方法外,Condition對象還提供了notifyAll方法,可以通知waiting池中的所有線程嘗試acquire內部鎖。由於上述機制,處於waiting狀態的線程只能通過notify方法喚醒,所以notifyAll的作用在於防止有線程永遠處於沉默狀態。
import threading
import time
class Producer:
def run(self):
global count
while True:
if con.acquire():
if count > 1000:
con.wait()
else:
count += 100
msg = threading.current_thread().name + ' produce 100, count=' + str(count)
print(msg)
con.notify() # 通知 waiting線程池中的線程
con.release()
time.sleep(1)
count = 0
con = threading.Condition()
class Consumer:
def run(self):
global count
while True:
if con.acquire():
if count < 100:
con.wait()
else:
count -= 3
msg = threading.current_thread().name + ' consumer 3, count=' + str(count)
print(msg)
con.notify()
con.release()
time.sleep(3)
producer = Producer()
consumer = Consumer()