Python多線程學習


一、Python中的線程使用:

    Python中使用線程有兩種方式:函數或者用類來包裝線程對象。

1、  函數式:調用thread模塊中的start_new_thread()函數來產生新線程。如下例:

 1 import time  
 2 import thread  
 3 def timer(no, interval):  
 4     cnt = 0  
 5     while cnt<10:  
 6         print 'Thread:(%d) Time:%s\n'%(no, time.ctime())  
 7         time.sleep(interval)  
 8         cnt+=1  
 9     thread.exit_thread()  
10      
11    
12 def test(): #Use thread.start_new_thread() to create 2 new threads  
13     thread.start_new_thread(timer, (1,1))  
14     thread.start_new_thread(timer, (2,2))  
15    
16 if __name__=='__main__':  
17     test()  

上面的例子定義了一個線程函數timer,它打印出10條時間記錄后退出,每次打印的間隔由interval參數決定。thread.start_new_thread(function, args[, kwargs])的第一個參數是線程函數(本例中的timer方法),第二個參數是傳遞給線程函數的參數,它必須是tuple類型,kwargs是可選參數。

    線程的結束可以等待線程自然結束,也可以在線程函數中調用thread.exit()或thread.exit_thread()方法。

2、  創建threading.Thread的子類來包裝一個線程對象,如下例:

 1 import threading  
 2 import time  
 3 class timer(threading.Thread): #The timer class is derived from the class threading.Thread  
 4     def __init__(self, num, interval):  
 5         threading.Thread.__init__(self)  
 6         self.thread_num = num  
 7         self.interval = interval  
 8         self.thread_stop = False  
 9    
10     def run(self): #Overwrite run() method, put what you want the thread do here  
11         while not self.thread_stop:  
12             print 'Thread Object(%d), Time:%s\n' %(self.thread_num, time.ctime())  
13             time.sleep(self.interval)  
14     def stop(self):  
15         self.thread_stop = True  
16          
17    
18 def test():  
19     thread1 = timer(1, 1)  
20     thread2 = timer(2, 2)  
21     thread1.start()  
22     thread2.start()  
23     time.sleep(10)  
24     thread1.stop()  
25     thread2.stop()  
26     return  
27    
28 if __name__ == '__main__':  
29     test()  

第二種方式,即創建自己的線程類,必要時重寫threading.Thread類的方法,線程的控制可以由自己定制。

threading.Thread類的使用:

1,在自己的線程類的__init__里調用threading.Thread.__init__(self, name = threadname)

Threadname為線程的名字

2, run(),通常需要重寫,編寫代碼實現做需要的功能。

3,getName(),獲得線程對象名稱

4,setName(),設置線程對象名稱

5,start(),啟動線程

6,jion([timeout]),等待另一線程結束后再運行。

7,setDaemon(bool),設置子線程是否隨主線程一起結束,必須在start()之前調用。默認為False。

8,isDaemon(),判斷線程是否隨主線程一起結束。

9,isAlive(),檢查線程是否在運行中。

    此外threading模塊本身也提供了很多方法和其他的類,可以幫助我們更好的使用和管理線程。可以參看http://www.python.org/doc/2.5.2/lib/module-threading.html

 

 

假設兩個線程對象t1和t2都要對num=0進行增1運算,t1和t2都各對num修改10次,num的最終的結果應該為20。但是由於是多線程訪問,有可能出現下面情況:在num=0時,t1取得num=0。系統此時把t1調度為”sleeping”狀態,把t2轉換為”running”狀態,t2頁獲得num=0。然后t2對得到的值進行加1並賦給num,使得num=1。然后系統又把t2調度為”sleeping”,把t1轉為”running”。線程t1又把它之前得到的0加1后賦值給num。這樣,明明t1和t2都完成了1次加1工作,但結果仍然是num=1。

    上面的case描述了多線程情況下最常見的問題之一:數據共享。當多個線程都要去修改某一個共享數據的時候,我們需要對數據訪問進行同步。

1、  簡單的同步

最簡單的同步機制就是“鎖”。鎖對象由threading.RLock類創建。線程可以使用鎖的acquire()方法獲得鎖,這樣鎖就進入“locked”狀態。每次只有一個線程可以獲得鎖。如果當另一個線程試圖獲得這個鎖的時候,就會被系統變為“blocked”狀態,直到那個擁有鎖的線程調用鎖的release()方法來釋放鎖,這樣鎖就會進入“unlocked”狀態。“blocked”狀態的線程就會收到一個通知,並有權利獲得鎖。如果多個線程處於“blocked”狀態,所有線程都會先解除“blocked”狀態,然后系統選擇一個線程來獲得鎖,其他的線程繼續沉默(“blocked”)。

Python中的thread模塊和Lock對象是Python提供的低級線程控制工具,使用起來非常簡單。如下例所示:

 1 import thread  
 2 import time  
 3 mylock = thread.allocate_lock()  #Allocate a lock  
 4 num=0  #Shared resource  
 5   
 6 def add_num(name):  
 7     global num  
 8     while True:  
 9         mylock.acquire() #Get the lock   
10         # Do something to the shared resource  
11         print 'Thread %s locked! num=%s'%(name,str(num))  
12         if num >= 5:  
13             print 'Thread %s released! num=%s'%(name,str(num))  
14             mylock.release()  
15             thread.exit_thread()  
16         num+=1  
17         print 'Thread %s released! num=%s'%(name,str(num))  
18         mylock.release()  #Release the lock.  
19   
20 def test():  
21     thread.start_new_thread(add_num, ('A',))  
22     thread.start_new_thread(add_num, ('B',))  
23   
24 if __name__== '__main__':  
25     test()  

Python 在thread的基礎上還提供了一個高級的線程控制庫,就是之前提到過的threading。Python的threading module是在建立在thread module基礎之上的一個module,在threading module中,暴露了許多thread module中的屬性。在thread module中,python提供了用戶級的線程同步工具“Lock”對象。而在threading module中,python又提供了Lock對象的變種: RLock對象。RLock對象內部維護着一個Lock對象,它是一種可重入的對象。對於Lock對象而言,如果一個線程連續兩次進行acquire操作,那么由於第一次acquire之后沒有release,第二次acquire將掛起線程。這會導致Lock對象永遠不會release,使得線程死鎖。RLock對象允許一個線程多次對其進行acquire操作,因為在其內部通過一個counter變量維護着線程acquire的次數。而且每一次的acquire操作必須有一個release操作與之對應,在所有的release操作完成之后,別的線程才能申請該RLock對象。

下面來看看如何使用threading的RLock對象實現同步。

 1 import threading  
 2 mylock = threading.RLock()  
 3 num=0  
 4    
 5 class myThread(threading.Thread):  
 6     def __init__(self, name):  
 7         threading.Thread.__init__(self)  
 8         self.t_name = name  
 9           
10     def run(self):  
11         global num  
12         while True:  
13             mylock.acquire()  
14             print '\nThread(%s) locked, Number: %d'%(self.t_name, num)  
15             if num>=4:  
16                 mylock.release()  
17                 print '\nThread(%s) released, Number: %d'%(self.t_name, num)  
18                 break  
19             num+=1  
20             print '\nThread(%s) released, Number: %d'%(self.t_name, num)  
21             mylock.release()  
22               
23 def test():  
24     thread1 = myThread('A')  
25     thread2 = myThread('B')  
26     thread1.start()  
27     thread2.start()  
28    
29 if __name__== '__main__':  
30     test()  

我們把修改共享數據的代碼成為“臨界區”。必須將所有“臨界區”都封閉在同一個鎖對象的acquire和release之間。

2、  條件同步

鎖只能提供最基本的同步。假如只在發生某些事件時才訪問一個“臨界區”,這時需要使用條件變量Condition。

Condition對象是對Lock對象的包裝,在創建Condition對象時,其構造函數需要一個Lock對象作為參數,如果沒有這個Lock對象參數,Condition將在內部自行創建一個Rlock對象。在Condition對象上,當然也可以調用acquire和release操作,因為內部的Lock對象本身就支持這些操作。但是Condition的價值在於其提供的wait和notify的語義。

條件變量是如何工作的呢?首先一個線程成功獲得一個條件變量后,調用此條件變量的wait()方法會導致這個線程釋放這個鎖,並進入“blocked”狀態,直到另一個線程調用同一個條件變量的notify()方法來喚醒那個進入“blocked”狀態的線程。如果調用這個條件變量的notifyAll()方法的話就會喚醒所有的在等待的線程。

如果程序或者線程永遠處於“blocked”狀態的話,就會發生死鎖。所以如果使用了鎖、條件變量等同步機制的話,一定要注意仔細檢查,防止死鎖情況的發生。對於可能產生異常的臨界區要使用異常處理機制中的finally子句來保證釋放鎖。等待一個條件變量的線程必須用notify()方法顯式的喚醒,否則就永遠沉默。保證每一個wait()方法調用都有一個相對應的notify()調用,當然也可以調用notifyAll()方法以防萬一。

 

 

生產者與消費者問題是典型的同步問題。這里簡單介紹兩種不同的實現方法。

1,  條件變量

 1 import threading  
 2   
 3 import time  
 4   
 5 class Producer(threading.Thread):  
 6   
 7     def __init__(self, t_name):  
 8   
 9         threading.Thread.__init__(self, name=t_name)  
10   
11    
12   
13     def run(self):  
14   
15         global x  
16   
17         con.acquire()  
18   
19         if x > 0:  
20   
21             con.wait()  
22   
23         else:  
24   
25             for i in range(5):  
26   
27                 x=x+1  
28   
29                 print "producing..." + str(x)  
30   
31             con.notify()  
32   
33         print x  
34   
35         con.release()  
36   
37    
38   
39 class Consumer(threading.Thread):  
40   
41     def __init__(self, t_name):  
42   
43         threading.Thread.__init__(self, name=t_name)  
44   
45     def run(self):  
46   
47         global x  
48   
49         con.acquire()  
50   
51         if x == 0:  
52   
53             print 'consumer wait1'  
54   
55             con.wait()  
56   
57         else:  
58   
59             for i in range(5):  
60   
61                 x=x-1  
62   
63                 print "consuming..." + str(x)  
64   
65             con.notify()  
66   
67         print x  
68   
69         con.release()  
70   
71    
72   
73 con = threading.Condition()  
74   
75 x=0  
76   
77 print 'start consumer'  
78   
79 c=Consumer('consumer')  
80   
81 print 'start producer'  
82   
83 p=Producer('producer')  
84   
85    
86   
87 p.start()  
88   
89 c.start()  
90   
91 p.join()  
92   
93 c.join()  
94   
95 print x  

 上面的例子中,在初始狀態下,Consumer處於wait狀態,Producer連續生產(對x執行增1操作)5次后,notify正在等待的Consumer。Consumer被喚醒開始消費(對x執行減1操作) 

2,  同步隊列

Python中的Queue對象也提供了對線程同步的支持。使用Queue對象可以實現多個生產者和多個消費者形成的FIFO的隊列。

生產者將數據依次存入隊列,消費者依次從隊列中取出數據。

 1 # producer_consumer_queue  
 2   
 3 from Queue import Queue  
 4   
 5 import random  
 6   
 7 import threading  
 8   
 9 import time  
10   
11    
12   
13 #Producer thread  
14   
15 class Producer(threading.Thread):  
16   
17     def __init__(self, t_name, queue):  
18   
19         threading.Thread.__init__(self, name=t_name)  
20   
21         self.data=queue  
22   
23     def run(self):  
24   
25         for i in range(5):  
26   
27             print "%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i)  
28   
29             self.data.put(i)  
30   
31             time.sleep(random.randrange(10)/5)  
32   
33         print "%s: %s finished!" %(time.ctime(), self.getName())  
34   
35    
36   
37 #Consumer thread  
38   
39 class Consumer(threading.Thread):  
40   
41     def __init__(self, t_name, queue):  
42   
43         threading.Thread.__init__(self, name=t_name)  
44   
45         self.data=queue  
46   
47     def run(self):  
48   
49         for i in range(5):  
50   
51             val = self.data.get()  
52   
53             print "%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val)  
54   
55             time.sleep(random.randrange(10))  
56   
57         print "%s: %s finished!" %(time.ctime(), self.getName())  
58   
59    
60   
61 #Main thread  
62   
63 def main():  
64   
65     queue = Queue()  
66   
67     producer = Producer('Pro.', queue)  
68   
69     consumer = Consumer('Con.', queue)  
70   
71     producer.start()  
72   
73     consumer.start()  
74   
75     producer.join()  
76   
77     consumer.join()  
78   
79     print 'All threads terminate!'  
80   
81    
82   
83 if __name__ == '__main__':  
84   
85     main()  

在上面的例子中,Producer在隨機的時間內生產一個“產品”,放入隊列中。Consumer發現隊列中有了“產品”,就去消費它。本例中,由於Producer生產的速度快於Consumer消費的速度,所以往往Producer生產好幾個“產品”后,Consumer才消費一個產品。

Queue模塊實現了一個支持多producer和多consumer的FIFO隊列。當共享信息需要安全的在多線程之間交換時,Queue非常有用。Queue的默認長度是無限的,但是可以設置其構造函數的maxsize參數來設定其長度。Queue的put方法在隊尾插入,該方法的原型是:

put( item[, block[, timeout]])

如果可選參數block為true並且timeout為None(缺省值),線程被block,直到隊列空出一個數據單元。如果timeout大於0,在timeout的時間內,仍然沒有可用的數據單元,Full exception被拋出。反之,如果block參數為false(忽略timeout參數),item被立即加入到空閑數據單元中,如果沒有空閑數據單元,Full exception被拋出。

Queue的get方法是從隊首取數據,其參數和put方法一樣。如果block參數為true且timeout為None(缺省值),線程被block,直到隊列中有數據。如果timeout大於0,在timeout時間內,仍然沒有可取數據,Empty exception被拋出。反之,如果block參數為false(忽略timeout參數),隊列中的數據被立即取出。如果此時沒有可取數據,Empty exception也會被拋出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM