Python多線程之間同步總結


 

線程安全

多線程主要是為了提高我們cpu的資源使用率。但同時,這會給我們帶來很多安全問題!

如果我們在單線程中以“順序”(串行-->獨占)的方式執行代碼是沒有任何問題的。但是到了多線程的環境下(並行),如果沒有設計和控制得好,就會給我們帶來很多意想不到的狀況,也就是線程安全性問題。

因為在多線程的環境下,線程是交替執行的,一般他們會使用多個線程執行相同的代碼。如果在此相同的代碼里邊有着共享的變量,或者一些組合操作(訪問共享的內存),我們想要的正確結果就很容易出現了問題。

那么到底什么是線程安全呢?

線程安全的問題在於多個線程訪問共享的內存而產生的,也就是我們要確保在多線程訪問的時候,我們的程序還能按照我們預期的行為去執行,這句話里最重要的是多線程,共享內存,你現在要記住這兩句話。

舉個例子:

from threading import Thread

num = 0
def add_num():
    global num
    for i in range(100000):
        num += 1
if __name__ == '__main__':
    threads = []  # 創建進程列表
    for i in range(0,10):
        t = Thread(target=add_num)
        t.start()
        threads.append(t)
    # 主線程等待所有子線程運行結束
    for thread in threads:
        thread.join()
    print(num)

上面程序是在多線程環境下跑起來,它的num值計算就不對了!我這里的計算結果為793182。

首先,它共享了num這個變量,其次來說num+=1這個操作來說;這是一個組合的操作(注意,它並非是原子性)

實際上的操作是這樣子的:

1.讀取num 值
2.將值+1
3.將計算結果寫入num

於是多線程執行的時候很可能就會有這樣的情況:

如果當線程A讀取到num 的值是8的時候,同時線程B也進去這個方法上了,也是讀取到num 的值為8

它倆都對值進行加1

都將計算結果寫入到num上。但是,寫入到num上的結果是9

也就是說:兩個線程進來了,但是正確的結果是應該返回10,而它返回了9,這是錯誤的,而我們希望的是num的值為10。

如果說:當多個線程訪問某個類的時候,這個類始終能表現出正確的行為,那么這個類就是線程安全的!

 

臨界區

我們把對共享內存進行訪問的程序片段稱作臨界區,當多個線程訪問涉及共享內存,共享文件、共享任何資源的情況都會引起發生錯,避免錯誤的關鍵要找到途徑阻止多個進程同時讀寫共享的數據,在編程中,就是阻止多個線程同時訪問臨界區
到了這里,線程安全就是防止多個線程同時訪問共享內存,線程安全的本質是其他內存安全

 

如何解決線程安全

為了保證使用共享數據的並發編程能正確、高效協作,對於一個好的解決方案,需要滿足以下四個條件:

  • 任何倆個線程不能同時處於臨界區
  • 不應對cpu的速度和數量做任何假設
  • 臨界區外運行的線程不能阻塞其他線程
  • 不得使線程無限期等待進入臨界區

注意:可以把這里的進程看成線程,進程之間同步的方式和線程類似。

解決方案:

  • 互斥量
  • 信號量
  • 事件

 

信號量(Python Semaphore對象)

Semaphore對象內部管理一個計數器,該計數器由每個acquire()調用遞減,並由每個release()調用遞增計數器永遠不會低於零,當acquire()發現計數器為零時,線程阻塞,等待其他線程調用release()。
Semaphore對象支持上下文管理協議。

此計數器是線程共享的,每個線程都可以操作此計數器。

方法:

Semaphore(value=1):創建一個計數器對象,默認值為1。

acquire(blocking=True, timeout=None)
獲取信號,使計數器遞減1。
blocking=True時:如果調用時計數器大於零,則將其減1並立即返回。如果在調用時計數器為零,則阻塞並等待,直到其他線程調用release()使其大於零。這是通過適當的互鎖來完成的,因此如果多個acquire()被阻塞,release()將只喚醒其中一個,這個過程會隨機選擇一個,因此不應該依賴阻塞線程的被喚醒順序。返回值為True
blocking=False時,不會阻塞。如果調用acquire()時計數器為零,則會立即返回False.
如果設置了timeout參數,它將阻塞最多timeout秒。如果在該時間段內沒有獲取鎖,則返回False,否則返回True

release()
釋放信號,使計數器遞增1。當計數器為零並有另一個線程等待計數器大於零時,喚醒該線程。

總結:信號量基於計數器來實現線程同步,此計數器為線程之間共享,利用信號量,你可以實現限量線程同時訪問臨界區,這句話怎么理解,因為你可以設置不同大小的value即可實現。

來看下面的代碼:

import time
import threading

def foo():
    time.sleep(2)    #程序休息2秒
    print("ok",time.ctime())

for i in range(20):
    t1=threading.Thread(target=foo,args=())    #實例化一個線程
    t1.start()    #啟動線程

執行結果:

ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017
ok Tue Jul 18 20:05:58 2017

可以看到,程序會在很短的時間內生成20個線程來打印一句話。

這時候就可以為這段程序添加一個信號量的計數器功能,來限制一個時間點內的線程數量。

代碼如下:

import time
import threading

s1=threading.Semaphore(5)    #添加一個計數器

def foo():
    s1.acquire()    #計數器獲得鎖
    time.sleep(2)    #程序休眠2秒
    print("ok",time.ctime())
    s1.release()    #計數器釋放鎖


for i in range(20):
    t1=threading.Thread(target=foo,args=())    #創建線程
    t1.start()    #啟動線程    

執行結果:

ok Tue Jul 18 20:04:38 2017
ok Tue Jul 18 20:04:38 2017
ok Tue Jul 18 20:04:38 2017
ok Tue Jul 18 20:04:38 2017
ok Tue Jul 18 20:04:38 2017
ok Tue Jul 18 20:04:40 2017
ok Tue Jul 18 20:04:40 2017
ok Tue Jul 18 20:04:40 2017
ok Tue Jul 18 20:04:40 2017
ok Tue Jul 18 20:04:40 2017
ok Tue Jul 18 20:04:42 2017
ok Tue Jul 18 20:04:42 2017
ok Tue Jul 18 20:04:42 2017
ok Tue Jul 18 20:04:42 2017
ok Tue Jul 18 20:04:42 2017
ok Tue Jul 18 20:04:44 2017
ok Tue Jul 18 20:04:44 2017
ok Tue Jul 18 20:04:44 2017
ok Tue Jul 18 20:04:44 2017
ok Tue Jul 18 20:04:44 2017

可以看到每隔兩秒鍾就有五條信息產生,對應了五個線程,我們實現了限量線程訪問臨界區。

 

 

事件(Python Event對象)

事件對象管理一個內部標志,通過set()方法將其設置為True,並使用clear()方法將其設置為Falsewait()方法阻塞,直到標志為True。該標志初始為False

方法:
is_set()
當且僅當內部標志為True時返回True

set()
將內部標志設置為True。所有等待它成為True的線程都被喚醒。當標志保持在True的狀態時,線程調用wait()是不會阻塞的。

clear()
將內部標志重置為False。隨后,調用wait()的線程將阻塞,直到另一個線程調用set()將內部標志重新設置為True

wait(timeout=None)
阻塞直到內部標志為真。如果內部標志在wait()方法調用時為True,則立即返回。否則,則阻塞,直到另一個線程調用set()將標志設置為True,或發生超時。
該方法總是返回True,除非設置了timeout並發生超時。

總結:Event是一個能在多線程中共用的對象,這和信號量是相同的,信號量基於計數器,而event基於共享標志位,一開始它包含一個為 False的信號標志,一旦在任一一個線程里面把這個標記改為 True,那么所有的線程都會看到這個標記變成了 True。看到這里,event要么把共享此標志的線程全部給阻塞,要么大家一起運行。

設想這樣一個場景:

你創建了10個子線程,每個子線程分別爬一個網站,一開始所有子線程都是阻塞等待。一旦某個事件發生:例如有人在網頁上點了一個按鈕,或者某人在命令行輸入了一個命令,10個爬蟲同時開始工作。

代碼片段可以簡寫為:

import threading
import time
class spider(threading.Thread):
 def __init__(self, n, event):
        super().__init__()
        self.n = n
        self.event = event
 def run(self):
     print(f'第{self.n}號爬蟲已就位!')
     self.event.wait()
     print(f'信號標記變為True!!第{self.n}號爬蟲開始運行')
eve = threading.Event()
for num in range(10):
    crawler = spider(num, eve)
    crawler.start()
input('按下回車鍵,啟動所有爬蟲!')
eve.set() #設置標志為true
time.sleep(5)

運行結果如圖所示:

 

 

 

互斥量

  • 原始鎖 (Python Lock對象)
  • 重入鎖 (Python RLock對象)
  • 條件鎖 (Python Condition(條件對象)
  • 障礙鎖 (Python Barrier對象)

原始鎖 (Python Lock對象)

方法:

acquire(blocking=True,timeout=-1):默認阻塞,阻塞可以設置超時時間。非阻塞時,timeout禁止設置。成功獲取鎖,返回True,否則返回False

release( ):釋放鎖。可以從任何線程釋放。已上鎖的鎖,會拋出RuntimeError異常。

總結:原始鎖是一個處於兩個狀態的變量:解鎖和加鎖,只需要一個二進制位表示,原始鎖和前面信號量和event都是可以被線程之間共享的,這里原始鎖對象始終只讓一個線程進入臨界區,如果該鎖在進入臨界區之前加鎖,則其他線程在進入此臨界區時都會被阻塞,等待臨界區的線程完成解鎖操作。其他線程才有機會進入此臨界區(隨機選擇一個線程允許它獲得鎖)。

原始鎖適用於訪問和修改同一個資源的時候,引起資源爭用的情況下。使用鎖的注意事項:鎖的使用場景:

1.少用鎖,除非有必要。多線程訪問加鎖的資源時,由於鎖的存在,實際就變成了串行。

2.加鎖時間越短越好,不需要就立即釋放鎖。

3.一定要避免死鎖,使用with或者try...finally。

我們修改了最開始的例子,使用原始鎖保證了線程安全:

import threading
num = 0
lock = threading.Lock()
def add_num():
    global num
    try:
        lock.acquire()
        for i in range(100000):
            num += 1
    finally:
        lock.release()
if __name__ == '__main__':
    threads = []  # 創建進程列表
    for i in range(0,10):
        t = threading.Thread(target=add_num)
        t.start()
        threads.append(t)
    # 主線程等待所有子線程運行結束
    for thread in threads:
        thread.join()
    print(num)

 

 

重入鎖  (Python RLock對象)

  為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

threading.Rlock() 允許多次鎖資源,acquire() 和 release() 必須成對出現,也就是說加了幾把鎖就得釋放幾把鎖。

方法:

acquire(blocking=True,timeout = -1):內部counter變量加1

release():內部counter變量減1

lock = threading.Lock()
# 死鎖
lock.acquire()
lock.acquire()
print('...')
lock.release()
lock.release()
///////////////////////// rlock
= threading.RLock() # 同一線程內不會阻塞線程,可以多次acquire rlock.acquire() rlock.acquire() print('...') rlock.release() rlock.release()

 

 

條件鎖  (Python Condition(條件對象)

條件變量允許一個或多個線程等待,直到他們被另一個線程通知。

方法:

acquire(*args)
獲取鎖。這個方法調用底層鎖的相應方法。

release()
釋放鎖。這個方法調用底層鎖的相應方法。

wait(timeout=None)
線程掛起,等待被喚醒(其他線程的notify方法)或者發生超時。調用該方法的線程必須先獲得鎖,否則引發RuntimeError
該方法會釋放底層鎖,然后阻塞,直到它被另一個線程中的相同條件變量的notify()notify_all()方法喚醒,或者發生超時。一旦被喚醒或超時,它會重新獲取鎖並返回。
返回值為True,如果給定timeout並發生超時,則返回False

wait_for(predicate, timeout=None)
等待知道條件變量的返回值為Truepredicate應該是一個返回值可以解釋為布爾值的可調用對象。可以設置timeout以給定最大等待時間。
該方法可以重復調用wait(),直到predicate的返回值解釋為True,或發生超時。該方法的返回值就是predicate的最后一個返回值,如果發生超時,返回值為False
如果忽略超時功能,該方法大致相當於:

while not predicate(): con.wait() 

它與wait()的規則相同:調用前必須先獲取鎖,阻塞時釋放鎖,並在被喚醒時重新獲取鎖並返回。

notify(n=1)
默認情況下,喚醒等待此條件變量的一個線程(如果有)。調用該方法的線程必須先獲得鎖,否則引發RuntimeError
該方法最多喚醒n個等待中的線程,如果沒有線程在等待,它就是要給無動作的操作。
注意:要被喚醒的線程實際上不會馬上從wait()方法返回(喚醒),而是等到它重新獲取鎖。這是因為notify()並不會釋放鎖,需要線程本身來釋放(通過wait()或者release())

notify_all()
此方法類似於notify(),但喚醒的時所有等待的線程。

 

障礙鎖 (Python Barrier對象)

Barrier(parties, action=None, timeout=None)

每個線程通過調用wait()嘗試通過障礙,並阻塞,直到阻塞的數量達到parties時,阻塞的線程被同時全部釋放。
action是一個可調用對象,當線程被釋放時,其中一個線程會首先調用action,之后再跑自己的代碼。
timeout時默認的超時時間。

方法:
wait(timeout=None)
嘗試通過障礙並阻塞。
返回值是一個在0parties-1范圍內的整數,每個線程都不同。
其中一個線程在釋放之前將調用action。如果此調用引發錯誤,則障礙將進入斷開狀態。
如果等待超時,障礙也將進入斷開狀態。
如果在線程等待期間障礙斷開重置,此方法可能會引發BrokenBarrierError錯誤。

reset()
重置障礙,返回默認的空狀態,即當前阻塞的線程重新來過。見例二

abort()
將障礙置為斷開狀態,這將導致已調用wait()或之后調用wait()引發BrokenBarrierError。見例三

屬性:
partier
通過障礙所需的線程數。

n_waiting
當前在屏障中等待的線程數

broken
如果屏障處於斷開狀態,則返回True

總結:障礙鎖也是線程之間共享的,可以阻塞一個或者多個線程,不同的是,當阻塞的線程達到一個目標值以后,被阻塞的線程會被同時釋放,提供了action,在達到目標值以后會執行此action可調用對象(用戶自定義函數),且只執行一次。

實例

例一:

# -*- coding:utf-8 -*-
import threading
import time


def open():
    print('人數夠了, 開門!')


barrier = threading.Barrier(3, open)


class Customer(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.n = 3

    def run(self):
        while self.n > 0:
            self.n -= 1
            print('{0}在等着開門.'.format(self.name))
            try:
                barrier.wait(2)
            except threading.BrokenBarrierError:
                pass
            print('開門了, go go go')


if __name__ == '__main__':
    t1 = Customer(name='A')
    t2 = Customer(name='B')
    t3 = Customer(name='C')
    t1.start()
    t2.start()
    t3.start()

運行結果:

A在等着開門.
B在等着開門.
C在等着開門.
人數夠了, 開門!
開門了, go go go
開門了, go go go
開門了, go go go
C在等着開門.
A在等着開門.
B在等着開門.
人數夠了, 開門!
開門了, go go go
開門了, go go go
開門了, go go go
...

例二:

 
# -*- coding:utf-8 -*-
import threading
import time


def open():
    print('人數夠了, 開門!')


barrier = threading.Barrier(3, open)


class Customer(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.n = 3

    def run(self):
        while self.n > 0:
            self.n -= 1
            print('{0}在等着開門.'.format(self.name))
            try:
                barrier.wait(2)
            except threading.BrokenBarrierError:
                continue
            print('開門了, go go go')


class Manager(threading.Thread):
    def run(self):
        print('前面幾個排隊的不算,重新來')
        barrier.reset()


if __name__ == '__main__':
    t1 = Customer(name='A')
    t2 = Customer(name='B')
    t3 = Customer(name='C')
    tm = Manager()
    t1.start()
    t2.start()
    tm.start()
    t3.start()

運行結果:

A在等着開門.
B在等着開門.
前面幾個排隊的不算,重新來
A在等着開門.
B在等着開門.
C在等着開門.
人數夠了, 開門!
開門了, go go go
開門了, go go go
開門了, go go go
A在等着開門.
C在等着開門.
B在等着開門.
人數夠了, 開門!
開門了, go go go
開門了, go go go
開門了, go go go
C在等着開門.

例三:

# -*- coding:utf-8 -*-
import threading


def open():
    print('人數夠了, 開門!')


barrier = threading.Barrier(3, open)


class Customer(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.n = 3

    def run(self):
        while self.n > 0:
            self.n -= 1
            print('{0}在等着開門.'.format(self.name))
            try:
                barrier.wait(2)
            except threading.BrokenBarrierError:
                print('今天好像不開門了,回家.')
                break
            print('開門了, go go go')


class Manager(threading.Thread):
    def run(self):
        print('老板跟小姨子跑了,不開門了!')
        barrier.reset()


if __name__ == '__main__':
    t1 = Customer(name='A')
    t2 = Customer(name='B')
    t3 = Customer(name='C')
    tm = Manager()
    t1.start()
    t2.start()
    tm.start()
    t3.start()

運行結果:

A在等着開門.
B在等着開門.
老板跟小姨子跑了,不開門了!
今天好像不開門了,回家.
今天好像不開門了,回家.
C在等着開門.
今天好像不開門了,回家.

 

 

參考:

python中的線程之semaphore信號量

一日一技:Python多線程的事件監控

Python多線程-Barrier(障礙對象)

python高級-多線程總結(思維導圖)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM