多線程threading模塊


python的多線程編程

簡介

多線程編程技術可以實現代碼並行性,優化處理能力,同時功能的更小划分可以使代碼的可重用性更好。Python中threading和Queue模塊可以用來實現多線程編程。

詳解

線程和進程

進程(有時被稱為重量級進程)是程序的一次執行。每個進程都有自己的地址空間、內存、數據棧以及其它記錄其運行軌跡的輔助數據。操作系統管理在其上運行的所有進程,並為這些進程公平地分配時間。進程也可以通過fork和spawn操作來完成其它的任務,不過各個進程有自己的內存空間、數據棧等,所以只能使用進程間通訊(IPC),而不能直接共享信息。
線程(有時被稱為輕量級進程)跟進程有些相似,不同的是所有的線程運行在同一個進程中,共享相同的運行環境。它們可以想像成是在主進程或“主線程”中並行運行的“迷你進程”。線程有開始、順序執行和結束三部分,它有一個自己的指令指針,記錄自己運行到什么地方。線程的運行可能被搶占(中斷)或暫時的被掛起(也叫睡眠)讓其它的線程運行,這叫做讓步。一個進程中的各個線程之間共享同一片數據空間,所以線程之間可以比進程之間更方便地共享數據以及相互通訊。線程一般都是並發執行的,正是由於這種並行和數據共享的機制使得多個任務的合作變為可能。實際上,在單CPU的系統中,真正的並發是不可能的,每個線程會被安排成每次只運行一小會,然后就把CPU讓出來,讓其它的線程去運行。在進程的整個運行過程中,每個線程都只做自己的事,在需要的時候跟其它的線程共享運行的結果。多個線程共同訪問同一片數據不是完全沒有危險的,由於數據訪問的順序不一樣,有可能導致數據結果的不一致的問題,這叫做競態條件。而大多數線程庫都帶有一系列的同步原語,來控制線程的執行和數據的訪問。

使用線程

(1)全局解釋器鎖(GIL)
Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。在多線程環境中,Python 虛擬機按以下方式執行:a、設置 GIL;b、切換到一個線程去運行;c、運行指定數量的字節碼指令或者線程主動讓出控制(可以調用 time.sleep(0));d、把線程設置為睡眠狀態;e、解鎖 GIL;d、再次重復以上所有步驟。
在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由於在這期間沒有Python的字節碼被運行,所以不會做線程切換)編寫擴展的程序員可以主動解鎖GIL。
(2)退出線程
當一個線程結束計算,它就退出了。線程可以調用thread.exit()之類的退出函數,也可以使用Python退出進程的標准方法,如sys.exit()或拋出一個SystemExit異常等。不過,不可以直接“殺掉”("kill")一個線程。
不建議使用thread模塊,很明顯的一個原因是,當主線程退出的時候,所有其它線程沒有被清除就退出了。另一個模塊threading就能確保所有“重要的”子線程都退出后,進程才會結束。
(3)Python的線程模塊
Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程序員創建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創建一個可以用於多個線程之間共享數據的隊列數據結構。
避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊里的屬性有可能會與threading出現沖突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結束時,所有的線程都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出后進程才退出。

threading模塊

thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的服務器,如果沒有客戶提出請求它就在那等着,如果設定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。如果主線程退出不用等待那些子線程完成,那就設定這些線程的daemon屬性,即在線程thread.start()開始前,調用setDaemon()函數設定線程的daemon標志(thread.setDaemon(True))就表示這個線程“不重要”。如果想要等待子線程完成再退出,那就什么都不用做或者顯式地調用thread.setDaemon(False)以保證其daemon標志為False,可以調用thread.isDaemon()函數來判斷其daemon標志的值。新的子線程會繼承其父線程的daemon標志,整個Python會在所有的非守護線程退出后才會結束,即進程中沒有非守護線程存在的時候才結束。
1)threading的Thread類
它有很多thread模塊里沒有的函數,Thread對象的函數:

201549112739440.png

創建一個Thread的實例,傳給它一個函數

#!/usr/bin/env python
import threading
from time import sleep,ctime
loops=[4,2]
def loop(nloop,nsec):
        print 'start loop',nloop,'at:',ctime()
        sleep(nsec)
        print 'loop',nloop,'done at:',ctime()
def main():
        print '***starting at:',ctime()
        threads=[]
        nloops=range(len(loops))
        for i in nloops:
                t=threading.Thread(target=loop,args=(i,loops[i]))
                threads.append(t)
        for i in nloops:
                threads[i].start()
        for i in nloops:
                threads[i].join()
        print 'all done at',ctime()
if __name__=='__main__':
        main()

運行結果:

root@hanfeifei-HP-ProDesk-680-G2-MT:/mnt/han# python threading1.py 
***starting at: Sun Jul 31 17:37:39 2016
start loop 0 at: Sun Jul 31 17:37:39 2016
start loop 1 at: Sun Jul 31 17:37:39 2016
loop 1 done at: Sun Jul 31 17:37:41 2016
loop 0 done at: Sun Jul 31 17:37:43 2016
all done at Sun Jul 31 17:37:43 2016

實例化一個Thread(調用 Thread())與調用thread.start_new_thread()之間最大的區別就是,新的線程不會立即開始。在創建線程對象,但不想馬上開始運行線程的時候,這是一個很有用的同步特性。所有的線程都創建了之后,再一起調用 start()函數啟動,而不是創建一個啟動一個。而且也不用再管理一堆鎖(分配鎖、獲得鎖、釋放鎖、檢查鎖的狀態等),只要簡單地對每個線程調用join()主線程等待子線程的結束即可。join()還可以設置timeout的參數,即主線程等到超時為止。
join()的另一個比較重要的方面是它可以完全不用調用,一旦線程啟動后,就會一直運行,直到線程的函數結束,退出為止。如果主線程除了等線程結束外,還有其它的事情要做,那就不用調用 join(),只有在等待線程結束的時候才調用join()。

創建一個Thread的實例,傳給它一個可調用的類對象

#!/usr/bin/env python
import threading
from time import sleep, ctime
loops=[4,2]
class ThreadFunc(object):
	def __init__(self,func,args,name=''):
		self.name=name
		self.func=func
		self.args=args
	def __call__(self):
		apply(self.func,self.args)
def loop(nloop,nsec):
	print 'start loop',nloop,'at:',ctime()
	sleep(nsec)
	print 'loop',nloop,'done at:',ctime()
def main():
	print 'starting at:',ctime()
	threads=[]
	nloops=range(len(loops))
	for i in nloops:
		t=threading.Thread(target=ThreadFunc(loop,(i,loops[i]),loop.__name__))
		threads.append(t)
	for i in nloops:
		threads[i].start()
	for i in nloops:
		threads[i].join()
	print 'all Done at:',ctime()
if __name__=='__main__':
	main()
與傳一個函數很相似的另一個方法是在創建線程的時候,傳一個可調用的類的實例供線程啟動的時候執行,這是多線程編程的一個更為面向對象的方法。相對於一個或幾個函數來說,類對象里可以使用類的強大的功能。創建新線程的時候,Thread對象會調用ThreadFunc對象,這時會用到一個特殊函數__call__()。由於已經有了要用的參數,所以就不用再傳到Thread()的構造函數中。由於有一個參數的元組,這時要使用apply()函數或使用self.res = self.func(*self.args)。

從Thread派生出一個子類,創建一個這個子類的實例

	
#!/usr/bin/env python 
  
import threading 
from time import sleep, ctime 
  
loops = [ 4, 2 ] 
  
class MyThread(threading.Thread): 
  def __init__(self, func, args, name=''): 
    threading.Thread.__init__(self) 
    self.name = name 
    self.func = func 
    self.args = args 
  
  def getResult(self): 
    return self.res 
  
  def run(self): 
    print 'starting', self.name, 'at:', ctime() 
    self.res = apply(self.func, self.args) 
    print self.name, 'finished at:', ctime() 
  
def loop(nloop, nsec): 
  print 'start loop', nloop, 'at:', ctime() 
  sleep(nsec) 
  print 'loop', nloop, 'done at:', ctime() 
  
def main(): 
  print 'starting at:', ctime() 
  threads = [] 
  nloops = range(len(loops)) 
  
  for i in nloops: 
    t = MyThread(loop, (i, loops[i]), 
    loop.__name__) 
    threads.append(t) 
  
  for i in nloops: 
    threads[i].start() 
  
  for i in nloops: 
    threads[i].join() 
  
  print 'all DONE at:', ctime() 
  
if __name__ == '__main__': 
  main()

子類化Thread類,MyThread子類的構造函數一定要先調用基類的構造函數,特殊函數__call__()在子類中,名字要改為run()。在 MyThread類中,加入一些用於調試的輸出信息,把代碼保存到myThread模塊中,並導入這個類。除使用apply()函數來運行這些函數之外,還可以把結果保存到實現的self.res屬性中,並創建一個新的函數getResult()來得到結果。

Queue模塊

Queue模塊可以用來進行線程間通訊,讓各個線程之間共享數據。Queue解決生產者-消費者的問題,現在創建一個隊列,讓生產者線程把新生產的貨物放進去供消費者線程使用。生產者生產貨物所要花費的時間無法預先確定,消費者消耗生產者生產的貨物的時間也是不確定的。

#!/usr/bin/env python 
  
from random import randint 
from time import sleep 
from Queue import Queue 
from myThread import MyThread 
  
def writeQ(queue): 
  print '+++producing object for Q...', 
  queue.put('xxx', 1) 
  print "+++size now:", queue.qsize() 
  
def readQ(queue): 
  val = queue.get(1) 
  print '---consumed object from Q... size now', \ 
    queue.qsize() 
  
def writer(queue, loops): 
  for i in range(loops): 
    writeQ(queue) 
    sleep(randint(1, 3)) 
  
def reader(queue, loops): 
  for i in range(loops): 
    readQ(queue) 
    sleep(randint(2, 5)) 
  
funcs = [writer, reader] 
nfuncs = range(len(funcs)) 
  
def main(): 
  nloops = randint(2, 5) 
  q = Queue(32) 
  
  threads = [] 
  for i in nfuncs: 
    t = MyThread(funcs[i], (q, nloops), \ 
      funcs[i].__name__) 
    threads.append(t) 
  
  for i in nfuncs: 
    threads[i].start() 
  
  for i in nfuncs: 
    threads[i].join() 
  
  print '***all DONE'
  
if __name__ == '__main__': 
  main() 

~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM