Python Multithreading and the threading Module


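The article "Python Multithreading and the _thread Module" introduced the basic concepts of threads and gave simple examples using the _thread module. However, the _thread module is too low-level to give precise control over threads. This article covers the threading module, which provides more powerful tools for managing threads.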

 

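Objects in the threading module

Thread  An object that represents a single thread of execution

Lock  A primitive lock

RLock  A reentrant lock, which lets the thread that already holds it acquire it again (recursive lock)

Condition  A condition variable, which lets one thread wait until another thread signals that a particular condition is satisfied

Event  A general form of condition variable: any number of threads can wait for an event, and all of them are woken once the event occurs

Semaphore  A counter for resources shared between threads; acquiring blocks when no resource is available

BoundedSemaphore  Like Semaphore, but the counter is never allowed to exceed its initial value

Timer  Like Thread, but waits for a given interval before running

Barrier  Creates a "barrier" that threads can pass only once the specified number of threads have reached it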
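The behaviour of several of these objects can be seen in a few lines each. The following is only a minimal sketch, not part of the original examples; the names waiter, party, woken and passed are made up for illustration.

```python
import threading

# RLock: the thread that holds it may acquire it again.
rlock = threading.RLock()
with rlock:
    with rlock:                      # re-entrant acquire by the same thread succeeds
        nested_ok = True

# Lock: a plain lock cannot be acquired twice, even by its holder.
lock = threading.Lock()
lock.acquire()
second_try = lock.acquire(blocking=False)   # returns False instead of blocking
lock.release()

# Event: any number of threads block in wait() until another thread calls set().
ready = threading.Event()
woken = []

def waiter(label):
    ready.wait()                     # blocks until the event is set
    woken.append(label)

waiters = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in waiters:
    t.start()
ready.set()                          # wakes every waiting thread
for t in waiters:
    t.join()

# Timer: like Thread, but the function runs after a delay (0.1 s here).
fired = threading.Event()
timer = threading.Timer(0.1, fired.set)
timer.start()
timer.join()

# Barrier: wait() returns only once the given number of threads have arrived.
barrier = threading.Barrier(2)
passed = []

def party(label):
    barrier.wait()
    passed.append(label)

parties = [threading.Thread(target=party, args=(i,)) for i in range(2)]
for t in parties:
    t.start()
for t in parties:
    t.join()

print(nested_ok, second_try, sorted(woken), fired.is_set(), sorted(passed))
```

Semaphore and Condition are left out here because the sections on them give fuller examples.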

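The Thread object

The Thread class represents an activity that runs in a separate thread of control.

Main methods:

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

//target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

//name is the thread name. By default, a unique name of the form "Thread-N" is constructed, where N is a small decimal number.

//args is the argument tuple for the target invocation. Defaults to ().

//kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

//daemon indicates whether the thread is a daemon thread. The default, None, means the flag is inherited from the creating thread.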

 

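start()  //Starts the thread's activity

run()  //Defines what the thread does; it may be overridden in a subclass

join(timeout=None)  //Blocks the calling thread until the thread being joined terminates, or until timeout seconds have elapsed if timeout is given

getName()  //Returns the thread name (deprecated since Python 3.10; use the name attribute instead)

Attributes:

name  //The thread name

ident  //The thread identifier (None until the thread has been started)

daemon  //Whether the thread is a daemon thread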
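The constructor arguments, methods and attributes listed above fit together roughly as follows. This is a minimal sketch; the function work, the thread name worker-1 and the results dictionary are hypothetical names chosen for the example.

```python
import threading

results = {}

def work(label, repeat=1):
    # Runs in the new thread; records which thread executed it.
    results[label] = (threading.current_thread().name, label * repeat)

t = threading.Thread(target=work, name="worker-1",
                     args=("ab",), kwargs={"repeat": 2}, daemon=False)

# Before start(): the thread has a name but no identifier yet.
print(t.name, t.ident, t.is_alive())

t.start()             # run() is invoked in a separate thread and calls work("ab", repeat=2)
t.join(timeout=5)     # wait at most 5 seconds for the thread to finish

print(t.name, t.is_alive(), results)
```

Calling join() with a timeout does not report whether the thread finished; is_alive() has to be checked afterwards, as done here.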

 

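Main functions of the threading module

threading.active_count()  // Returns the number of Thread objects currently alive. The count equals the length of the list returned by enumerate().
threading.current_thread()  // Returns the current Thread object, corresponding to the caller's thread of control.
threading.get_ident()  // Returns the "thread identifier" of the current thread. This is a nonzero integer. Its value has no direct meaning.
threading.enumerate()  // Returns a list of all Thread objects currently alive. The list includes daemon threads, dummy thread objects created by current_thread(), and the main thread. It excludes terminated threads and threads that have not yet been started.
threading.main_thread()  // Returns the main Thread object. Under normal conditions, the main thread is the thread from which the Python interpreter was started.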
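A short sketch of these functions called from inside a worker thread. The function report, the thread name reporter and the info dictionary are hypothetical names used only for this illustration.

```python
import threading

def report(out):
    me = threading.current_thread()
    out["name"] = me.name
    # get_ident() returns the same value as the ident attribute of the current thread.
    out["ident_matches"] = (me.ident == threading.get_ident())
    # A worker thread is not the main thread.
    out["is_main"] = (me is threading.main_thread())
    # A running thread appears in the list returned by enumerate().
    out["listed"] = me in threading.enumerate()
    # active_count() equals the length of that list.
    out["count_matches"] = (threading.active_count() == len(threading.enumerate()))

info = {}
worker = threading.Thread(target=report, args=(info,), name="reporter")
worker.start()
worker.join()
print(info)
```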
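
Daemon threads

Marking a thread as a daemon means the thread is not essential: when the process exits, it does not wait for daemon threads to finish.

To mark a thread as a daemon, assign thread.daemon = True before calling start().

A new thread inherits the daemon flag of the thread that creates it.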
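Both points, inheritance of the flag and the need to set it before start(), can be checked directly. A minimal sketch; parent, child and flags are hypothetical names.

```python
import threading
import time

flags = {}

def child():
    flags["child_daemon"] = threading.current_thread().daemon

def parent():
    # No daemon argument is given, so the new thread inherits
    # the daemon flag of the thread that creates it.
    c = threading.Thread(target=child)
    flags["inherited"] = c.daemon
    c.start()
    c.join()

p = threading.Thread(target=parent, daemon=True)
p.start()
p.join()

t = threading.Thread(target=time.sleep, args=(0.1,))
t.start()
try:
    t.daemon = True          # too late: the flag must be set before start()
except RuntimeError as exc:
    flags["error"] = type(exc).__name__
t.join()

print(flags)
```

Threads created from the main thread are non-daemon by default, because the main thread itself is not a daemon.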

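Using locks

A lock has two states, locked and unlocked, and supports only two operations: acquiring and releasing.

lock.acquire()  //Acquire the lock

lock.release()  //Release the lock

When several threads compete, the first thread to acquire the lock enters the critical section and runs its code. Every thread that arrives afterwards blocks until the first thread finishes, leaves the critical section, and releases the lock. At that point one of the waiting threads acquires the lock and enters the critical section; which waiting thread gets it is not defined.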

#!/usr/bin/env python3
# coding:utf-8
from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime


class CleanOutputSet(set):
    def __str__(self):    # change the output format
        return ', '.join(x for x in self)


# lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 7)))
remaining = CleanOutputSet()


def loop(nsec):
    myname = current_thread().name    # get the name of the current thread
    remaining.add(myname)    # add it to the set
    print('[{0}] Start {1}'.format(ctime(), myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))
    print('  (remaining: {0})'.format(remaining or 'None'))


def main():
    for pause in loops:
        Thread(target=loop, args=(pause,)).start()


@register
def _atexit():
    print('all DONE at:', ctime())


if __name__ == '__main__':
    main()
Without a lock
[Wed Jan 24 20:41:48 2018] Start Thread-1
[Wed Jan 24 20:41:48 2018] Start Thread-2
[Wed Jan 24 20:41:48 2018] Start Thread-3
[Wed Jan 24 20:41:48 2018] Start Thread-4
[Wed Jan 24 20:41:48 2018] Start Thread-5
[Wed Jan 24 20:41:48 2018] Start Thread-6
[Wed Jan 24 20:41:50 2018] Completed Thread-3 (2 secs)
  (remaining: Thread-6, Thread-1, Thread-2, Thread-5, Thread-4)
[Wed Jan 24 20:41:50 2018] Completed Thread-4 (2 secs)
  (remaining: Thread-6, Thread-1, Thread-2, Thread-5)
[Wed Jan 24 20:41:51 2018] Completed Thread-5 (3 secs)
  (remaining: Thread-6, Thread-1, Thread-2)
[Wed Jan 24 20:41:52 2018] Completed Thread-1 (4 secs)
[Wed Jan 24 20:41:52 2018] Completed Thread-6 (4 secs)
  (remaining: Thread-2)
  (remaining: Thread-2)
[Wed Jan 24 20:41:52 2018] Completed Thread-2 (4 secs)
  (remaining: None)
all DONE at: Wed Jan 24 20:41:52 2018
Output

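The output shows that concurrent I/O from multiple threads garbles the result: near the end, two "Completed" lines are followed by two identical "remaining" lines. Both I/O and access to a shared data structure belong in critical sections, so a lock is needed to keep multiple threads from entering a critical section at the same time.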

#!/usr/bin/env python3
# coding:utf-8
from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime


class CleanOutputSet(set):
    def __str__(self):  # change the output format
        return ', '.join(x for x in self)


lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 7)))
remaining = CleanOutputSet()


def loop(nsec):
    myname = current_thread().name   # get the name of the current thread
    lock.acquire()
    remaining.add(myname)   # add it to the set
    print('[{0}] Start {1}'.format(ctime(), myname))
    lock.release()
    sleep(nsec)
    lock.acquire()
    remaining.remove(myname)
    print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))
    print('  (remaining: {0})'.format(remaining or 'None'))
    lock.release()


def main():
    for pause in loops:
        Thread(target=loop, args=(pause,)).start()


@register
def _atexit():
    print('all DONE at:', ctime())


if __name__ == '__main__':
    main()
With a lock
[Wed Jan 24 20:51:11 2018] Start Thread-1
[Wed Jan 24 20:51:11 2018] Start Thread-2
[Wed Jan 24 20:51:11 2018] Start Thread-3
[Wed Jan 24 20:51:11 2018] Start Thread-4
[Wed Jan 24 20:51:11 2018] Start Thread-5
[Wed Jan 24 20:51:13 2018] Completed Thread-2 (2 secs)
  (remaining: Thread-3, Thread-1, Thread-4, Thread-5)
[Wed Jan 24 20:51:14 2018] Completed Thread-3 (3 secs)
  (remaining: Thread-1, Thread-4, Thread-5)
[Wed Jan 24 20:51:15 2018] Completed Thread-4 (4 secs)
  (remaining: Thread-1, Thread-5)
[Wed Jan 24 20:51:15 2018] Completed Thread-1 (4 secs)
  (remaining: Thread-5)
[Wed Jan 24 20:51:15 2018] Completed Thread-5 (4 secs)
  (remaining: None)
all DONE at: Wed Jan 24 20:51:15 2018
Output
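A lock can also be used as a context manager, which pairs acquire() and release() automatically and releases the lock even if the guarded code raises an exception. The sketch below is not taken from the example above; it uses a hypothetical shared counter to keep the result easy to check.

```python
import threading

counter = 0
lock = threading.Lock()

def add(n):
    global counter
    for _ in range(n):
        with lock:           # acquire() on entry, release() on exit
            counter += 1     # the read-modify-write is the critical section

threads = [threading.Thread(target=add, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)   # 40000: every increment ran under the lock
```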

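Using semaphores

A lock can only guard access to a critical section; it is of no help in more complex situations. For example, suppose a reader and a writer share a buffer 8 units long: the reader takes away 1 unit at a time and the writer produces 1 unit at a time. A lock alone cannot track how many units are available, so a semaphore is used.

The semaphore is one of the oldest synchronization primitives. It is a counter that is decremented when a resource is consumed and incremented when a resource is released.

threading.Semaphore(value=1)  //Constructor (threading.BoundedSemaphore(value=1) can be used instead; value may be set as needed. The counter of a BoundedSemaphore may not exceed its initial value; releasing beyond it raises ValueError)

semaphore.acquire(blocking=True, timeout=None)  //Consumes a resource. When blocking is True and no resource is available, the call blocks until another thread releases one, waiting at most timeout seconds if timeout is given, and returns False if the wait times out. When blocking is False, the call returns False immediately if no resource is available

semaphore.release()  //Returns a resource (increments the counter)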
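The return values and the ValueError can be observed in a single thread. A minimal sketch; the variable names are chosen only for this illustration.

```python
import threading

sem = threading.BoundedSemaphore(2)     # counter starts at 2

first = sem.acquire()                   # True, counter becomes 1
second = sem.acquire()                  # True, counter becomes 0
third = sem.acquire(blocking=False)     # False: nothing left, returns at once
timed = sem.acquire(timeout=0.1)        # False after waiting about 0.1 s

sem.release()
sem.release()                           # counter is back at the initial value 2
try:
    sem.release()                       # would exceed the initial value
    overflow = None
except ValueError as exc:
    overflow = type(exc).__name__

print(first, second, third, timed, overflow)
# True True False False ValueError
```

With a plain Semaphore the last release() would succeed and silently raise the counter to 3, which is why BoundedSemaphore is useful for catching unbalanced releases.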

#!/usr/bin/env python3
# coding:utf-8
from atexit import register
from random import randrange
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, ctime

lock = Lock()
MAX = 5
candytray = BoundedSemaphore(MAX)


def refill():
    lock.acquire()
    print('Refilling candy...')
    try:
        candytray.release()
    except ValueError:
        print('full, skipping')
    else:
        print('OK')
    lock.release()


def buy():
    lock.acquire()
    print('Buying candy...')
    if candytray.acquire(False):
        print('OK')
    else:
        print('empty, skipping')
    lock.release()


def producer(loops):
    for i in range(loops):
        refill()
        sleep(randrange(3))


def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))


def main():
    print('starting at:', ctime())
    nloops = randrange(2, 6)
    print('THE CANDY MACHINE (full with {0} bars)'.format(MAX))
    Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()
    Thread(target=producer, args=(nloops,)).start()


@register
def _atexit():
    print('all DONE at:', ctime())


if __name__ == '__main__':
    main()
Using a semaphore

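The code above is a candy machine example built on a semaphore. The machine has 5 slots; refill() adds a candy bar and buy() takes one away. The two functions have opposite effects on the semaphore.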

starting at: Wed Jan 24 21:16:00 2018
THE CANDY MACHINE (full with 5 bars)
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full, skipping
Buying candy...
OK
Refilling candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
empty, skipping
Buying candy...
empty, skipping
all DONE at: Wed Jan 24 21:16:05 2018
Output

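Using Condition

A Condition can be thought of as an advanced lock. It offers more than Lock and RLock and makes it possible to handle complex thread synchronization problems. threading.Condition maintains a lock object internally (an RLock by default); a lock object can also be passed in as an argument when the Condition is created. Condition provides acquire and release methods with the same meaning as those of a lock; they simply call the corresponding methods of the internal lock. Condition also provides the following methods (note in particular: these may only be called while the lock is held, that is, after acquire; otherwise a RuntimeError is raised):

Condition.wait(timeout=None)  //Releases the internal lock and suspends the thread until it is woken by a notification or the timeout expires (if timeout is given). Execution continues only after the thread has been woken and has reacquired the lock.

Condition.notify(n=1)  //Wakes up at most n suspended threads, one by default (if any are waiting). Note: notify() does not release the lock.

Condition.notify_all()

Condition.notifyAll()  //Wakes up all suspended threads (if any are waiting). Note: these methods do not release the lock. notifyAll() is the older spelling, deprecated since Python 3.10 in favour of notify_all().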
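The rules above (the lock must be held, wait() releases it while suspended, notify() wakes a waiter) can be sketched as follows. The names cond, items, got and consumer are hypothetical and not part of the article's own example.

```python
import threading

cond = threading.Condition()

# Calling notify() without holding the lock raises RuntimeError.
try:
    cond.notify()
    outcome = "no error"
except RuntimeError:
    outcome = "RuntimeError"

# wait() with a timeout returns False when nobody notifies in time.
with cond:
    signalled = cond.wait(timeout=0.1)

items = []
got = []

def consumer():
    with cond:
        while not items:          # re-check the condition after every wakeup
            cond.wait()           # releases the lock while suspended
        got.append(items.pop())

t = threading.Thread(target=consumer)
t.start()
with cond:
    items.append("x")
    cond.notify()                 # the waiter runs once this block releases the lock
t.join()

print(outcome, signalled, got)
```

Checking the condition in a while loop rather than a single if keeps the consumer correct whether it starts waiting before or after the item is added.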

 

from threading import Condition, Thread
from time import ctime


con = Condition()


def get_odd():
    count = 3
    with con:    # hold the lock; wait() releases it while this thread is suspended
        while True:
            print(1)
            count -= 1
            con.notify()    # wake the other thread before waiting or finishing
            if count == 0:
                break
            con.wait()


def get_even():
    count = 3
    with con:
        while True:
            print(2)
            count -= 1
            con.notify()
            if count == 0:
                break
            con.wait()


def main():
    print('start at:', ctime())
    Thread(target=get_odd).start()
    Thread(target=get_even).start()


if __name__ == '__main__':
    main()

 

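Three ways to create a thread

The producer-consumer model is used as the example: the buffer holds 5 items and starts empty. A producer generates a number and places it in the buffer; a consumer takes a number out. The producer runs 10 times and the consumer runs 5 times.

Method 1: create a Thread instance and pass it a function (along with the function's arguments)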

#!/usr/bin/env python3
# coding:utf-8

from atexit import register
from time import ctime
from random import randint
from threading import BoundedSemaphore, Semaphore, Lock, Thread

lock = Lock()
buffer = [0 for x in range(5)]    # 0 marks an empty slot
MAX = len(buffer)
sem_full = Semaphore(0)    # counts filled slots
sem_empty = BoundedSemaphore(MAX)    # counts empty slots


def producer():
    sem_empty.acquire()    # wait for an empty slot
    lock.acquire()
    print("produce a number")
    for i in range(len(buffer)):
        if buffer[i] == 0:
            buffer[i] = randint(1, 10)
            break
    print(buffer)
    sem_full.release()
    lock.release()


def consumer():
    sem_full.acquire()    # wait for a filled slot
    lock.acquire()
    print("consume a number")
    for i in range(len(buffer)):
        if buffer[i] != 0:
            buffer[i] = 0
            break
    print(buffer)
    sem_empty.release()
    lock.release()


def main():
    print('starting at:', ctime())
    for i in range(5):
        Thread(target=consumer, args=()).start()
        Thread(target=producer, args=()).start()
        Thread(target=producer, args=()).start()


@register
def _atexit():
    print("all DOne at:", ctime())


if __name__ == '__main__':
    main()

Method 2: create a Thread instance and pass it an instance of a callable class (its __call__ method will be invoked)

#!/usr/bin/env python3
# coding:utf-8

from atexit import register
from time import ctime
from random import randint
from threading import BoundedSemaphore, Semaphore, Lock, Thread

lock = Lock()
buffer = [0 for x in range(5)]    # 0 marks an empty slot
MAX = len(buffer)
sem_full = Semaphore(0)    # counts filled slots
sem_empty = BoundedSemaphore(MAX)    # counts empty slots


class ThreadFunc(object):

    def __init__(self, func, args, name=""):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self, *args, **kwargs):
        # invoked by Thread.run() in the new thread
        self.func(*self.args)


def producer():
    sem_empty.acquire()
    lock.acquire()
    print("produce a number")
    for i in range(len(buffer)):
        if buffer[i] == 0:
            buffer[i] = randint(1, 10)
            break
    print(buffer)
    sem_full.release()
    lock.release()


def consumer():
    sem_full.acquire()
    lock.acquire()
    print("consume a number")
    for i in range(len(buffer)):
        if buffer[i] != 0:
            buffer[i] = 0
            break
    print(buffer)
    sem_empty.release()
    lock.release()


threads = []


def main():
    print('starting at:', ctime())
    for i in range(5):
        c = Thread(target=ThreadFunc(consumer, (), consumer.__name__))
        threads.append(c)
        p1 = Thread(target=ThreadFunc(producer, (), producer.__name__))
        threads.append(p1)
        p2 = Thread(target=ThreadFunc(producer, (), producer.__name__))
        threads.append(p2)

    for i in range(len(threads)):
        threads[i].start()

    for i in range(len(threads)):
        threads[i].join()


@register
def _atexit():
    print("all DOne at:", ctime())


if __name__ == '__main__':
    main()

Method 3: derive a subclass of Thread and create an instance of the subclass (overriding the run method)

#!/usr/bin/env python3
# coding:utf-8

from atexit import register
from time import ctime
from random import randint
from threading import BoundedSemaphore, Semaphore, Lock, Thread

lock = Lock()
buffer = [0 for x in range(5)]    # 0 marks an empty slot
MAX = len(buffer)
sem_full = Semaphore(0)    # counts filled slots
sem_empty = BoundedSemaphore(MAX)    # counts empty slots


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        # overrides Thread.run(); executed in the new thread after start()
        self.func(*self.args)


def producer():
    sem_empty.acquire()
    lock.acquire()
    print("produce a number")
    for i in range(len(buffer)):
        if buffer[i] == 0:
            buffer[i] = randint(1, 10)
            break
    print(buffer)
    sem_full.release()
    lock.release()


def consumer():
    sem_full.acquire()
    lock.acquire()
    print("consume a number")
    for i in range(len(buffer)):
        if buffer[i] != 0:
            buffer[i] = 0
            break
    print(buffer)
    sem_empty.release()
    lock.release()


threads = []


def main():
    print('starting at:', ctime())
    for i in range(5):
        c = MyThread(consumer, (), consumer.__name__)
        threads.append(c)
        p1 = MyThread(producer, (), producer.__name__)
        threads.append(p1)
        p2 = MyThread(producer, (), producer.__name__)
        threads.append(p2)

    for i in range(len(threads)):
        threads[i].start()

    for i in range(len(threads)):
        threads[i].join()


@register
def _atexit():
    print("all DOne at:", ctime())


if __name__ == '__main__':
    main()

Output

starting at: Wed Jan 24 22:25:05 2018
produce a number
[7, 0, 0, 0, 0]
produce a number
[7, 9, 0, 0, 0]
consume a number
[0, 9, 0, 0, 0]
produce a number
[7, 9, 0, 0, 0]
consume a number
[0, 9, 0, 0, 0]
consume a number
[0, 0, 0, 0, 0]
produce a number
[9, 0, 0, 0, 0]
consume a number
[0, 0, 0, 0, 0]
produce a number
[9, 0, 0, 0, 0]
produce a number
[9, 5, 0, 0, 0]
produce a number
[9, 5, 6, 0, 0]
produce a number
[9, 5, 6, 10, 0]
consume a number
[0, 5, 6, 10, 0]
produce a number
[7, 5, 6, 10, 0]
produce a number
[7, 5, 6, 10, 2]
all DOne at: Wed Jan 24 22:25:05 2018

Objects and methods not covered here are described in the Python standard library documentation.

 

