一、Python中的線程使用:
Python中使用線程有兩種方式:函數或者用類來包裝線程對象。
1、 函數式:調用thread模塊中的start_new_thread()函數來產生新線程。如下例:
1 import time 2 import thread 3 def timer(no, interval): 4 cnt = 0 5 while cnt<10: 6 print 'Thread:(%d) Time:%s\n'%(no, time.ctime()) 7 time.sleep(interval) 8 cnt+=1 9 thread.exit_thread() 10 11 12 def test(): #Use thread.start_new_thread() to create 2 new threads 13 thread.start_new_thread(timer, (1,1)) 14 thread.start_new_thread(timer, (2,2)) 15 16 if __name__=='__main__': 17 test()
上面的例子定義了一個線程函數timer,它打印出10條時間記錄后退出,每次打印的間隔由interval參數決定。thread.start_new_thread(function, args[, kwargs])的第一個參數是線程函數(本例中的timer方法),第二個參數是傳遞給線程函數的參數,它必須是tuple類型,kwargs是可選參數。
線程的結束可以等待線程自然結束,也可以在線程函數中調用thread.exit()或thread.exit_thread()方法。
2、 創建threading.Thread的子類來包裝一個線程對象,如下例:
1 import threading 2 import time 3 class timer(threading.Thread): #The timer class is derived from the class threading.Thread 4 def __init__(self, num, interval): 5 threading.Thread.__init__(self) 6 self.thread_num = num 7 self.interval = interval 8 self.thread_stop = False 9 10 def run(self): #Overwrite run() method, put what you want the thread do here 11 while not self.thread_stop: 12 print 'Thread Object(%d), Time:%s\n' %(self.thread_num, time.ctime()) 13 time.sleep(self.interval) 14 def stop(self): 15 self.thread_stop = True 16 17 18 def test(): 19 thread1 = timer(1, 1) 20 thread2 = timer(2, 2) 21 thread1.start() 22 thread2.start() 23 time.sleep(10) 24 thread1.stop() 25 thread2.stop() 26 return 27 28 if __name__ == '__main__': 29 test()
第二種方式,即創建自己的線程類,必要時重寫threading.Thread類的方法,線程的控制可以由自己定制。
threading.Thread類的使用:
1,在自己的線程類的__init__里調用threading.Thread.__init__(self, name = threadname)
Threadname為線程的名字
2, run(),通常需要重寫,編寫代碼實現做需要的功能。
3,getName(),獲得線程對象名稱
4,setName(),設置線程對象名稱
5,start(),啟動線程
6,jion([timeout]),等待另一線程結束后再運行。
7,setDaemon(bool),設置子線程是否隨主線程一起結束,必須在start()之前調用。默認為False。
8,isDaemon(),判斷線程是否隨主線程一起結束。
9,isAlive(),檢查線程是否在運行中。
此外threading模塊本身也提供了很多方法和其他的類,可以幫助我們更好的使用和管理線程。可以參看http://www.python.org/doc/2.5.2/lib/module-threading.html。
假設兩個線程對象t1和t2都要對num=0進行增1運算,t1和t2都各對num修改10次,num的最終的結果應該為20。但是由於是多線程訪問,有可能出現下面情況:在num=0時,t1取得num=0。系統此時把t1調度為”sleeping”狀態,把t2轉換為”running”狀態,t2頁獲得num=0。然后t2對得到的值進行加1並賦給num,使得num=1。然后系統又把t2調度為”sleeping”,把t1轉為”running”。線程t1又把它之前得到的0加1后賦值給num。這樣,明明t1和t2都完成了1次加1工作,但結果仍然是num=1。
上面的case描述了多線程情況下最常見的問題之一:數據共享。當多個線程都要去修改某一個共享數據的時候,我們需要對數據訪問進行同步。
1、 簡單的同步
最簡單的同步機制就是“鎖”。鎖對象由threading.RLock類創建。線程可以使用鎖的acquire()方法獲得鎖,這樣鎖就進入“locked”狀態。每次只有一個線程可以獲得鎖。如果當另一個線程試圖獲得這個鎖的時候,就會被系統變為“blocked”狀態,直到那個擁有鎖的線程調用鎖的release()方法來釋放鎖,這樣鎖就會進入“unlocked”狀態。“blocked”狀態的線程就會收到一個通知,並有權利獲得鎖。如果多個線程處於“blocked”狀態,所有線程都會先解除“blocked”狀態,然后系統選擇一個線程來獲得鎖,其他的線程繼續沉默(“blocked”)。
Python中的thread模塊和Lock對象是Python提供的低級線程控制工具,使用起來非常簡單。如下例所示:
1 import thread 2 import time 3 mylock = thread.allocate_lock() #Allocate a lock 4 num=0 #Shared resource 5 6 def add_num(name): 7 global num 8 while True: 9 mylock.acquire() #Get the lock 10 # Do something to the shared resource 11 print 'Thread %s locked! num=%s'%(name,str(num)) 12 if num >= 5: 13 print 'Thread %s released! num=%s'%(name,str(num)) 14 mylock.release() 15 thread.exit_thread() 16 num+=1 17 print 'Thread %s released! num=%s'%(name,str(num)) 18 mylock.release() #Release the lock. 19 20 def test(): 21 thread.start_new_thread(add_num, ('A',)) 22 thread.start_new_thread(add_num, ('B',)) 23 24 if __name__== '__main__': 25 test()
Python 在thread的基礎上還提供了一個高級的線程控制庫,就是之前提到過的threading。Python的threading module是在建立在thread module基礎之上的一個module,在threading module中,暴露了許多thread module中的屬性。在thread module中,python提供了用戶級的線程同步工具“Lock”對象。而在threading module中,python又提供了Lock對象的變種: RLock對象。RLock對象內部維護着一個Lock對象,它是一種可重入的對象。對於Lock對象而言,如果一個線程連續兩次進行acquire操作,那么由於第一次acquire之后沒有release,第二次acquire將掛起線程。這會導致Lock對象永遠不會release,使得線程死鎖。RLock對象允許一個線程多次對其進行acquire操作,因為在其內部通過一個counter變量維護着線程acquire的次數。而且每一次的acquire操作必須有一個release操作與之對應,在所有的release操作完成之后,別的線程才能申請該RLock對象。
下面來看看如何使用threading的RLock對象實現同步。
1 import threading 2 mylock = threading.RLock() 3 num=0 4 5 class myThread(threading.Thread): 6 def __init__(self, name): 7 threading.Thread.__init__(self) 8 self.t_name = name 9 10 def run(self): 11 global num 12 while True: 13 mylock.acquire() 14 print '\nThread(%s) locked, Number: %d'%(self.t_name, num) 15 if num>=4: 16 mylock.release() 17 print '\nThread(%s) released, Number: %d'%(self.t_name, num) 18 break 19 num+=1 20 print '\nThread(%s) released, Number: %d'%(self.t_name, num) 21 mylock.release() 22 23 def test(): 24 thread1 = myThread('A') 25 thread2 = myThread('B') 26 thread1.start() 27 thread2.start() 28 29 if __name__== '__main__': 30 test()
我們把修改共享數據的代碼成為“臨界區”。必須將所有“臨界區”都封閉在同一個鎖對象的acquire和release之間。
2、 條件同步
鎖只能提供最基本的同步。假如只在發生某些事件時才訪問一個“臨界區”,這時需要使用條件變量Condition。
Condition對象是對Lock對象的包裝,在創建Condition對象時,其構造函數需要一個Lock對象作為參數,如果沒有這個Lock對象參數,Condition將在內部自行創建一個Rlock對象。在Condition對象上,當然也可以調用acquire和release操作,因為內部的Lock對象本身就支持這些操作。但是Condition的價值在於其提供的wait和notify的語義。
條件變量是如何工作的呢?首先一個線程成功獲得一個條件變量后,調用此條件變量的wait()方法會導致這個線程釋放這個鎖,並進入“blocked”狀態,直到另一個線程調用同一個條件變量的notify()方法來喚醒那個進入“blocked”狀態的線程。如果調用這個條件變量的notifyAll()方法的話就會喚醒所有的在等待的線程。
如果程序或者線程永遠處於“blocked”狀態的話,就會發生死鎖。所以如果使用了鎖、條件變量等同步機制的話,一定要注意仔細檢查,防止死鎖情況的發生。對於可能產生異常的臨界區要使用異常處理機制中的finally子句來保證釋放鎖。等待一個條件變量的線程必須用notify()方法顯式的喚醒,否則就永遠沉默。保證每一個wait()方法調用都有一個相對應的notify()調用,當然也可以調用notifyAll()方法以防萬一。
生產者與消費者問題是典型的同步問題。這里簡單介紹兩種不同的實現方法。
1, 條件變量
1 import threading 2 3 import time 4 5 class Producer(threading.Thread): 6 7 def __init__(self, t_name): 8 9 threading.Thread.__init__(self, name=t_name) 10 11 12 13 def run(self): 14 15 global x 16 17 con.acquire() 18 19 if x > 0: 20 21 con.wait() 22 23 else: 24 25 for i in range(5): 26 27 x=x+1 28 29 print "producing..." + str(x) 30 31 con.notify() 32 33 print x 34 35 con.release() 36 37 38 39 class Consumer(threading.Thread): 40 41 def __init__(self, t_name): 42 43 threading.Thread.__init__(self, name=t_name) 44 45 def run(self): 46 47 global x 48 49 con.acquire() 50 51 if x == 0: 52 53 print 'consumer wait1' 54 55 con.wait() 56 57 else: 58 59 for i in range(5): 60 61 x=x-1 62 63 print "consuming..." + str(x) 64 65 con.notify() 66 67 print x 68 69 con.release() 70 71 72 73 con = threading.Condition() 74 75 x=0 76 77 print 'start consumer' 78 79 c=Consumer('consumer') 80 81 print 'start producer' 82 83 p=Producer('producer') 84 85 86 87 p.start() 88 89 c.start() 90 91 p.join() 92 93 c.join() 94 95 print x
上面的例子中,在初始狀態下,Consumer處於wait狀態,Producer連續生產(對x執行增1操作)5次后,notify正在等待的Consumer。Consumer被喚醒開始消費(對x執行減1操作)
2, 同步隊列
Python中的Queue對象也提供了對線程同步的支持。使用Queue對象可以實現多個生產者和多個消費者形成的FIFO的隊列。
生產者將數據依次存入隊列,消費者依次從隊列中取出數據。
1 # producer_consumer_queue 2 3 from Queue import Queue 4 5 import random 6 7 import threading 8 9 import time 10 11 12 13 #Producer thread 14 15 class Producer(threading.Thread): 16 17 def __init__(self, t_name, queue): 18 19 threading.Thread.__init__(self, name=t_name) 20 21 self.data=queue 22 23 def run(self): 24 25 for i in range(5): 26 27 print "%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i) 28 29 self.data.put(i) 30 31 time.sleep(random.randrange(10)/5) 32 33 print "%s: %s finished!" %(time.ctime(), self.getName()) 34 35 36 37 #Consumer thread 38 39 class Consumer(threading.Thread): 40 41 def __init__(self, t_name, queue): 42 43 threading.Thread.__init__(self, name=t_name) 44 45 self.data=queue 46 47 def run(self): 48 49 for i in range(5): 50 51 val = self.data.get() 52 53 print "%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val) 54 55 time.sleep(random.randrange(10)) 56 57 print "%s: %s finished!" %(time.ctime(), self.getName()) 58 59 60 61 #Main thread 62 63 def main(): 64 65 queue = Queue() 66 67 producer = Producer('Pro.', queue) 68 69 consumer = Consumer('Con.', queue) 70 71 producer.start() 72 73 consumer.start() 74 75 producer.join() 76 77 consumer.join() 78 79 print 'All threads terminate!' 80 81 82 83 if __name__ == '__main__': 84 85 main()
在上面的例子中,Producer在隨機的時間內生產一個“產品”,放入隊列中。Consumer發現隊列中有了“產品”,就去消費它。本例中,由於Producer生產的速度快於Consumer消費的速度,所以往往Producer生產好幾個“產品”后,Consumer才消費一個產品。
Queue模塊實現了一個支持多producer和多consumer的FIFO隊列。當共享信息需要安全的在多線程之間交換時,Queue非常有用。Queue的默認長度是無限的,但是可以設置其構造函數的maxsize參數來設定其長度。Queue的put方法在隊尾插入,該方法的原型是:
put( item[, block[, timeout]])
如果可選參數block為true並且timeout為None(缺省值),線程被block,直到隊列空出一個數據單元。如果timeout大於0,在timeout的時間內,仍然沒有可用的數據單元,Full exception被拋出。反之,如果block參數為false(忽略timeout參數),item被立即加入到空閑數據單元中,如果沒有空閑數據單元,Full exception被拋出。
Queue的get方法是從隊首取數據,其參數和put方法一樣。如果block參數為true且timeout為None(缺省值),線程被block,直到隊列中有數據。如果timeout大於0,在timeout時間內,仍然沒有可取數據,Empty exception被拋出。反之,如果block參數為false(忽略timeout參數),隊列中的數據被立即取出。如果此時沒有可取數據,Empty exception也會被拋出。