Thread
先引入一個例子:
>>> from threading import Thread,currentThread,activeCount >>> >>> def test(s): ... print "ident:",currentThread().ident ... print "count:",activeCount() ... print s ... >>> >>> Thread(target = test, args =('Hello',)).start() ident: 1099229504 count: 2 Hello
需要模塊threading,對應的幫助文檔:
http://docs.python.org/2.7/library/threading.html#module-threading
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}) This constructor should always be called with keyword arguments. Arguments are: group should be None; reserved for future extension when a ThreadGroup class is implemented. target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called. name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number. args is the argument tuple for the target invocation. Defaults to (). kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}. If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
除了標識符,還可以給線程取個名字,便於調試。
還可以繼承Thread實現自己的線程類:
>>> from threading import * >>> >>> class MyThread(Thread): ... def __init__(self,name,*args): ... super(MyThread,self).__init__(name = name)#調用父類的init,設置線程的名稱 ... self.data = args ... ... def run(self): ... print self.name,self.data ... >>> >>> MyThread("abc",range(10)).start() abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)>>> >>> >>> MyThread("abc",range(5)).start() abc ([0, 1, 2, 3, 4],) >>> MyThread("abc",range(10)).start() abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)
將線程daemon屬性設為True,那么表示這是一個背景線程,進程退出時不會等到該線程結束。
調用join()等到線程結束,可提供超時參數(秒,浮點數設定更小粒度)。
isAlive()檢查線程狀態,join()可以多次調用。
>>> from time import sleep >>> >>> def test(): ... print "__thread__start__" ... sleep(10) ... print "__thread__exit__" ... >>> >>> def run(): ... t = Thread(target = test) ... t.start() ... t.join(2) //設置超時時間為2s ... ... print t.isAlive()//檢查線程狀態 ... t.join() //再次等待 ... ... print "over!" ... >>> >>> run() __thread__start__ True __thread__exit__ over!
Lock
Lock不支持遞歸加鎖,也就是說即便在同一個線程中,也必須等待鎖釋放。通常建議改用RLock,它會處理"owning thread"和"recursion level"狀態,對於同一個線程的多次請求鎖行為,只累加計數器。每次調用release()將遞減該計數器,直到0時釋放鎖,因此acquire()和relase()必須承兌出現,一個加鎖,一個釋放。
threading中的成員大多實現了上下文協議,盡可能用with代替手工調用。
>>> lock = RLock() >>> >>> def show(i): ... with lock: ... print currentThread().name,i ... sleep(0.1) ... >>> def test(): ... with lock: ... for i in range(5): ... show(i) ... >>> >>> for i in range(2): ... Thread(target = test).start() ... >>> Thread-2 0 Thread-2 1 Thread-2 2 Thread-2 3 Thread-2 4 Thread-3 0 Thread-3 1 Thread-3 2 Thread-3 3 Thread-3 4
所有線程等待lock鎖,串行執行。
Event
Event通過一個內部標記來協調多線程運行。方法wait()阻塞線程執行,直到標記為True。
set()將標記設為True,clear()更改標記為False。isSet()用於判斷標記狀態。
>>> from threading import * >>> >>> def test(): ... e = Event() ... def test(): ... for i in range(5): ... e.wait() ... e.clear() ... print i ... Thread(target = test).start() ... return e ... >>> e = test() >>> e.set() >>> 0 >>> e.set() >>> 1 >>> e.set() >>> 2 >>> e.set() >>> 3
如果不調用clear(),那么標記一直為True,wait()就不會發生堵塞行為。
通常為每個線程准備一個獨立的Event,而不是多個線程共享,以避免未及時調用clear(0時發生意外情況。
condition
condition像Lock和Event的綜合體,除基本的鎖操作外,還提供了類似yield的功能。
在獲取鎖以后,可以調用wait()臨時讓出鎖,當前線程被阻塞,直到notify()發送通知后再次請求鎖來恢復執行。將wait當做yield,那么notify就是send
可以將已有的鎖對象傳給Condition
>>> from threading import * >>> from time import sleep >>> >>> >>> cond = Condition() >>> >>> def t1(): ... with cond: ... for i in range(5): ... print currentThread().name,i ... sleep(0.1) ... if i == 3:cond.wait() ... >>> >>> def t2(): ... with cond: ... for i in range(5): ... print currentThread().name,i ... sleep(0.1) ... cond.notify() ... >>> >>> Thread(target = t1).start();Thread(target = t2).start() Thread-1 0 >>> Thread-1 1 Thread-1 2 Thread-1 3 //調用wait(),獲取鎖,當前線程被阻塞 Thread-2 0 Thread-2 1 Thread-2 2 Thread-2 3 Thread-2 4 Thread-1 4//t2執行完range(5)循環,通過cond.notify()發送通知,重新獲取鎖,繼續執行
只有獲取鎖的線程才能調用wait()和notify(),因此必須在鎖釋放前調用。
當wait()釋放鎖后,其他線程也可進入wait狀態。notifyAll()激活所有等待線程,讓它們去搶鎖然后完成后繼續執行。
>>> def test(): ... with cond: ... for i in range(5): ... print currentThread().name,i ... sleep(0.1) ... if i == 2:cond.wait() ... >>> >>> Thread(target = test).start();Thread(target = test).start() Thread-3 0 >>> Thread-3 1 Thread-3 2 Thread-4 0 Thread-4 1 Thread-4 2 //Thread-4等待,Thread-3持有鎖 >>> with cond:cond.notifyAll() //通知所有cond.wait線程 ... >>> Thread-3 3 //Thread-3和Thread-4再次搶鎖完成后繼續執行,順序不定 Thread-3 4 Thread-4 3 Thread-4 4
Semaphore
Semaphore通過一個計數器來限制可同時運行的線程數量。計數器表示還可以運行的線程數量。
acquire()遞減計數器,release()則是增加計數器。
>>> sem = Semaphore(2) >>> >>> def test(): ... with sem: ... for i in range(5): ... print currentThread().name,i ... sleep(0.1) ... >>> >>> for i in range(3): ... Thread(target = test).start() ... Thread-5 0//線程5和6同時執行,獲取計數器,使其減為0,故使得線程7被阻塞 Thread-6 0 >>> Thread-5 1 Thread-6 1 Thread-5 2 Thread-6 2 Thread-5 3 Thread-6 3 Thread-5 4 Thread-6 4//線程5和線程6執行完成后,釋放信號量,線程7開始執行 Thread-7 0 Thread-7 1 Thread-7 2 Thread-7 3 Thread-7 4
線程5和6同時執行,獲取計數器,使其減為0,故使得線程7被阻塞,故前面輸出只有線程5和線程6。
在線程5和線程6執行完成后,釋放信號量,線程7開始執行。
Timer
用一個獨立線程在n秒后執行某個函數。如定時器尚未執行,可用cancel()取消,定時器僅執行一次。
>>> import datetime >>> from threading import * >>> >>> def test(): ... print datetime.datetime.now() ... >>> Timer(2,test).start() >>> 2013-10-29 21:28:07.990131
mark:import datetime和from datetime import *
Local
TLS(thread-lock storage)為線程提供獨立的存儲空間。
>>> from threading import * >>> >>> from time import sleep >>> >>> data = local() >>> >>> def test(fn,x): ... data.x = x ... ... for i in range(5): ... data.x = fn(data.x) ... print currentThread().name,data.x ... sleep(0.1) ... ... >>> >>> t1 = (lambda x:x+1,0) >>> t2 = (lambda x:x+'a','a') >>> >>> for d in (t1,t2): ... Thread(target = test,args = d).start() ... Thread-1 1 Thread-2 aa >>> Thread-1 2 Thread-2 aaa Thread-1 3 Thread-2 aaaa Thread-1 4 Thread-2 aaaaa Thread-1 5 Thread-2 aaaaaa