Python 並發編程(一)之線程


常用用法

t.is_alive()

Python中線程會在一個單獨的系統級別線程中執行(比如一個POSIX線程或者一個Windows線程)
這些線程將由操作系統來全權管理。線程一旦啟動,將獨立執行直到目標函數返回。可以通過查詢
一個線程對象的狀態,看它是否還在執行t.is_alive()

t.join()

可以把一個線程加入到當前線程,並等待它終止
Python 解釋器在所有線程都終止后才繼續執行代碼剩余的部分

daemon

對於需要長時間運行的線程或者需要一直運行的后台任務,可以用后台線程(也稱為守護線程)

例:
t = Thread(target = func, args(1,), daemon = True)
t.start()

后台線程無法等待,這些線程會在主線程終止時自動銷毀

小結:
后台線程無法等待,不過,這些線程會在主線程終止時自動銷毀。你無法結束一個線程,無法給它發送信
號,無法調整它的調度,也無法執行其他高級操作。如果需要這些特性,你需要自己添加。比如說,
如果你需要終止線程,那么這個線程必須通過編程在某個特定點輪詢來退出

如果線程執行一些像 I/O 這樣的阻塞操作,那么通過輪詢來終止線程將使得線程之間的協調變得非常棘手。
比如,如果一個線程一直阻塞在一個 I/O 操作上,它就永遠無法返回,也就無法檢查自己是否已經被結束了。
要正確處理這些問題,需要利用超時循環來小心操作線程。

線程間通信

queue

一個線程向另外一個線程發送數據最安全的方式應該就是queue庫中的隊列
先看一下使用例子,這里是一個簡單的生產者和消費者模型:

 1 from queue import Queue
 2 from threading import Thread
 3 import random
 4 import time
 5 
 6 
 7 _sentinel = object()
 8 
 9 
10 def producer(out_q):
11     n = 10
12     while n:
13         time.sleep(1)
14         data = random.randint(0, 10)
15         out_q.put(data)
16         print("生產者生產了數據{0}".format(data))
17         n -= 1
18     out_q.put(_sentinel)
19 
20 
21 def consumer(in_q):
22     while True:
23         data = in_q.get()
24         print("消費者消費了{0}".format(data))
25         if data is _sentinel:
26             in_q.put(_sentinel)
27             break
28 
29 
30 q = Queue()
31 t1 = Thread(target=consumer, args=(q,))
32 t2 = Thread(target=producer, args=(q,))
33 
34 t1.start()
35 t2.start()

上述代碼中設置了一個特殊值_sentinel用於當獲取到這個值的時候終止執行
關於queue的功能有個需要注意的地方:
Queue對象雖然已經包含了必要的鎖,主要有q.put和q.get
而q.size(),q.full(),q.empty()等方法不是線程安全的

使用隊列進行線程通信是一個單向、不確定的過程。通常情況下,是沒有辦法知道接收數據的線程是什么時候接收到的數據並開始工作的。但是隊列提供了一些基本的特性:q.task_done()和q.join()

如果一個線程需要在另外一個線程處理完特定的數據任務后立即得到通知,可以把要發送的數據和一個Event放到一起使用

關於線程中的Event

線程有一個非常關鍵的特性:每個線程都是獨立運行的,且狀態不可預測
如果程序中的其他線程需要通過判斷每個線程的狀態來確定自己下一步的操作,這時線程同步問題就會比較麻煩。
解決方法:
使用threading庫中的Event
Event對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。
在初始化狀態下,event對象中的信號標志被設置為假。
如果有線程等待一個event對象,而這個event的標志為假,這個線程將一直被阻塞知道該標志為真。
一個線程如果把event對象的標志設置為真,就會喚醒所有等待這個event對象的線程。

通過一個代碼例子理解:

 1 from threading import Thread, Event
 2 import time
 3 
 4 
 5 def countdown(n, started_evt):
 6     print("countdown starting")
 7     # set將event的標識設置為True
 8     started_evt.set()
 9     while n > 0:
10         print("T-mins", n)
11         n -= 1
12         time.sleep(2)
13 
14 # 初始化的started_evt為False
15 started_evt = Event()
16 print("Launching countdown")
17 t = Thread(target=countdown, args=(10, started_evt,))
18 t.start()
19 # 會一直等待直到event的標志為True的時候
20 started_evt.wait()
21 print("countdown is running")

而結果,我們也可以看出當線程執行了set之后,才打印running

實際用event對象最好是單次使用,創建一個event對象,讓某個線程等待這個對象,一旦對象被設置為Tru,就應該丟棄它,我們雖然可以通過clear()方法重置event對象,但是這個沒法確保安全的清理event對象並對它進行重新的賦值。會發生錯過事件,死鎖等各種問題。
event對象的一個重要特點是它被設置為True時會喚醒所有等待它的線程,如果喚醒單個線程的最好用Condition或信號量Semaphore

和event功能類似的線程中還有一個Condition

關於線程中的Condition

關於Condition官網的一段話:
A condition variable is always associated with some kind of lock; this can be passed in or one will be created by default. Passing one in is useful when several condition variables must share the same lock. The lock is part of the condition object: you don’t have to track it separately.

Other methods must be called with the associated lock held. The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout.

但是需要注意的是: 
notify() and notify_all()這兩個方法,不會釋放鎖,這意味着線程或者被喚醒的線程不會立刻執行wait()

我們可以通過Conditon對象實現一個周期定時器的功能,每當定時器超時的時候,其他線程都可以檢測到,代碼例子如下:

 1 import threading
 2 import time
 3 
 4 
 5 class PeriodicTimer:
 6     """
 7     這里做了一個定時器
 8     """
 9 
10     def __init__(self, interval):
11         self._interval = interval
12         self._flag = 0
13         self._cv = threading.Condition()
14 
15     def start(self):
16         t = threading.Thread(target=self.run)
17         t.daemon = True
18         t.start()
19 
20     def run(self):
21         while True:
22             time.sleep(self._interval)
23             with self._cv:
24                 # 這個點還是非常有意思的^=
25                 self._flag ^= 1
26                 self._cv.notify_all()
27 
28     def wait_for_tick(self):
29         with self._cv:
30             last_flag = self._flag
31 
32             while last_flag == self._flag:
33                 self._cv.wait()
34 
35 
36 # 下面兩個分別為兩個需要定時執行的任務
37 def countdown(nticks):
38     while nticks > 0:
39         ptimer.wait_for_tick()
40         print('T-minus', nticks)
41         nticks -= 1
42 
43 
44 def countup(last):
45     n = 0
46     while n < last:
47         ptimer.wait_for_tick()
48         print('Counting', n)
49         n += 1
50 
51 
52 ptimer = PeriodicTimer(5)
53 ptimer.start()
54 
55 threading.Thread(target=countdown, args=(10,)).start()
56 threading.Thread(target=countup, args=(5,)).start()

關於線程中鎖的使用

要在多線程中安全使用可變對象,需要使用threading庫中的Lock對象
先看一個關於鎖的基本使用:

 1 import threading
 2 
 3 
 4 class SharedCounter:
 5 
 6     def __init__(self, initial_value=0):
 7         self._value = initial_value
 8         self._value_lock = threading.Lock()
 9 
10 
11     def incr(self,delta = 1):
12         with self._value_lock:
13             self._value += delta
14 
15     def decr(self, delta=1):
16         with self._value_lock:
17             self._value -= delta

Lock對象和with語句塊一起使用可以保證互斥執行,這樣每次就只有一個線程可以執行with語句包含的代碼塊。with語句會在這個代碼快執行前自動獲取鎖,在執行結束后自動釋放所。

線程的調度本質上是不確定的,因此,在多線程程序中錯誤的使用鎖機制可能會導致隨機數據
損壞或者其他異常錯誤,我們稱之為競爭條件

你可能看到有些“老python程序員”
還是通過_value_lock.acquire() 和_value_lock.release(),明顯看來
還是with更加方便,不容易出錯,畢竟你無法保證那次就忘記釋放鎖了

為了避免死鎖,使用鎖機制的程序應該設定每個線程一次只能獲取一個鎖

threading庫中還提供了其他的同步原語:RLock,Semaphore對象。但是這兩個使用場景相對來說比較特殊
RLock(可重入鎖)可以被同一個線程多次獲取,主要用來實現基於檢測對象模式的鎖定和同步。在使用這種鎖的時候,當鎖被持有時,只有一個線程可以使用完整的函數或者類中的方法,例子如下:

 1 import threading
 2 
 3 
 4 class SharedCounter:
 5 
 6     _lock = threading.RLock()
 7 
 8     def __init__(self,initial_value=0):
 9         self._value = initial_value
10 
11     def incr(self,delta=1):
12 
13         with SharedCounter._lock:
14             self._value += delta
15 
16     def decr(self,delta=1):
17 
18         with SharedCounter._lock:
19             self.incr(-delta)

這個例子中的鎖是一個類變量,也就是所有實例共享的類級鎖,這樣就保證了一次只有一個線程可以調用這個類的方法。與標准鎖不同的是已經持有這個鎖的方法再調用同樣適用這個鎖的方法時,無需再次獲取鎖,例如上面例子中的decr方法。
這種方法的特點是:無論這個類有多少實例都使用一個鎖。因此在需要使用大量使用計數器的情況下內存效率更高。
缺點:在程序中使用大量線程並頻繁更新計數器時會有競爭用鎖的問題。

信號量對象是一個建立在共享計數器基礎上的同步原語,如果計數器不為0,with語句講計數器減1,
線程被允許執行。with語句執行結束后,計數器加1。如果計數器為0,線程將被阻塞,直到其他線程結束並將計數器加1。但是信號量不推薦使用,增加了復雜性,影響程序性能。
所以信號量更適用於哪些需要在線程之間引入信號或者限制的程序。例如限制一段代碼的並發量

 1 from threading import Semaphore
 2 import requests
 3 
 4 
 5 _fetch_url_sema = Semaphore(5)
 6 
 7 
 8 def fetch_url(url):
 9     with _fetch_url_sema:
10         return requests.get(url)

關於防止死鎖的加鎖機制

在多線程程序中,死鎖問題很大一部分是由於多線程同時獲取多個鎖造成的。
舉個例子:一個線程獲取一個第一個鎖,在獲取第二個鎖的時候發生阻塞,那么這個線程就可能阻塞其他線程執行,從而導致整個程序假死。

一種解決方法:為程序中每一個鎖分配一個唯一的id,然后只允許按照升序規則來使用多個鎖。

 1 import threading
 2 from contextlib import contextmanager
 3 
 4 # 存儲已經請求鎖的信息
 5 _local = threading.local()
 6 
 7 
 8 @contextmanager
 9 def acquire(*locks):
10     # 把鎖通過id進行排序
11     locks = sorted(locks, key=lambda x: id(x))
12 
13     acquired = getattr(_local, 'acquired', [])
14 
15     if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
16         raise RuntimeError("Lock order Violation")
17     acquired.extend(locks)
18     _local.acquired = acquired
19 
20     try:
21         for lock in locks:
22             lock.acquire()
23         yield
24     finally:
25         for lock in reversed(locks):
26             lock.release()
27         del acquired[-len(locks):]
28 
29 
30 x_lock = threading.Lock()
31 y_lock = threading.Lock()
32 
33 
34 def thread_1():
35     while True:
36         with acquire(x_lock,y_lock):
37             print("Thread-1")
38 
39 
40 def thread_2():
41     while True:
42         with acquire(y_lock,x_lock):
43             print("Thread-2")
44 
45 
46 t1 = threading.Thread(target=thread_1)
47 t1.daemon = True
48 t1.start()
49 
50 t2 = threading.Thread(target=thread_2)
51 t2.daemon = True
52 t2.start()

通過排序,不管以什么樣的順序來請求鎖,這些鎖都會按照固定的順序被獲取。
這里也用了thread.local()來保存請求鎖的信息
同樣的這個東西也可以用來保存線程的信息,而這個線程對其他的線程是不可見的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM