Python之Threading模塊


Thread

先引入一個例子:

>>> from threading import Thread,currentThread,activeCount
>>> 
>>> def test(s):
...     print "ident:",currentThread().ident
...     print "count:",activeCount()
...     print s
... 
>>> 
>>> Thread(target = test, args =('Hello',)).start()
ident: 1099229504
count: 2
Hello

 

需要模塊threading,對應的幫助文檔:

http://docs.python.org/2.7/library/threading.html#module-threading

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

 

除了標識符,還可以給線程取個名字,便於調試。

 

還可以繼承Thread實現自己的線程類:

>>> from threading import *
>>> 
>>> class MyThread(Thread):
...     def __init__(self,name,*args):
...             super(MyThread,self).__init__(name = name)#調用父類的init,設置線程的名稱
...             self.data = args
...     
...     def run(self):
...             print self.name,self.data
... 
>>> 
>>> MyThread("abc",range(10)).start()
abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)>>> 

>>> 
>>> MyThread("abc",range(5)).start() 
abc ([0, 1, 2, 3, 4],)
>>> MyThread("abc",range(10)).start()
abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)

 

將線程daemon屬性設為True,那么表示這是一個背景線程,進程退出時不會等到該線程結束。

調用join()等到線程結束,可提供超時參數(秒,浮點數設定更小粒度)。

isAlive()檢查線程狀態,join()可以多次調用。

>>> from time import sleep
>>> 
>>> def test():
...     print "__thread__start__"
...     sleep(10)
...     print "__thread__exit__"
... 
>>> 
>>> def run():
...     t = Thread(target = test)
...     t.start()
...     t.join(2) //設置超時時間為2s
...     
...     print t.isAlive()//檢查線程狀態
...     t.join() //再次等待
...     
...     print "over!"
... 
>>> 
>>> run()
__thread__start__
True
__thread__exit__
over!

Lock

Lock不支持遞歸加鎖,也就是說即便在同一個線程中,也必須等待鎖釋放。通常建議改用RLock,它會處理"owning thread"和"recursion level"狀態,對於同一個線程的多次請求鎖行為,只累加計數器。每次調用release()將遞減該計數器,直到0時釋放鎖,因此acquire()和relase()必須承兌出現,一個加鎖,一個釋放。

 

threading中的成員大多實現了上下文協議,盡可能用with代替手工調用。

>>> lock = RLock()
>>> 
>>> def show(i):
...     with lock:
...             print currentThread().name,i
...             sleep(0.1)
... 
>>> def test():
...     with lock:
...             for i in range(5):
...                     show(i)
... 
>>> 
>>> for i in range(2):
...     Thread(target = test).start()
... 
>>>
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-3 0
Thread-3 1
Thread-3 2
Thread-3 3
Thread-3 4

所有線程等待lock鎖,串行執行。

Event

Event通過一個內部標記來協調多線程運行。方法wait()阻塞線程執行,直到標記為True。

set()將標記設為True,clear()更改標記為False。isSet()用於判斷標記狀態。

>>> from threading import *
>>> 
>>> def test():
...     e = Event()
...     def test():
...             for i in range(5):
...                     e.wait()
...                     e.clear()
...                     print i
...     Thread(target = test).start()
...     return e
... 
>>> e = test()
>>> e.set()
>>> 0

>>> e.set()
>>> 1

>>> e.set()
>>> 2

>>> e.set()
>>> 3

如果不調用clear(),那么標記一直為True,wait()就不會發生堵塞行為。

通常為每個線程准備一個獨立的Event,而不是多個線程共享,以避免未及時調用clear(0時發生意外情況。

 

condition

condition像Lock和Event的綜合體,除基本的鎖操作外,還提供了類似yield的功能。

在獲取鎖以后,可以調用wait()臨時讓出鎖,當前線程被阻塞,直到notify()發送通知后再次請求鎖來恢復執行。將wait當做yield,那么notify就是send

 

可以將已有的鎖對象傳給Condition

>>> from threading import *
>>> from time import sleep
>>> 
>>> 
>>> cond = Condition()
>>> 
>>> def t1():
...     with cond:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
...                     if i == 3:cond.wait()
... 
>>> 
>>> def t2():
...     with cond:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
...             cond.notify()
... 
>>> 
>>> Thread(target = t1).start();Thread(target = t2).start()
Thread-1 0
>>> Thread-1 1
Thread-1 2
Thread-1 3 //調用wait(),獲取鎖,當前線程被阻塞
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4//t2執行完range(5)循環,通過cond.notify()發送通知,重新獲取鎖,繼續執行

只有獲取鎖的線程才能調用wait()和notify(),因此必須在鎖釋放前調用。

 

當wait()釋放鎖后,其他線程也可進入wait狀態。notifyAll()激活所有等待線程,讓它們去搶鎖然后完成后繼續執行。

>>> def test():
...     with cond:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
...                     if i == 2:cond.wait()
... 
>>> 
>>> Thread(target = test).start();Thread(target = test).start()
Thread-3 0

>>> Thread-3 1
Thread-3 2
Thread-4 0
Thread-4 1
Thread-4 2 //Thread-4等待,Thread-3持有鎖

>>> with cond:cond.notifyAll() //通知所有cond.wait線程
... 
>>> Thread-3 3 //Thread-3和Thread-4再次搶鎖完成后繼續執行,順序不定
Thread-3 4
Thread-4 3
Thread-4 4

Semaphore

Semaphore通過一個計數器來限制可同時運行的線程數量。計數器表示還可以運行的線程數量。

acquire()遞減計數器,release()則是增加計數器。

>>> sem  = Semaphore(2)
>>> 
>>> def test():
...     with sem:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
... 
>>> 
>>> for i in range(3):
...     Thread(target = test).start()
... 
Thread-5 0//線程5和6同時執行,獲取計數器,使其減為0,故使得線程7被阻塞
Thread-6 0
>>> Thread-5 1
Thread-6 1
Thread-5 2
Thread-6 2
Thread-5 3
Thread-6 3
Thread-5 4
Thread-6 4//線程5和線程6執行完成后,釋放信號量,線程7開始執行
Thread-7 0
Thread-7 1
Thread-7 2
Thread-7 3
Thread-7 4

線程5和6同時執行,獲取計數器,使其減為0,故使得線程7被阻塞,故前面輸出只有線程5和線程6。

在線程5和線程6執行完成后,釋放信號量,線程7開始執行。

 

Timer

用一個獨立線程在n秒后執行某個函數。如定時器尚未執行,可用cancel()取消,定時器僅執行一次。

>>> import datetime
>>> from threading import *
>>> 
>>> def test():
...     print datetime.datetime.now()
... 
>>> Timer(2,test).start()
>>> 2013-10-29 21:28:07.990131

mark:import datetime和from datetime import *

Local

TLS(thread-lock storage)為線程提供獨立的存儲空間。

>>> from threading import *
>>> 
>>> from time import sleep
>>> 
>>> data = local()
>>> 
>>> def test(fn,x):
...     data.x = x
...     
...     for i in range(5):
...             data.x = fn(data.x)
...             print currentThread().name,data.x
...             sleep(0.1)
...     
... 
>>> 
>>> t1 = (lambda x:x+1,0)
>>> t2 = (lambda x:x+'a','a')
>>> 
>>> for d in (t1,t2):
...     Thread(target = test,args = d).start()
... 
Thread-1 1
 Thread-2 aa

>>> Thread-1 2
Thread-2 aaa
Thread-1 3
Thread-2 aaaa
Thread-1 4
Thread-2 aaaaa
Thread-1 5
Thread-2 aaaaaa

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM