Python程序中的線程操作-鎖


Python程序中的線程操作-鎖

一、同步鎖

1.1多個線程搶占資源的情況

from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能為99

1.1.1對公共數據的操作

import threading
R=threading.Lock()
R.acquire()
'''
對公共數據的操作
'''
R.release()

1.2同步鎖的引用

from threading import Thread, Lock
import os, time


def work():
    global n
    lock.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 結果肯定為0,由原來的並發執行變成串行,犧牲了執行效率保證了數據安全

1.3互斥鎖與join的區別

# 不加鎖:並發執行,速度快,數據不安全
from threading import current_thread, Thread, Lock
import os, time


def task():
    global n
    print('%s is running' % current_thread().getName())
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''
# 不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread, Thread, Lock
import os, time


def task():
    # 未加鎖的代碼並發運行
    time.sleep(3)
    print('%s start to run' % current_thread().getName())
    global n
    # 加鎖的代碼串行運行
    lock.acquire()
    temp = n
    time.sleep(0.5)
    n = temp - 1
    lock.release()


if __name__ == '__main__':
    n = 100
    lock = Lock()
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))

'''
Thread-2 start to run
Thread-3 start to run
Thread-1 start to run
Thread-6 start to run
Thread-4 start to run
......
Thread-99 start to run
Thread-96 start to run
Thread-100 start to run
Thread-92 start to run
Thread-93 start to run
主:53.294203758239746 n:0
'''

有的同學可能有疑問:既然加鎖會讓運行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊

沒錯:在start之后立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是

start后立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的

單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.

from threading import current_thread, Thread, Lock
import os, time


def task():
    time.sleep(3)
    print('%s start to run' % current_thread().getName())
    global n
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''

二、死鎖與遞歸鎖

所謂死鎖:是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

2.1死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
然后就卡住,死鎖了
'''

解決方法:遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖。

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

三、典型問題:科學家吃面

3.1死鎖問題

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了面條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','nick','tank']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

3.2遞歸鎖解決死鎖問題

import time
from threading import Thread, RLock

fork_lock = noodle_lock = RLock()


def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    print('%s 吃面' % name)
    fork_lock.release()
    noodle_lock.release()


def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()


for name in ['哪吒', 'nick', 'tank']:
    t1 = Thread(target=eat1, args=(name,))
    t2 = Thread(target=eat2, args=(name,))
    t1.start()
    t2.start()

四、信號量Semaphore

同進程的一樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

from threading import Thread,Semaphore
import threading
import time


def task():
    sm.acquire()
    print(f"{threading.current_thread().name} get sm")
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm = Semaphore(5) # 同一時間只有5個進程可以執行。
    for i in range(20):
        t = Thread(target=task)
        t.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM