threading
【這篇文章的閱讀量越來越多了… 因此我覺得有必要聲明下,文章的性質是我個人的學習記錄和總結,並非教程,文中難免有表達不嚴謹,甚至錯誤的地方。如果您只是相對threading相關內容做個大概的了解,希望能對您有所參考。如果想要精密地學習,請移步正規教材、文檔以及大牛的博客】
python程序默認是單線程的,也就是說在前一句語句執行完之前后面的語句不能繼續執行(不知道我理解得對不對)
先感受一下線程,一般情況下:
def testa(): sleep(1) print "a" def testb(): sleep(1) print "b" testa() testb() #先隔出一秒打印出a,再過一秒打出b
但是如果用了threading的話:
ta = threading.Thread(target=testa) tb = threading.Thread(target=testb) for t in [ta,tb]: t.start() for t in [ta,tb]: t.join() print "DONE" #輸出是ab或者ba(緊貼着的)然后空一行再來DONE的結果。
得到這樣的結果是因為這樣的,在start之后,ta首先開始跑,但是主線程(腳本本身)沒有等其完成就繼續開始下一輪循環,然后tb也開始了,在之后的一段時間里,ta和tb兩條線程(分別代表了testa和testb這兩個過程)共同執行。相對於一個個迭代而言,這樣做無疑是大大提高了運行的速度。
Thread類為線程的抽象類,其構造方法的參數target指向一個函數對象,即該線程的具體操作。此外還可以有args=<tuple>來給target函數傳參數。需要注意的是當傳任何一個序列進去的話Thread會自動把它分解成單個單個的元素然后分解傳給target函數。我估計在定義的時候肯定是*args了。
join方法是個很tricky的東西,至今還不是很清楚地懂這是個什么玩意兒。join([timeout])方法阻塞了主線程,直到調用此方法的子線程完成之后主線程才繼續往下運行。(之前我糊里糊塗地把join就緊緊接在start后面寫了,如果這么寫了的話那么多線程在速度上就毫無優勢,和單線程一樣了= =)。而像上面這個示例一樣,先一個遍歷把所有線程 都啟動起來,再用一個遍歷把所有線程都join一遍似乎是比較通行的做法。
● 關於線程鎖
多線程程序涉及到一個問題,那就是當不同線程要對同一個資源進行修改或利用時會出現混亂,所以有必要引入線程鎖。
(經網友提醒,補充一下相關例子)比如下面這一段程序:
import threading class MyThread(threading.Thread): def __init__(self,counter,name): self.counter = counter self.name = name def run(self): self.counter[0] += 1 print self.counter[0] if __name__ == '__main__': counter = [0] for i in range(1,11): t = MyThread(counter,i) t.start()
這里並發了10個線程,在沒有混亂的情況下,很明顯一個線程的name和經過它處理過后的counter中的數字應該相同。因為沒有鎖可能引發混亂,想象中,我們可能認為,當某個線程要打印counter中的數字時,別的線程對其作出了改變,從而導致打印出的counter中的數字不符合預期。實際上,這段代碼的運行結果很大概率是很整齊的1\n2\n3....10。如果要解釋一下,1. 雖然稱並發10個線程。但是實際上線程是不可能真的在同一個時間點開始,比如在這個例子中t1啟動后,要將循環進入下一輪,創建新的線程對象t2,然后再讓t2啟動。這段時間雖然很短很短,但是確實是存在的。而這段時間的長度,足夠讓t1的run中,進行自增並且打印的操作。最終,整個結果看上去似乎沒什么毛病。
如果我們想要看到“混亂”的情況,顯然兩個方法。要么縮短for i in range以及創建線程對象的時間,使得線程在自增之后來不及打印時counter被第二個線程自增,這個比較困難;另一個方法就是延長自增后到打印前的這段時間。自然想到,最簡單的,用time.sleep(1)睡一秒即可。此時結果可能是10\n10\n...。主要看第一行的結果。不再是1而是10了。說明在自增操作結束,打印數字之前睡的這一秒里,到第10個線程都成功自增了counter,因此即使是第一個線程,打印到的也是經過第10個線程修改的counter了。
上述結果雖然數的值上改變了,但是很可能輸出仍然是整齊的一行行的。有時候好幾個數字會擠在一起輸出,尤其是把並發量調大,比如調到100或1000,尤為明顯。擠在一起主要是因為,time.sleep(1)並不是精精確確地睡1秒。有可能是0.999或者1.001(具體差異可能更小,打個比方)。此時可能tk線程睡了1.001秒而tk+1線程睡了0.999秒,導致兩者打印時內容被雜亂地一起寫入緩沖區,所以打印出來的就凌亂了。根據時間誤差的不同,甚至有可能出現大數字先打印出來的情況。
可以通過Thread.Lock類來創建簡單的線程鎖。lock = threading.Lock()即可。在某線程start中,處理會被爭搶的資源之前,讓lock.acquire(),且lock在acquire()之后不能再acquire,否則會報錯。當線程結束處理后調用lock.release()來釋放鎖就好了。一般而言,有鎖的多線程場景可以提升一部分效率,但在寫文件等時機下會有阻塞等待的情況。
為了說明簡單的lock,我們改一下上面那段程序:
import threading import time class MyThread(threading.Thread): def __init__(self,lock,name): threading.Thread.__init__(self) self.lock = lock self.name = name def run(self): time.sleep(1) # self.lock.acquire() print self.name # self.lock.release() if __name__ == '__main__': lock = threading.Lock() for i in range(1,10): t = MyThread(lock,i) t.start()
根據啟動的順序,每個線程有了name屬性。然后啟動,在沒有鎖的情況下可能會出現擠在一起,並且數字亂序輸出的情況。把兩句注釋去掉,加上鎖之后,得到的輸出肯定是一行一行的,但是數字仍然有可能是亂序的。分析一下,加上鎖之后,每次進行print,其實是線程對於sys.stdout寫入內容,有多個線程都要print就形成了競爭,因此就會導致擠在一起。加上鎖,acquire之后,本線程擁有了對sys.stdout的獨享,因此可以正確輸出內容+換行,再解開鎖供下一個需要打印的線程使用。那為什么亂序問題還是沒有解決呢?這個就是(推測)因為前面提到的time.sleep的不精確性。有可能6號線程sleep了稍微久而7號稍微短了些,導致7號先於6號獲得鎖。自然7就比6先打印出來了。如果稍微有意思地改動一下,比如sleep的秒數時間錯開來,1號線程睡1秒,2號線程睡2秒這樣子的話,時間上的錯開使得沒有了對資源的競爭的情況,因此即使沒有鎖也不會亂。
總結一下,1. 對於run過程中對於可能有競爭的資源之前所做的操作,花費時間越是接近,越有可能發生資源競爭從而導致混亂。(廢話…)2. 當run中有print或者類似操作時需要注意,其實隱含着要對stdout做出競爭的意義
相比之下,無所多線程場景可以進一步提升效率,但是可能會引起讀寫沖突等問題,所以要慎用。一定要確認各個線程間沒有共同的資源之類的問題后再實行無鎖多線程。
和Lock類類似的還有一個RLock類,與Lock類的區別在於RLock類鎖可以嵌套地acquire和release。也就是說在同一個線程中acquire之后再acquire也不會報錯,而是將鎖的層級加深一層。只有當每一層鎖從下到上依次都release開這個鎖才算是被解開。
● 更加強大的鎖——Condition
上面提到的threading.Lock類提供了最為簡單的線程鎖的功能。除了Lock和RLock以外,其實threading還補充了其他一些很多的帶有鎖功能的類。Condition就是其中最為強大的類之一。
在說Condition之前還需要明確一下線程的幾個概念。線程的阻塞和掛起,線程的這兩個狀態乍一看都是線程暫停不再繼續往前運行,但是引起的原因不太一樣。阻塞是指線程間互相的制約,當一個線程獲得了鎖,其他的線程就被阻塞了,而掛起是出於統一調度的考慮。換句話說,掛起是一種主動的行為,在程序中我們主動掛起某個線程然后可以主動放下讓線程繼續運行;而阻塞更多時候是被動發生的,當有線程操作沖突了那么必然是有一方要被阻塞的。從層級上看,掛起操作是高於阻塞的,也就說一個線程可以在阻塞的時候被掛起,然后被喚醒后依然是阻塞狀態。如果在掛起過程中具備了運行條件(即不再阻塞),線程也不會往前運行。
再來看看Condition類的一些方法。首先是acquire和release,Condition內部也維護了一把鎖,默認是RLock類,所有關聯了同一個Condition對象的線程也都會遵守這把鎖規定的來進行運行。
Condition.wait([timeout]) 這個方法一定要在獲取鎖定之后調用,調用這個方法的Condition對象所在的線程會被掛起並且釋放這個線程獲得着的所有鎖,直到接到通知被喚醒或者超時(如果設置了Timeout的話),當被喚醒之后線程將重新獲取鎖定。
Condition.notify() notify就是上面所說的通知,調用這個方法之后會喚醒一個被掛起的線程。線程的選擇尚不明確,似乎是隨機的。需要注意的是notify方法只進行掛起的喚醒而不涉及鎖的釋放
Condition.notify_all() 喚醒所有掛起的線程
基於上面這幾個方法,就可以做出比較好的線程管理的demo了,比如下面這段網上常見的一個捉迷藏的模擬程序:
import threading,time class Seeker(threading.Thread): def __init__(self,cond,name): Thread.__init__(self) self.cond = cond self.name = name def run(self): time.sleep(1) #1.確保seeker晚於hider開始執行 self.cond.acquire() #4. hider的鎖釋放了所以這里獲得了鎖 print '我把眼睛蒙上了' self.cond.notify() #5.蒙上眼后通知hider,hider線程此時被喚醒並試圖獲取鎖,但是鎖還在seeker身上,所以hider被阻塞,seeker繼續往下 self.cond.wait() #6. seeker鎖被釋放並且掛起,hider就獲取鎖開始繼續往下運行了 print '我找到你了' self.cond.notify() #9.找到了之后通知hider,hider意圖獲取鎖但不行所以被阻塞,seeker往下 self.cond.release() #10.釋放鎖 print '我贏了' class Hider(threading.Thread): def __init__(self,cond,name): Thread.__init__(self) self.cond = cond self.name = name def run(self): self.cond.acquire() #2.hider獲取鎖 self.cond.wait() #3.hider被掛起然后釋放鎖 print '我已經藏好了' self.cond.notify() #7.藏好后通知seeker,seeker意圖獲取鎖,但是鎖在hider身上所以seeker被阻塞 self.cond.wait() #8.hider被掛起,釋放鎖,seeker獲取鎖,seeker繼續往下運行 self.cond.release() #11. 在此句之前一點,seeker釋放了鎖(#10),hider得到鎖,隨即這句hider釋放鎖 print '被你找到了' cond = threading.Condition() seeker = Seeker(cond,'seeker') hider = Hider(cond,'hider') seeker.start() hider.start() ''' 結果: 我把眼睛蒙上了 我已經藏好了 我找到你了 我贏了 被你找到了 '''
這里需要注意的是self.cond.release方法不能省,否則會引起死鎖。
● 以上的包裝線程的方式是一種面向過程的方法,下面介紹一下如何面向對象地來抽象線程
面向對象地抽象線程需要自定義一個類繼承Thread類。比如自定義class MyThread(Thread)。這個類的一個實例就是代表了一個線程,然后通過重載這個類中的run方法(是run,不是start!!但start的動作確實就是調用run)來執行具體的操作。此時鎖可以作為一個構造方法的參數,將一個鎖傳進不同的實例中以實現線程鎖控制。比如:
引用自http://www.cnblogs.com/tkqasn/p/5700281.html
#方法二:從Thread繼承,並重寫run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread, self).__init__()#注意:一定要顯式的調用父類的初始化函數。 self.arg=arg def run(self):#定義每個線程要運行的函數 time.sleep(1) print 'the arg is:%s\r' % self.arg for i in xrange(4): t =MyThread(i) t.start() print 'main thread end!'
Thread類還有以下的一些方法,自定義的類也可以調用
getName()
setName(...) //其實Thread類在構造方法中有一個name參數,可以為相應的線程取一個名字。這兩個方法就是相關這個名字屬性的
isAlive() 一個線程從start()開始到run()結束的過程中沒有異常,則其實alive的。
setDaemon(True/False) 是否設置一個線程為守護線程。當你設置一個線程為守護線程之后,程序不會等待這個線程結束再退出程序,可參考http://blog.csdn.net/u012063703/article/details/51601579
● 除了Thread類,threading中還有以下一些屬性,簡單介紹一下:
Timer類,Timer(int,target=func) 和Thread類類似,只不過它在int秒過后才以target指定的函數開始線程運行
currentThread() 獲得當前線程對象
activeCount() 獲得當前活動的線程總個數
enumerate() 獲得所有活動線程的列表
settrace(func) 設置一跟蹤函數,在run執行前執行
setprofile(func) 設置一跟蹤函數,在run執行完畢之后執行
以上內容是目前我所能駕馭的,而threading類還有很多很NB的東西比如RLock類,Condition類,Event類等等。沒什么時間再仔細研究它們,先寫到這里為止。
Queue
Queue用於建立和操作隊列,常和threading類一起用來建立一個簡單的線程隊列。
首先,隊列有很多種,根據進出順序來分類,可以分成
Queue.Queue(maxsize) FIFO(先進先出隊列)
Queue.LifoQueue(maxsize) LIFO(先進后出隊列)
Queue.PriorityQueue(maxsize) 為優先級越高的越先出來,對於一個隊列中的所有元素組成的entries,優先隊列優先返回的一個元素是sorted(list(entries))[0]。至於對於一般的數據,優先隊列取什么東西作為優先度要素進行判斷,官方文檔給出的建議是一個tuple如(priority, data),取priority作為優先度。
如果設置的maxsize小於1,則表示隊列的長度無限長
FIFO是常用的隊列,其一些常用的方法有:
Queue.qsize() 返回隊列大小
Queue.empty() 判斷隊列是否為空
Queue.full() 判斷隊列是否滿了
Queue.get([block[,timeout]]) 從隊列頭刪除並返回一個item,block默認為True,表示當隊列為空卻去get的時候會阻塞線程,等待直到有有item出現為止來get出這個item。如果是False的話表明當隊列為空你卻去get的時候,會引發異常。在block為True的情況下可以再設置timeout參數。表示當隊列為空,get阻塞timeout指定的秒數之后還沒有get到的話就引發Full異常。
Queue.put(...[,block[,timeout]]) 向隊尾插入一個item,同樣若block=True的話隊列滿時就阻塞等待有空位出來再put,block=False時引發異常。同get的timeout,put的timeout是在block為True的時候進行超時設置的參數。
Queue.task_done() 從場景上來說,處理完一個get出來的item之后,調用task_done將向隊列發出一個信號,表示本任務已經完成
Queue.join() 監視所有item並阻塞主線程,直到所有item都調用了task_done之后主線程才繼續向下執行。這么做的好處在於,假如一個線程開始處理最后一個任務,它從任務隊列中拿走最后一個任務,此時任務隊列就空了但最后那個線程還沒處理完。當調用了join之后,主線程就不會因為隊列空了而擅自結束,而是等待最后那個線程處理完成了。
結合threading和Queue可以構建出一個簡單的生產者-消費者模型,比如:
下面的代碼引用自http://blog.csdn.net/l1902090/article/details/24804085
import threading import Queue import time class worker(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue=queue self.thread_stop=False def run(self): while not self.thread_stop: print("thread%d %s: waiting for tast" %(self.ident,self.name)) try: task=q.get(block=True, timeout=20)#接收消息 except Queue.Empty: print("Nothing to do!i will go home!") self.thread_stop=True break print("task recv:%s ,task No:%d" % (task[0],task[1])) print("i am working") time.sleep(3) print("work finished!") q.task_done()#完成一個任務 res=q.qsize()#判斷消息隊列大小 if res>0: print("fuck!There are still %d tasks to do" % (res)) def stop(self): self.thread_stop = True if __name__ == "__main__": q=Queue.Queue(3) worker=worker(q) worker.start() q.put(["produce one cup!",1], block=True, timeout=None)#產生任務消息 q.put(["produce one desk!",2], block=True, timeout=None) q.put(["produce one apple!",3], block=True, timeout=None) q.put(["produce one banana!",4], block=True, timeout=None) q.put(["produce one bag!",5], block=True, timeout=None) print("***************leader:wait for finish!") q.join()#等待所有任務完成 print("***************leader:all task finished!")
(嗯。。姑且不論他的F-word哈哈哈,開玩笑的,這例子還可以,至少很清晰地說明了如何把這兩個模塊結合起來用)
輸出是這樣的:
thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one cup! ,task No:1 i am working work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one desk! ,task No:2 i am workingleader:wait for finish! work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one apple! ,task No:3 i am working work finished! fuck!There are still 2 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one banana! ,task No:4 i am working work finished! fuck!There are still 1 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one bag! ,task No:5 i am working work finished! thread139958685849344 Thread-1: waiting for tast 1 ***************leader:all task finished! Nothing to do!i will go home!
運行一下就知道,上例中並沒有性能的提升(畢竟還是只有一個線程在跑)。線程隊列的意義並不是進一步提高運行效率,而是使線程的並發更加有組織。可以看到,在增加了線程隊列之后,程序對於線程的並發數量就有了控制。新線程想要加入隊列開始執行,必須等一個既存的線程完成之后才可以。舉個例子,比如
for i in range(x): t = MyThread(queue) t.start()
x在這里是個變量,我們不知道這個循環會觸發多少線程並發,如果多的話就會很冒險。但是有了隊列之后,把一個隊列作為所有線程構建線程對象時的一個參數,讓線程必須按照這個隊列規定的大小來執行的話,就不擔心過多線程帶來的危險了。
■ 線程池實現
不得不說一年前還是太simple。。 一年后再來補充點內容吧
首先我們要明確,線程池,線程,隊列這幾個概念之間的區別和聯系。
舉一個不太恰當的例子。比如有五個很餓的人去吃旋轉壽司。旋轉壽司店里有一個傳送帶,將壽司運送到他們面前。他們一字排開坐好准備好吃,當壽司過來,食客可能會選擇一個喜歡的口味開吃。在吃的過程中,他通常就不會再去“吃着碗里看着傳送帶上的”了。之所以是很餓的人,因為我們假定他們一旦吃完一盤就會立刻着手下一盤,毫不停歇。
在這個場景中,五個人組成的集體是線程池,每個人就是一個線程,而旋轉壽司的傳送帶是隊列,每盤壽司就是一個隊列中的任務。之所以說這個例子不太恰當,是因為場景中食客可以自己選擇想吃的壽司而線程池-隊列中,隊列才是任務分配的主導。就好比是傳送帶發現某個食客說他已經吃完一盤壽司,還想再來一盤的時候,會不顧食客的喜好,強行將一盤壽司推到一個空閑的食客面前讓他吃。
更加抽象點來說,線程在這個語境中其實就像是一個工具,而線程池就是一個工具的集合。由於通常一個線程池面向的是一類任務,所以線程池中的線程基本上也是同質的。即上述的五個食客是五胞胎(誤hh)。另一方面,之所以說面向的是一類任務,是因為隊列中的任務通常是具有某些共性的。共性程度高低取決於隊列以及線程池的具體實現,但是肯定是有的。這就好比壽司可以有握り,巻き而上面的具可以有いくら、マグロ、ウニ但是歸根結底肯定還是要有米飯的。
在正式的開發中,隊列通常是由第三方服務提供比如RabbitMQ,Redis等。而線程池通常由程序自己實現。下面這段代碼則是在一個python程序中,基於Queue加上自制的建議線程池建立起來的模型。
# -*- coding:utf-8 -*- import threading import Queue import time import random from faker import Faker class MyThread(threading.Thread): ''' 線程模型 ''' def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue self.start() # 因為作為一個工具,線程必須永遠“在線”,所以不如讓它在創建完成后直接運行,省得我們手動再去start它 def run(self): while True: # 除非確認隊列中已經無任務,否則時刻保持線程在運行 try: task = self.queue.get(block=False) # 如果隊列空了,直接結束線程。根據具體場景不同可能不合理,可以修改 time.sleep(random.random()) # 假設處理了一段時間 print 'Task %s Done' % task # 提示信息而已 self.queue.task_done() except Exception,e: break class MyThreadPool(): def __init__(self,queue,size): self.queue = queue self.pool = [] for i in range(size): self.pool.append(MyThread(queue)) def joinAll(self): for thd in self.pool: if thd.isAlive(): thd.join() if __name__ == '__main__': q = Queue.Queue(10) fake = Faker() for i in range(5): q.put(fake.word()) pool = MyThreadPool(queue=q,size=2) pool.joinAll()
網上有一部分示例,將隊列作為一個屬性維護在了線程池類中,也不失為一種辦法,我這里為了能夠條理清晰,沒有放在類里面。這段程序首先生成了一個maxsize是10的隊列。fake.word()可以隨機生成一個單詞,這里僅作測試用。所以向隊列中添加了5個task。
這里有個坑: 如果put的數量大於隊列最大長度,而且put沒有設置block=False的話,那么顯然程序會阻塞在put這邊。此時ThreadPool未被建立,也就是說工作線程都還沒有啟動,因此會引起這樣一個死鎖。如果把線程池的建立放到put之前也不行,此時線程發現隊列為空,所以所有線程都會直接結束(當然這是線程中get的block是False的時候,如果為True那么也是死鎖),最終隊列中的task沒人處理,程序輸出為空。解決這個坑的辦法,一個是像上面一樣保持最開始put的量小於隊列長度;第二個就是干脆不要限制隊列長度,用q = Queue.Queue()生產隊列即可。
好的,繼續往下,進入了線程池的生成。線程池內部的列表才是真·線程池,另外其關聯了queue對象,所以在創建的時候可以將隊列對象傳遞給線程對象。線程對象在創建時就啟動了,並且被添加到線程池的那個列表中。線程池的大小由參數給出,線程啟動后會去隊列里面get任務,並且進行處理。處理完成后進行task_done聲明並且再次去嘗試get。如果隊列為空那么就直接拋出異常,也就是跳出循環,線程結束。
通過這樣一個模型,根據線程池的大小,這才真正地給線程並發做了一個限制,可促進較大程度的資源利用。
● 進一步地…
在上面這個示例中,實際上處理任務的實際邏輯是被寫在了MyThread類里面。如果我們想要一個通用性更加高的工具類,那么勢必要想想如何將這個線程類解耦具體邏輯。另一方面,隊列中的任務的內容,不僅僅可以是字符串,也可以是任何python對象。這就使得靈活性大大提高。
比如我們可以在隊列中put內容是(func, args, kwargs)這樣一個元組。其中func是一個函數對象,描述了任務的處理邏輯過程,args是一個元組,代表所有func函數的匿名參數,kwargs則是func函數的所有具名參數。如此,可以將線程類的run方法改寫成這樣:
def run(self): while True: try: func,args,kwargs = self.queue.get() try: func(*args,**kwargs) except Exception,e: raise ('bad execution: %s' % str(e)) self.queue.task_done() except Exception,e: break
這樣一個run就可以做到很大程度的解耦了。
類似的思想,線程池類和線程類也不必是一一對應的。可以將線程類作為一個參數傳遞給線程池類。這樣一個線程池類就可以作為容器容納各種各樣的線程了。具體實例就不寫了。