Python:多線程編程


1.IO編程

IO(input/output)。凡是用到數據交換的地方,都會涉及io編程,例如磁盤,網絡的數據傳輸。在IO編程中,stream(流)是一種重要的概念,分為輸入流(input stream)和輸出流(output stream)。可以把流季節為一個水管,數據相當於水管中的水,但是只能單向流動,所以數據傳輸過程中需要假設兩個水管,一個負責輸入,一個負責輸出,這樣讀寫就可以實現同步。

2.GIL:global interpreter lock全局解釋器鎖

在Python中,GIL導致同一時刻只有一個線程進入解釋器。

計算機有些操作不依賴於CPU,I/O操作幾乎不利用,IO密集型不占用CPU,多線程可完成

計算操作都要用到CPU,適合多進程

  • IO密集型任務或函數,可以用多線程
  • 計算密集型任務函數,sorry,改C
  • 計算密集型程序適合C語言多線程,I/O密集型適合腳本語言開發的多線程

線程和進程的區別:

  • 線程共享創建它的進程的地址空間; 進程有自己的地址空間。
  • 線程可以直接訪問其進程的數據段; 進程有自己的父進程的數據段副本。
  • 線程可以直接與其進程的其他線程進行通信; 進程必須使用進程間通信來與兄弟進程進行通信。
  • 新線程很容易創建; 新進程需要父進程的重復。
  • 線程可以對相同進程的線程進行相當的控制;
  • 流程只能控制子進程。
  • 對主線程的更改(取消,優先級更改等)可能會影響進程的其他線程的行為; 對父進程的更改不會影響子進程。

進程優點:同時利用多個CPU,同時進行多個操作;缺點:耗費資源(需要開辟多個內存空間)

線程優點:共享多個資源,IO操作,創造並發操作;缺點:搶占資源

3.threading模塊

threading模塊對象
Thread    表示一個線程的執行的對象
Lock    鎖原語對象(跟thread模塊里的鎖對象相同)
RLock    可重入鎖對象。使單線程可以再次獲得已經獲得了的鎖(遞歸鎖定)
Condition條件變量對象能讓一個線程停下來,等待其他線程滿足了某個“條件”。如狀態的改變或值的改變
Event    通用的條件變量。多個線程可以等待某個時間的發生,在事件發生后,所有的線程都被激活
Semaphore    為等待鎖的線程提供一個類似“等候室”的結構
BoundedSemaphore    與Semaphore類似,只是它不允許超過初始值
Timer    與thread類似,只是它要等待一段時間后才開始運行
Barrier    創建一個障礙,必須達到指定數量的線程后才可以繼續

3.1線程的兩種調用方式

#直接調用
import time
import threading begin=time.time() def foo(n): print('start-foo%s'%n) time.sleep(1) print('end foo') def bar(n): print('start-bar%s'%n) time.sleep(2) print('end bar') # foo(2) # bar(2) t1=threading.Thread(target=foo,args=(1,)) t2=threading.Thread(target=bar,args=(1,)) t1.start() t2.start() end=time.time() print('_________main_________') print(t1.getName()) print(t2.getName()) print(end-begin)
#繼承式調用
import threading
import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): #定義每個線程要運行的函數 print("running on number:%s" % self.num) time.sleep(4) if __name__ == '__main__': t1 = MyThread(111) t2 = MyThread(222) t1.start() t2.start()

threading的Thread類主要的運行對象

函數
start()                開始線程的執行
run()                定義線程的功能的函數(一般會被子類重寫)
join(timeout=None) 程序掛起,直到線程結束;如果給了timeout,則最多阻塞timeout秒 getName() 返回線程的名字 setName(name) 設置線程的名字 isAlive() 布爾標志,表示這個線程是否還在運行中 isDaemon() 返回線程的daemon標志 setDaemon(daemonic) 把線程的daemon標志設為daemonic(一定要在調用start()函數前調用)
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。

3.2 join and setDaemon

import threading
from time import ctime,sleep def music(func): for i in range(2): print ("Begin listening to %s. %s" %(func,ctime())) sleep(2) print("end listening %s"%ctime()) def move(func): for i in range(2): print ("Begin watching at the %s! %s" %(func,ctime())) sleep(5) print('end watching %s'%ctime()) threads = [] t1 = threading.Thread(target=music,args=('七里香',)) threads.append(t1) t2 = threading.Thread(target=move,args=('阿甘正傳',)) threads.append(t2) # join在子線程完成運行之前,這個子線程的父線程將一直被阻塞。 if __name__ == '__main__': t2.setDaemon(True) for t in threads: t.setDaemon(True)#守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。  t.start() # t.join()#跟線程沒有關系了,像正常一樣運行,沒意義  t1.join() # t2.join()########考慮這三種join位置下的結果? print(threading.current_thread()) print(threading.active_count()) print ("all over %s" %ctime())

join()會等到線程結束,或者在給了timeout參數的時候,等到超時為止。使用join()比使用一個等待鎖釋放的無限循環清楚一些(也稱“自旋鎖”)。

join()的另一個比較重要的方法是它可以完全不用調用。一旦線程啟動后,就會一直運行,直到線程的函數結束,退出為止。
如果你的主線程除了等線程結束外,還有其他的事情要做(如處理或等待其他的客戶請求),那就不用調用join(),只有在你要等待線程結束的時候才要調用join()。

setDaemon(True):將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。這個方法基本和join是相反的。當我們 在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是 只要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法

3.3 同步鎖Lock

由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。

import time
import threading def addNum(): global num #在每個線程中都獲取這個全局變量 num-=1 temp=num print('--get num:',num ) print('看下結果是0') print('再看下結果') time.sleep(0.001) num =temp-1 #對此公共變量進行-1操作  num = 100 #設定一個共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢  t.join() print('final num:', num )

注意:

1:  why num-=1沒問題呢?這是因為動作太快(完成這個動作在切換的時間內)

2: if sleep(1),現象會更明顯,100個線程每一個一定都沒有執行完就進行了切換,我們說過sleep就等效於IO阻塞,1s之內不會再切換回來,所以最后的結果一定是99.

多個線程都在同時操作同一個共享資源,所以造成了資源破壞,怎么辦呢?

有同學會想用join唄,但join會把整個線程給停住,造成了串行,失去了多線程的意義,而我們只需要把計算(涉及到操作公共數據)的時候串行執行。

我們可以通過同步鎖來解決這種問題

import time
import threading begin=time.time() def addNum(): global num #在每個線程中都獲取這個全局變量 # num-=1  lock.acquire() temp=num # print('--get num:',num ) time.sleep(0.0001) num =temp-1 #對此公共變量進行-1操作  lock.release() num = 100 #設定一個共享變量 thread_list = [] lock=threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢  t.join() end=time.time() print('final num:', num ) print(end-begin)

同步鎖與GIL的關系?

Python的線程在GIL的控制之下,線程之間,對整個python解釋器,對python提供的C API的訪問都是互斥的,這可以看作是Python內核級的互斥機制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機制———用戶級互斥。內核級通過互斥保護了內核的共享資源,同樣,用戶級互斥保護了用戶程序中的共享資源。

GIL 的作用是:對於一個解釋器,只能有一個thread在執行bytecode。所以每時每刻只有一條bytecode在被執行一個thread。GIL保證了bytecode 這層面上是thread safe的。
但是如果你有個操作比如 x += 1,這個操作需要多個bytecodes操作,在執行這個操作的多條bytecodes期間的時候可能中途就換thread了,這樣就出現了data races的情況了。

Lock(指令鎖)是可用的最低級的同步指令。Lock處於鎖定狀態時,不被特定的線程擁有。Lock包含兩種狀態——鎖定和非鎖定,
以及兩個基本的方法。可以認為Lock有一個鎖定池,當線程請求鎖定時,將線程至於池中,直到獲得鎖定后出池。
池中的線程處於狀態圖中的同步阻塞狀態。
構造方法:Lock()
實例方法:
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

3.4 線程死鎖和遞歸鎖

在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下將一直等待下去。

RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,
處於鎖定狀態時,RLock被某個線程擁有。擁有RLock的線程可以再次調用acquire(),釋放鎖時需要調用release()相同次數。
可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,為0時鎖處於未鎖定狀態。
構造方法:RLock()
實例方法:acquire([timeout])/release(): 跟Lock差不多。

#__author: greg
#date: 2017/9/17 21:38
# 線程死鎖和遞歸鎖
# Thread deadlocks and recursive locks
import threading,time
class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()

if __name__=="__main__":
    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()#等待線程結束
View Code

解決辦法:使用遞歸鎖,將 lockA=threading.Lock() lockB=threading.Lock()

改為Rlock
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

#__author: greg
#date: 2017/9/17 21:49
import threading,time
class myThread(threading.Thread):
    def doA(self):
        lock.acquire()
        print(self.name,"gotlockA",time.ctime())
        # time.sleep(2)
        lock.acquire()
        print(self.name,"gotlockB",time.ctime())
        lock.release()
        lock.release()

    def doB(self):
        lock.acquire()
        print(self.name,"gotlockC",time.ctime())
        # time.sleep(3)
        lock.acquire()
        print(self.name,"gotlockD",time.ctime())
        lock.release()
        lock.release()

    def run(self):
        self.doA()
        self.doB()

if __name__=="__main__":
    # lockA=threading.Lock()
    # lockB=threading.Lock()
    lock = threading.RLock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()#等待線程結束
View Code

3.5 單線程和多線程執行對比

import threading
from time import ctime, sleep

class MyThread(threading.Thread):
    def __init__(self, func, args, name='', verb=False):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        self.verb = verb

    def getResult(self):
        return self.res

    def run(self):
        if self.verb:
            print('starting', self.name, 'at:', ctime())
        self.res = self.func(*self.args)
        if self.verb:
            print(self.name, 'finished at:', ctime())

def fib(x):
    sleep(0.005)
    if x < 2: return 1
    return (fib(x-2) + fib(x-1))

def fac(x):
    sleep(0.1)
    if x < 2: return 1
    return (x * fac(x-1))

def sum(x):
    sleep(0.1)
    if x < 2: return 1
    return (x + sum(x-1))

funcs = (fib, fac, sum)
n = 12

def main():
    nfuncs = range(len(funcs))
    print('*** SINGLE THREAD')
    for i in nfuncs:
        print('starting', funcs[i].__name__,'at:', ctime())
        print(funcs[i](n))
        print(funcs[i].__name__, 'finished at:',ctime())
    print('\n*** MULTIPLE THREADS')
    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (n,),
        funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
        print(threads[i].getResult())
    print('all DONE')

if __name__ == '__main__':
    main()
View Code

3.6 條件變量同步(Condition)

      有一類線程需要滿足條件之后才能夠繼續執行,Python提供了threading.Condition 對象用於條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。

 lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳人鎖,對象自動創建一個RLock()。Condition(條件變量)通常與一個鎖關聯。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實例給構造方法,否則它將自己生成一個RLock實例。可以認為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另一個線程調用notify()/notifyAll()通知;得到通知后線程進入鎖定池等待鎖定。

構造方法: Condition([lock/rlock])
實例方法:
acquire([timeout])/release(): 調用關聯的鎖的相應方法。
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試獲得鎖定(進入鎖定池);
其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。
調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
wait():條件不滿足時調用,線程會釋放鎖並進入等待阻塞;
notify():條件創造后調用,通知等待池激活一個線程;
notifyAll():條件創造后調用,通知等待池激活所有線程。

#__author: greg
#date: 2017/9/18 11:21
import threading
import time
product = None# 商品
con = threading.Condition()# 條件變量
def produce():# 生產者方法
    global product
    if con.acquire():
        while True:
            if product is None:
                print('produce...')
                product = 'anything'
                con.notify() # 通知消費者,商品已經生產
            con.wait()# 等待通知
            time.sleep(2)

def consume():# 消費者方法
    global product
    if con.acquire():
        while True:
            if product is not None:
                print('consume...')
                product = None
                con.notify()  # 通知生產者,商品已經沒了
            con.wait()  # 等待通知
            time.sleep(2)

t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()
View Code

3.7 事件event

python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

  • clear:將“Flag”設置為False
  • set:將“Flag”設置為True

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

#__author: greg
#date: 2017/9/18 19:37
import threading
import time event = threading.Event() def func(): # 等待事件,進入等待阻塞狀態 print('%s wait for event...' % threading.currentThread().getName()) event.wait() # 收到事件后進入運行狀態 print('%s recv event.' % threading.currentThread().getName()) t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) print('MainThread set event.')# 發送事件通知 event.set()

threading基於Java的線程模型設計。鎖(Lock)和條件變量(Condition)在Java中是對象的基本行為(每一個對象都自帶了鎖和條件變量),
而在Python中則是獨立的對象。Python Thread提供了Java Thread的行為的子集;沒有優先級、線程組,線程也不能被停止、暫停、恢復、中斷。
Java Thread中的部分被Python實現了的靜態方法在threading中以模塊方法的形式提供。
threading 模塊提供的常用方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

Event(事件)是最簡單的線程通信機制之一:一個線程通知事件,其他線程等待事件。
Event內置了一個初始為False的標志,當調用set()時設為True,調用clear()時重置為 False
wait()將阻塞線程至等待阻塞狀態。Event其實就是一個簡化版的 Condition
Event沒有鎖,無法使線程進入同步阻塞狀態。
構造方法:
Event()實例方法
isSet(): 當內置標志為True時返回True。
set(): 將標志設為True,並通知所有處於等待阻塞狀態的線程恢復運行狀態。
clear(): 將標志設為False。
wait([timeout]): 如果標志為True將立即返回,否則阻塞線程至等待阻塞狀態,等待其他線程調用set()。
3.8信號量(Semaphore)

Semaphore/BoundedSemaphore

Semaphore(信號量)是計算機科學史上最古老的同步指令之一。Semaphore管理一個內置的計數器,

每當調用acquire()時-1,調用release() 時+1。計數器不能小於0;當計數器為0時,
acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。
基於這個特點,Semaphore經常用來同步一些有“訪客上限”的對象,比如連接池。
BoundedSemaphore 與Semaphore的唯一區別在於前者將在調用release()時檢查計數器的值是否超過了計數器的初始值,
如果超過了將拋出一個異常。
構造方法: Semaphore(value=1): value是計數器的初始值。
實例方法:
acquire([timeout]): 請求Semaphore。如果計數器為0,將阻塞線程至同步阻塞狀態;否則將計數器-1並立即返回。
release(): 釋放Semaphore,將計數器+1,如果使用BoundedSemaphore,還將進行釋放次數檢查。
release()方法不檢查線程是否已獲得 Semaphore。

 
        
#__author: greg
#date: 2017/9/18 10:02
import threading,time
class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(3) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(10) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
#__author: greg
#date: 2017/9/18 11:28
import threading
import time
semaphore = threading.Semaphore(2)# 計數器初值為2,比較4
# semaphore = threading.BoundedSemaphore(2)
def func():
    # 請求Semaphore,成功后計數器-1;計數器為0時阻塞
    print('%s acquire semaphore...' % threading.currentThread().getName())
    if semaphore.acquire():
        print('%s get semaphore' % threading.currentThread().getName())
        time.sleep(4)
        print('%s release semaphore' % threading.currentThread().getName())# 釋放Semaphore,計數器+1
        semaphore.release()

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t4 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
t4.start()
time.sleep(2)
# 沒有獲得semaphore的主線程也可以調用release
# 若使用BoundedSemaphore,t4釋放semaphore時將拋出異常
print('MainThread release semaphore without acquire')
semaphore.release()
實例2

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM