Thread Operations in Python Programs: Locks
1. Synchronization Lock
1.1 Multiple Threads Competing for a Shared Resource
from threading import Thread
import time

def work():
    global n
    temp = n          # every thread reads the same value of n
    time.sleep(0.1)   # switch away while still holding the old value
    n = temp - 1      # write back, overwriting the other threads' updates

if __name__ == '__main__':
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(n)  # the result is most likely 99
1.1.1 Operating on Shared Data
import threading

R = threading.Lock()

R.acquire()
'''
operate on the shared data
'''
R.release()
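In practice the lock is usually taken with a with statement instead of explicit acquire()/release(). A minimal sketch of the equivalent form, which releases the lock automatically even if the guarded code raises an exception:

import threading

R = threading.Lock()

with R:
    # operate on the shared data; the lock is released automatically on exit
    ...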
1.2 Using the Synchronization Lock
from threading import Thread, Lock
import time

def work():
    global n
    lock.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(n)  # the result is guaranteed to be 0: the concurrent execution becomes serial, trading speed for data safety
1.3 The Difference Between a Mutex and join
# Without a lock: concurrent execution, fast, but the data is not safe
from threading import current_thread, Thread, Lock
import time

def task():
    global n
    print('%s is running' % current_thread().name)
    temp = n
    time.sleep(0.5)
    n = temp - 1

if __name__ == '__main__':
    n = 100
    lock = Lock()   # created but never used in this version
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('main: %s n: %s' % (stop_time - start_time, n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
main: 0.5216062068939209 n: 99
'''
# With a lock: the unlocked part runs concurrently, the locked part runs serially; slower, but the data is safe
from threading import current_thread, Thread, Lock
import time

def task():
    # the unlocked code runs concurrently
    time.sleep(3)
    print('%s start to run' % current_thread().name)
    global n
    # the locked code runs serially
    lock.acquire()
    temp = n
    time.sleep(0.5)
    n = temp - 1
    lock.release()

if __name__ == '__main__':
    n = 100
    lock = Lock()
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('main: %s n: %s' % (stop_time - start_time, n))

'''
Thread-2 start to run
Thread-3 start to run
Thread-1 start to run
Thread-6 start to run
Thread-4 start to run
......
Thread-99 start to run
Thread-96 start to run
Thread-100 start to run
Thread-92 start to run
Thread-93 start to run
main: 53.294203758239746 n: 0
'''
Some readers may wonder: since the lock makes execution serial anyway, why not simply call join right after start instead of locking? That also produces a serial effect.
True: calling join immediately after start does turn the 100 tasks into serial execution, and the final value of n will indeed be 0, so it is safe. The problem is this:
join right after start serializes all of the code inside each task, whereas locking serializes only the locked part, i.e. the part that modifies the shared data.
Both approaches guarantee data safety, but locking is clearly more efficient.
from threading import current_thread, Thread, Lock
import time

def task():
    time.sleep(3)
    print('%s start to run' % current_thread().name)
    global n
    temp = n
    time.sleep(0.5)
    n = temp - 1

if __name__ == '__main__':
    n = 100
    lock = Lock()   # never used: join alone already serializes everything
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t.join()    # joining right after start makes each whole task run serially
    stop_time = time.time()
    print('main: %s n: %s' % (stop_time - start_time, n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
main: 350.6937336921692 n: 0  # a terrifying amount of time
'''
2. Deadlock and Recursive Lock
A deadlock is a situation in which two or more processes or threads wait on each other while competing for resources during execution; without outside intervention, none of them can make progress. The system is then said to be in a deadlocked state, and the processes that wait forever are called deadlocked processes. The following is a deadlock:
2.1 Deadlock
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('\033[41m%s got lock A\033[0m' % self.name)
        mutexB.acquire()
        print('\033[42m%s got lock B\033[0m' % self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s got lock B\033[0m' % self.name)
        time.sleep(2)
        mutexA.acquire()
        print('\033[44m%s got lock A\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

'''
Thread-1 got lock A
Thread-1 got lock B
Thread-1 got lock B
Thread-2 got lock A
then everything hangs: deadlock
'''
Solution: a recursive lock. To allow the same thread to request the same resource more than once, Python provides the reentrant lock RLock.
Internally, RLock maintains a Lock and a counter; the counter records the number of acquire calls, so the resource can be acquired multiple times by the same thread. Only after all of that thread's acquires have been released can other threads obtain the resource. If the example above uses RLock instead of Lock, no deadlock occurs.
mutexA = mutexB = threading.RLock()  # when a thread takes the lock, counter becomes 1; each further acquire inside the same thread increments it again; all other threads must wait until that thread releases every acquire, i.e. until counter drops back to 0
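A minimal sketch of this reentrant behaviour, using a hypothetical outer/inner pair of functions: the nested acquire would block forever with a plain Lock, but succeeds with RLock because it happens in the same thread.

import threading

rlock = threading.RLock()

def inner():
    with rlock:              # counter: 1 -> 2, same thread, so no blocking
        print('inner re-acquired the same lock')

def outer():
    with rlock:              # counter: 0 -> 1
        print('outer holds the lock')
        inner()              # re-acquire in the same thread
    # counter is back to 0 here, so other threads may now acquire

t = threading.Thread(target=outer)
t.start()
t.join()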
3. A Classic Problem: Scientists Eating Noodles
3.1 The Deadlock Version
import time
from threading import Thread, Lock

noodle_lock = Lock()
fork_lock = Lock()

def eat1(name):
    noodle_lock.acquire()
    print('%s grabbed the noodles' % name)
    fork_lock.acquire()
    print('%s grabbed the fork' % name)
    print('%s is eating noodles' % name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s grabbed the fork' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s grabbed the noodles' % name)
    print('%s is eating noodles' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒', 'nick', 'tank']:
    t1 = Thread(target=eat1, args=(name,))
    t2 = Thread(target=eat2, args=(name,))
    t1.start()
    t2.start()
3.2 Solving the Deadlock with a Recursive Lock
import time
from threading import Thread, RLock

fork_lock = noodle_lock = RLock()   # one recursive lock shared under two names

def eat1(name):
    noodle_lock.acquire()
    print('%s grabbed the noodles' % name)
    fork_lock.acquire()
    print('%s grabbed the fork' % name)
    print('%s is eating noodles' % name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s grabbed the fork' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s grabbed the noodles' % name)
    print('%s is eating noodles' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒', 'nick', 'tank']:
    t1 = Thread(target=eat1, args=(name,))
    t2 = Thread(target=eat2, args=(name,))
    t1.start()
    t2.start()
4. Semaphore
It works the same way as the process version.
Semaphore manages a built-in counter:
every call to acquire() decrements the built-in counter by 1;
every call to release() increments the built-in counter by 1;
the counter can never go below 0; when the counter is 0, acquire() blocks the thread until some other thread calls release().
Example (only 5 threads can hold the semaphore at the same time, i.e. the maximum number of connections is limited to 5):
from threading import Thread, Semaphore
import threading
import time

def task():
    sm.acquire()
    print(f"{threading.current_thread().name} get sm")
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm = Semaphore(5)   # only 5 threads may run the guarded section at the same time
    for i in range(20):
        t = Thread(target=task)
        t.start()
This is a completely different concept from a process pool: Pool(4) creates at most 4 processes, and those same 4 processes are reused from start to finish with no new ones created, whereas a semaphore still spawns a whole batch of threads/processes and only limits how many of them run at once.
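For contrast, a minimal sketch of the fixed-pool approach, using concurrent.futures.ThreadPoolExecutor purely for illustration: exactly 5 worker threads are created once and all 20 jobs are fed to them, instead of starting 20 threads and gating them with a semaphore.

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def task(i):
    print(f"{threading.current_thread().name} handles job {i}")
    time.sleep(3)

if __name__ == '__main__':
    # the pool creates its 5 worker threads once and reuses them for every job
    with ThreadPoolExecutor(max_workers=5) as pool:
        for i in range(20):
            pool.submit(task, i)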