1.線程的創建
多線程的使用在一些較為復雜的問題中十分常見,例如用爬蟲爬取上億條數據的情況下,單線程便不再適用啦,要想掌握多線程的使用,我們首先從線程的創建和使用開始。
Python中使用線程有多種方式。
1.1函數式:調用thread模塊中的start_new_thread()函數來產生新線程。
如下例:
# -*- coding: utf-8 -*- import thread def run_thread(n): for i in range(n): print i thread.start_new_thread(run_thread,(4,)) #參數一定是元組,兩個參數可以寫成(a,b)
1.2 創建threading.Thread的子類來包裝一個線程對象
如下例,我們創建了兩個進程:
from threading import Thread import time class race(Thread): def __init__(self,threadname, interval): Thread.__init__(self,name = threadname) self.interval = interval self.isrunning = True def run(self): while self.isrunning: print 'thread %s is running, time: %s\n' %(self.getName(), time.ctime()) time.sleep(self.interval) def stop(self): self.isrunning = False def test(): thead1 = race('A',1) thead2 = race('B',2) thead1.start() thead2.start() time.sleep(5) thead1.stop() thead2.stop() if __name__ == '__main__': test()
在自己的線程類的__init__里調用threading.Thread.__init__(self, name = threadname). Threadname為線程的名字
這種方法可以創建自己的線程類,必要時重寫threading.Thread類的方法,線程的控制可以由自己定制。
1.3 在threading.Thread中指定目標函數作為線程處理函數
# -*- coding: utf-8 -*- from threading import Thread def run_thread(n): for i in range(n): print i t1 = Thread(target=run_thread,args=(5,))#指定目標函數,傳入參數,這里參數也是元組 t1.start() #啟動線程
附: threading.Thread中常用函數說明
函數名 | 功能 |
run() | 如果采用方法2創建線程就需要重寫該方法 |
getName() | 獲得線程的名稱(方法2中有示例) |
setName() | 設置線程的名稱 |
start() | 啟動線程 |
join(timeout) | 在join()位置等待另一線程結束后再繼續運行join()后的操作,timeout是可選項,表示最大等待時間 |
setDaemon(bool) | True:當父線程結束時,子線程立即結束;False:父線程等待子線程結束后才結束。默認為False |
isDaemon() | 判斷子線程是否和父線程一起結束,即setDaemon()設置的值 |
isAlive() | 判斷線程是否在運行 |
例:join()方法的使用
我們通過一段代碼來觀察join()方法帶來的改變:
# -*- coding: utf-8 -*- import threading import time class Mythread(threading.Thread): def __init__(self,threadname): threading.Thread.__init__(self, name = threadname) def run(self): time.sleep(2) for i in range(5): print '%s is running....' %self.getName() t2 = Mythread('b') t2.start() #t2.join() for i in range(5): print 'the programing is running'
如上例所示,我們把join()方法注釋掉,這是一段普通的線程代碼,它的運行結果如下:
the programing is running the programing is running the programing is running the programing is running the programing is running b is running.... b is running.... b is running.... b is running.... b is running....
此時,程序先運行主線程的程序,當主線程運行之后再運行B線程的內容。
當我們把join()方法的注釋符號去掉,即加入該方法后,會發生怎樣的改變呢? 運行程序后結果如下:
b is running.... b is running.... b is running.... b is running.... b is running.... the programing is running the programing is running the programing is running the programing is running the programing is running [Finished in 2.0s]
可以看到,當join()加入后,當主線程運行到 t2.join() 時,它將等待 t2 運行完,然后再繼續運行t2.join() 后的操作。
2.線程的同步
假設兩個線程對象t1和t2都要對num=0進行增1運算,t1和t2都各對num修改10次,num的最終的結果應該為20。但是由於是多線程訪問,有可能出現下面情況:在num=0時,t1取得num=0。系統此時把t1調度為”sleeping”狀態,把t2轉換為”running”狀態,t2頁獲得num=0。然后t2對得到的值進行加1並賦給num,使得num=1。然后系統又把t2調度為”sleeping”,把t1轉為”running”。線程t1又把它之前得到的0加1后賦值給num。這樣,明明t1和t2都完成了1次加1工作,但結果仍然是num=1。
上面的case描述了多線程情況下最常見的問題之一:數據共享。當多個線程都要去修改某一個共享數據的時候,我們需要對數據訪問進行同步。
學過操作系統的同學都知道,在操作系統中為了解決這一問題我們引入了鎖機制,在python中同樣如此。
2.1 簡單的線程同步
最簡單的同步機制就是“鎖”。鎖對象由threading.RLock類創建。
1.線程可以使用鎖的acquire()方法獲得鎖,這樣鎖就進入“locked”狀態。每次只有一個線程可以獲得鎖。
2.如果當另一個線程試圖獲得這個鎖的時候,就會被系統變為“blocked”狀態,直到那個擁有鎖的線程調用鎖的release()方法來釋放鎖,這樣鎖就會進入“unlocked”狀態。“blocked”狀態的線程就會收到一個通知,並有權利獲得鎖。
3.如果多個線程處於“blocked”狀態,所有線程都會先解除“blocked”狀態,然后系統選擇一個線程來獲得鎖,其他的線程繼續沉默(“blocked”)。
Python中的thread模塊和Lock對象是Python提供的低級線程控制工具,使用起來非常簡單。如下例所示:
1 import thread 2 import time 3 mylock = thread.allocate_lock() #Allocate a lock 4 num=0 #Shared resource 5 6 def add_num(name): 7 global num 8 while True: 9 mylock.acquire() #Get the lock 10 # Do something to the shared resource 11 print 'Thread %s locked! num=%s'%(name,str(num)) 12 if num >= 5: 13 print 'Thread %s released! num=%s'%(name,str(num)) 14 mylock.release() 15 thread.exit_thread() 16 num+=1 17 print 'Thread %s released! num=%s'%(name,str(num)) 18 mylock.release() #Release the lock. 19 20 def test(): 21 thread.start_new_thread(add_num, ('A',)) 22 thread.start_new_thread(add_num, ('B',)) 23 24 if __name__== '__main__': 25 test()
Python 在thread的基礎上還提供了一個高級的線程控制庫,就是之前提到過的threading。
Python的threading module是在建立在thread module基礎之上的一個module,在threading module中,暴露了許多thread module中的屬性。
在thread module中,python提供了用戶級的線程同步工具“Lock”對象。而在threading module中,python又提供了Lock對象的變種: RLock對象。RLock對象內部維護着一個Lock對象,它是一種可重入的對象。
對於Lock對象而言,如果一個線程連續兩次進行acquire操作,那么由於第一次acquire之后沒有release,第二次acquire將掛起線程。這會導致Lock對象永遠不會release,使得線程死鎖。
RLock對象允許一個線程多次對其進行acquire操作,因為在其內部通過一個counter變量維護着線程acquire的次數。而且每一次的acquire操作必須有一個release操作與之對應,在所有的release操作完成之后,別的線程才能申請該RLock對象。
下面來看看如何使用threading的RLock對象實現同步。
1 # -*- coding: utf-8 -*- 2 import threading 3 mylock = threading.RLock() 4 num=0 5 6 class myThread(threading.Thread): 7 def __init__(self, name): 8 threading.Thread.__init__(self) 9 self.t_name = name 10 11 def run(self): 12 global num #聲明為全局變量 13 while True: 14 mylock.acquire() 15 print '/nThread(%s) locked, Number: %d'%(self.t_name, num) 16 if num>=4: 17 mylock.release() 18 print '/nThread(%s) released, Number: %d'%(self.t_name, num) 19 break 20 num+=1 21 print '/nThread(%s) released, Number: %d'%(self.t_name, num) 22 mylock.release() 23 24 def test(): 25 thread1 = myThread('A') 26 thread2 = myThread('B') 27 thread1.start() 28 thread2.start() 29 30 if __name__== '__main__': 31 test()
我們把修改共享數據的代碼成為“臨界區”。必須將所有“臨界區”都封閉在同一個鎖對象的acquire和release之間。
2.2 條件同步
鎖只能提供最基本的同步。假如只在發生某些事件時才訪問一個“臨界區”,這時需要使用條件變量Condition。
Condition對象是對Lock對象的包裝,在創建Condition對象時,其構造函數需要一個Lock對象作為參數,如果沒有這個Lock對象參數,Condition將在內部自行創建一個Rlock對象。在Condition對象上,當然也可以調用acquire和release操作,因為內部的Lock對象本身就支持這些操作。但是Condition的價值在於其提供的wait和notify的語義。
條件變量是如何工作的呢?首先一個線程成功獲得一個條件變量后,調用此條件變量的wait()方法會導致這個線程釋放這個鎖,並進入“blocked”狀態,直到另一個線程調用同一個條件變量的notify()方法來喚醒那個進入“blocked”狀態的線程。如果調用這個條件變量的notifyAll()方法的話就會喚醒所有的在等待的線程。
如果程序或者線程永遠處於“blocked”狀態的話,就會發生死鎖。所以如果使用了鎖、條件變量等同步機制的話,一定要注意仔細檢查,防止死鎖情況的發生。對於可能產生異常的臨界區要使用異常處理機制中的finally子句來保證釋放鎖。等待一個條件變量的線程必須用notify()方法顯式的喚醒,否則就永遠沉默。保證每一個wait()方法調用都有一個相對應的notify()調用,當然也可以調用notifyAll()方法以防萬一。