Python多線程編程


1.全局解釋器鎖定

    Python虛擬機使用GIL(Global Interpreter Lock,全局解釋器鎖定)來互斥線程對共享資源的訪問,暫時無法利用多處理器的優勢。雖然python解釋器可以“運行”多個線程,但在任意時刻,不管有多少的處理器,任何時候都總是只有一個線程在執行。對於I/O密集型任務,使用線程一般是沒有問題的,而對於涉及大量CPU計算的應用程序而言,使用線程來細分工作沒有任何好處,用戶最好使用子進程和消息傳遞。

2.threading

     python的threading模塊提供Thread類和各種同步原語,用於編寫多線程的程序。

2.1. Thread(target=None,name=None,args=(),kwargs={})

    此函數創建一個Thread實例。target是一個可調用函數,線程啟動時,run()方法將調用此對象。name是線程的名稱,默認是'Thread-N'的格式。args是傳遞給target函數的參數元組,kwargs是傳遞給target函數的關鍵字參數的字典。

    Thread實例t支持以下方法和屬性:

  • t.start()                 啟動線程,就是調用run()方法
  • t.run()                   可以在Thread的子類中重新定義
  • t.join([timeout])      阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。
  • t.is_live()                返回線程的活動狀態
  • t.name                   線程的名稱
  • t.ident                    線程標識符
  • t.daemon                設置線程是否為守護線程,必須在t.start()前設置。當設置為True時,主線程要退出時,不必等守護線程完成。'

    創建線程有兩種方法:

  1. 創建一個Thread實例,傳遞給它一個函數
import threading
import time
def clock(nsec):
    whhile True:
        print 'Now is %s'%time.ctime()
        time.sleep(nsec)
t=threading.Thread(target=clock,args=(5,))
t.daemon=True  #設置為守護線程
t.start()

     2. 從Thread派生出一個子類,然后創建一個子類的實例

import threading
import time
class ClockThread(threading.Thread):
    def __init__(self,nsec):
        threading.Thread.__init__(self)
        self.daemon=True   #設置為守護線程
        self.nsec=nsec
    def run():
        while True:
            print 'Now is s%'%time.ctime()
            time.sleep(self.nsec)
t=ClockThread(5)
t.start()

  后一種方法比較python一點。

      由於線程會無限循環,所以設置daemonTrue,這樣當進程結束時,線程也將被銷毀。

      例如有個數數程序,一個線程從1數到9,另一個線程從a數到j,每個線程都要耗費9s,如果要順序執行的話需耗費18s。

import threading
import time
class CountThread(threading.Thread):
    def __init__(self,func,name):
        threading.Thread.__init__(self)
        self.name=str(name)
        self.func=func
    def run(self):
        apply(self.func)
def numcount():
    print threading.currentThread().name,'start at : ',time.ctime()
    for i in range(10):
        print i
        time.sleep(1)
    print threading.currentThread().name,'done at : ',time.ctime()
def alphacount():
    print threading.currentThread().name,'start at : ',time.ctime()
    for i in range(97,107):
        print chr(i)
        time.sleep(1)
    print threading.currentThread().getName(),'done at : ',time.ctime()
def main():
    funclist=[numcount,alphacount]
    threads=[]
    for i in funclist:
        t=CountThread(i,i.__name__)
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print 'All done at :',time.ctime()
if __name__=='__main__':
    main()

  結果:

numcount  start at :    Fri Feb 07 12:19:28
alphacount  start at : Fri Feb 07 12:19:28 2014

a0

b1

2c

3d

4
 e
5f

6g

7
 h
8
 i
9j

alphacount numcount  done at :  done at : Fri Feb 07 12:19:38 2014 
Fri Feb 07 12:19:38 2014
All done at : Fri Feb 07 12:19:38 2014

    10s就完成了。

    舉一個更清晰的看t.join()作用的例子:

import threading
import time
def join():
    print 'in Threadjoin'
    time.sleep(1)
    print 'out Threadjoin'
Threadjoin=threading.Thread(target=join,name='Threadjoin')
def context(Threadjoin):
    print 'in Threadcontext'
    Threadjoin.start()
    Threadjoin.join()   #Threadjoin線程開始阻塞,等待Threadjoin完成
    print 'out Threadcontext'
Threadcontext=threading.Thread(target=context,name='Threadcontext',args=(Threadjoin,))
Threadcontext.start()

  結果:

>>> 
in Threadcontext
in Threadjoin
out Threadjoin
out Threadcontext

2.2. 線程的同步

    線程運行在創建它的進程內部,共享所有的數據和資源,但都有自己獨立的棧和堆。編寫並發編程的難點在於同步和訪問共享數據。多個任務同時更新一個數據結構可能導致數據損壞和程序狀態不一致(也就是競爭條件)。要解決這個問題,必須找出程序的關鍵代碼段,並使用互斥鎖和其它類似的同步手段保護他們。

2.2.1 Lock

    原語鎖定(互斥鎖定)是一個同步原語,狀態是"已鎖定"或"未鎖定"之一。兩個方法acquire()和release()用於修改鎖定的狀態。如果有多個線程在等待獲取鎖定,當鎖定釋放時,只有一個線程能獲得它。

構造方法: 
Lock()  :創建新的Lock對象,初始狀態為未鎖定

實例方法: 
Lock.acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。 成功獲取鎖定返回True,無法獲取鎖定返回False。
Lock.release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

    Python多線程分塊讀取大文件:

import threading
import os
seekposition=0
blocksize=1000000
filesize=0
def getFilesize(filename):
    f=open(filename)
    f.seek(0,os.SEEK_END)
    filesize=f.tell()
    f.close()
    return filesize

def parsefile(filename):
    global seekposition,filesize
    f=open(filename)
    
    while True:
        lock.acquire()   #seekposition是線程共享的,修改時需要鎖定
        startposition=seekposition
        endposition=(startposition+blocksize) if (startposition+blocksize)<filesize else filesize
        seekposition=endposition
        lock.release()
        if startposition==filesize:
            break
        elif startposition>0:
            f.seek(startposition)
            f.readline()    #分成的block第一行可能不是完整的一行,略掉不處理,而是作為上一個block的最后一行處理
        position=f.tell()
        outfile=open(str(endposition)+'.txt','w')
        while position<=endposition:
            line=f.readline()
            outfile.write(line)
            position=f.tell()
        outfile.close()
    f.close()

def main(filename):
    global seekposition,filesize
    filesize=getFilesize(filename)
    lock=threading.Lock()
    threads=[]
    for i in range(4):
        t=threading.Thread(target=parsefile,args=(filename,))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
if __name__=='__main__':
    filename=''
    main(filename)

2.2.2 RLock

    多重鎖定是一個類似於Lock對象的同步原語,但同一個線程可以多次獲取它。這允許擁有鎖定的線程執行嵌套的acquire()和release()操作。可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,為0時鎖處於未鎖定狀態。

import threading
import time
rlock=threading.RLock()
count=0
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        global count
        if rlock.acquire():
            count+=1
            print '%s set count : %d'%(self.name,count)
            time.sleep(1)
            if rlock.acquire():
                count+=1
                print '%s set count : %d'%(self.name,count)
                time.sleep(1)
                rlock.release()
            rlock.release()
if __name__=='__main__':
    for i in range(5):
        t=MyThread()
        t.start()

2.2.3 信號量Semaphore

    信號量是一個基於計數器的同步原語,調用acquire()方法時此計數器減1,調用release()方法時此計數器加1.如果計數器為0,acquire()方法會被阻塞,直到其他線程調用release()為止。

    下面是一個說明信號量的好例子,引用自http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html

import threading
import time
 
semaphore = threading.Semaphore(2)   # 計數器初值為2
 
def func():
    
    # 請求Semaphore,成功后計數器-1;計數器為0時阻塞
    print '%s acquire semaphore...' % threading.currentThread().getName()
    if semaphore.acquire():
        
        print '%s get semaphore' % threading.currentThread().getName()
        time.sleep(4)
        
        # 釋放Semaphore,計數器+1
        print '%s release semaphore' % threading.currentThread().getName()
        semaphore.release()
 
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t4 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
t4.start()
 
time.sleep(2)
 
# 沒有獲得semaphore的主線程也可以調用release
# 若使用BoundedSemaphore,t4釋放semaphore時將拋出異常
print 'MainThread release semaphore without acquire'
semaphore.release()

2.2.4 Condition

    條件變量是構建在另一個鎖定上的同步原語。典型的用法是生產者-使用者問題,其中一個線程生產的數據供另一個線程使用。

    構造方法:
    Condition([lock/rlock])

    實例方法: 
    acquire([timeout])/release(): 調用關聯的鎖的相應方法。 
    wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖,直到另一個線程在條件變量上執行notify()或notify_all()方法將其喚醒為止。使用前線程必須已獲得鎖定,否則將拋出異常。 
    notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。 
    notify_all(): 喚醒所有等待此條件的線程。

import threading
cv = threading.Condition()
alist = []


def producer():
    global alist   
    cv.acquire()
    print 'producer acquire lock'
    for i in range(10):
        alist.append(i)
    cv.notify()
    print 'producer notify wait threading'
    cv.release()
    print 'producer release lock'


def consumer():
    cv.acquire()
    print 'consumer acquire lock'
    while not alist:
        print 'consumer wait and release lock'
        cv.wait()
    print 'wait threading acquire lock'
    cv.release()
    print 'consumer release lock'
    print alist

 
tproducer = threading.Thread(target=producer)
tconsumer = threading.Thread(target=consumer)
tconsumer.start()
tproducer.start()

    運行:

consumer acquire lock
consumer wait and release lock
producer acquire lock
producer notify wait threading
producer release lock 
wait threading acquire lock
consumer release lock
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    調用wait方法時,線程會自動釋放鎖並進入等待池等待通知,當接收到notify的通知時,會自動獲得鎖。

2.3 local()

    返回local對象,用於保存線程的數據,管理 thread-local(線程局部的)數據。對於同一個local,線程無法訪問其他線程設置的屬性;線程設置的屬性不會被其他線程設置的同名屬性替換。 可以把local看成是一個“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對應的屬性字典、再使用屬性名作為key檢索屬性值的細節。 

import threading 
mydata=threading.local()
mydata.number=42
mydata.color='red'
print mydata.__dict__
log=[]

def foo():
    items=mydata.__dict__.items()  #在此線程中mydata屬性字典為空,無number與color屬性
    items.sort()
    log.append(items)
    mydata.number=11
    log.append(mydata.number)

t=threading.Thread(target=foo)
t.start()
t.join()

print log
print mydata.number   #仍為42

3. Queue

    盡管在Python中可以使用各種鎖定和同步原語的組合編寫非常傳統的多線程程序,但有一種更優的編程方式——即將多線程程序組織為多個獨立任務的集合,這些線程之間通過消息隊列進行通訊。Queue模塊可以用於線程間通訊,讓各個線程共享數據。

    構造方法:

    Queue():創建一個FIFO隊列
    LifoQueue():創建一個LIFO棧

    實例方法: 
    q.put(item):將item放入隊列
    q.get():從隊列中刪除一項,然后返回這個項目
    q.task_done():隊列中數據的使用者用來指示對於項目的處理已結束。從隊列中刪除的每一項都應該調用一次。
    q.join():阻塞直到隊列中的所有項目均被處理為止。

    python核心編程中有關於多線程編程和Queue結合使用的思路:

  • UserThread:負責讀取客戶的輸入,可能是一個I/O通道。程序可以創建多個線程,每個客戶一個,輸入放置到隊列中。
  • ReplyThread:負責把用戶的輸入取出來。
import threading
import Queue
q=Queue.Queue()
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon=True
    def run(self):
        while True:
            item=q.get()
            print threading.current_thread().name,'get',item
            q.task_done()
for i in range(4):
    t=MyThread()
    t.start()
for i in range(100):
    q.put(i)
q.join()

4. 線程終止與掛起

    下面選自《Python參考手冊》

    線程沒有任何方法可用於強制終止或掛起。由於在設計上,如果某個線程獲取了鎖定,在它釋放之前強制終止線程,將導致整個應用程序出現死鎖。

    可以自己構建終止功能:

import threading
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._terminate=False  #設置終止標志
        self.lock=threading.Lock()
    def terminal(self):
        self._terminal=True
    def acquire(self):
        self.lock.acquire()
    def release(self):
        self.lock.release()
    def run(self):
        while True:
            if self._terminal:   #標志為True,則終止線程
                break
            self.lock.acquire()
            statements
            self.lock.release()
            statements

  也可以利用Queue傳遞終止信號

import threading
import Queue

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue=Queue.Queue()
    def send(self,item):
        self.queue.put(item)
    def close(self):
        self.queue.put(None)
        self.queue.join()
    def run(self):
        while True:
            item=self.queue.get()
            if item is None:
                break
            print item
            self.queue.task_done()
        self.queue.task_done()
t=MyThread()
t.start()
t.send('hello')
t.send('world')
t.close()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM