並發系列是一個很龐大的知識體系,要想完全弄明白是挺困難的,因為最近打算閱讀Tornado源碼, 其介紹談到了內部使用了異步非阻塞的調用方式。之前也沒有深入了解,這次就借此好好整理一下。
線程(threading模塊)
線程是應用程序運行的最小單元,在同一個進程中,可以並發開啟多個線程,每個線程擁有自己的棧(存放臨時變量),同時相互之間是共享資源的。
Python中使用threading模塊來開啟多線程
import threading, time def func(n): time.sleep(2)
print(time.time(),n)
if __name__ == '__main__': for i in range(10): t = threading.Thread(target=func, args=(1,)) t.start() print('主線程結束')
'結果'
主線程結束 1532921321.058243 1 1532921321.058243 1 1532921321.058243 1 1532921321.058243 1 ...
或者通過自定義類繼承Thread,並且重寫run方法
import threading,time class Mythread(threading.Thread): def __init__(self,name): super().__init__() self.name = name def run(self): time.sleep(1) print('hello %s'%self.name) if __name__ == '__main__': for i in range(5): m = Mythread('py') m.start() print('---主線程結束---')
執行順序如下
主線程和子線程之間是相互獨立的,但是主線程運行完畢會等待子線程的運行,直到完畢,才回收資源。
Thread對象可調用的方法
hread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。
#join():使主線程阻塞,直到該線程結束
threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
守護線程(setDaemon)
如果一個線程設置為守護線程,那么它將會和主線程一起結束,而主線程會等待所有的非守護線程的子線程結束而退出。因此可以認為,守護線程是“不重要的線程”,主線程不等它。
import threading, time """ 設置兩個線程 """ def func1(): print('--非守護線程開始--') time.sleep(2) print('--非守護線程結束--') def func2(): print('--守護線程開始--') time.sleep(4) print('--守護線程結束--') if __name__ == '__main__': t1 = threading.Thread(target=func1,args=()) t2 = threading.Thread(target=func2,args=()) t2.setDaemon(True) t1.start() t2.start() ''' --非守護線程開始-- --守護線程開始-- --非守護線程結束-- 守護線程還沒運行完,主線程就結束了 '''
而線程之間共享數據,必然會導致同時操作數據時的混亂,影響數據安全
import threading,time def func(): #開始處理數據 global n a=n+1 time.sleep(0.0001) n =a # 結束處理 if __name__ == '__main__': n=0 li =[] for i in range(1000): t=threading.Thread(target=func,args=()) li.append(t) t.start() for i in li: i.join() #等待子線程全部執行完 print(n) #253 ''' 我們希望能從0加到1000,但是由於有多個線程會拿到數據, 如果處理速度慢,就會使數據混亂 '''
因此,對數據進行加鎖就很有必要了。
互斥鎖(Lock)
通過獲取鎖對象,訪問共有數據,最后釋放鎖來完成一次操作,一旦某個線程獲取了鎖,當這個線程被切換時,下個個進程無法獲取該公有數據
import threading,time def func(): #開始處理數據 global n lock.acquire() #獲取 a=n+1 time.sleep(0.00001) n =a lock.release() #釋放 # 結束處理 if __name__ == '__main__': n=0 lock=threading.Lock() li =[] for i in range(1000): t=threading.Thread(target=func,args=()) li.append(t) t.start() for i in li: i.join() #等待子線程全部執行完 print(n) #1000
通過同步的互斥鎖來保證數據安全相比於線程串行運行而言,如每個線程start之前都使用.join()方法,無疑速度更快,因為它就只有在訪問數據的時候是串級的,其他的情況下是是並發的(雖然也不能是並行運行,因為GIL,之后會談到)。
再看如下情況
死鎖
import threading if __name__ == '__main__': lock=threading.Lock() lock.acquire() lock.acquire() #程序卡住,只能獲取一次 lock.release() lock.release()
遞歸鎖(RLock)
RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使
用RLock代替Lock,則不會發生死鎖:
import threading if __name__ == '__main__': lock=threading.RLock() lock.acquire() lock.acquire() #可以多次獲取,程序順利執行 lock.release() lock.release()
信號量(Semaphore)
能夠並發執行的線程數,超出的線程阻塞,直到有線程運行完成。
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。
import threading,time def sever_help(n): s.acquire() print('%s歡迎用戶%d'%(threading.current_thread().getName(),n)) time.sleep(2) s.release() if __name__ == '__main__': s = threading.Semaphore(5) li = [] for i in range(32): t = threading.Thread(target=sever_help,args=(i,)) li.append(t) t.start() for i in li: i.join() print("===結束==")
通過對比互斥鎖可以看出,互斥鎖就是Semaphore(1)的情況,也完全可以使用后者,但是如果數據必須單獨使用,那么用互斥鎖效率更高。
事件(Event)
如果某一個線程執行,需要判斷另一個線程的狀態,就可以使用Event,如:用Event類初始化一個event對象,線程a執行到某一步,設置event.wait(),即線程a阻塞,直到另一個線程設置event.set(),將event
狀態設置為True(默認是False)。

import threading import time, random def eating(): event.wait() print('去吃飯的路上...') def makeing(): print('做飯中') time.sleep(random.randint(1,2)) print('做好了,快來...') event.set() if __name__ == '__main__': event=threading.Event() t1 = threading.Thread(target=eating) t2 = threading.Thread(target=makeing) t1.start() t2.start() # 做飯中 # 做好了,快來... # 去吃飯的路上...
基本方法:
event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear():恢復event的狀態值為False。
線程隊列(queue)
特點:先進先出,
作用:多個線程之間進行通信(作用不大,多進程的隊列用處大)
常用方法:
- .get() 獲取 無數據時會阻塞
- .set('item') 設置,先設置的數據,先取出
- .empty() 是否為空
基本使用:生成者消費者模型

import threading, queue def eating(n): #消費 i = q.get() print('消費者%d吃了第%d份食物' % (n, i)) def making(): #生產 for i in range(1, 11): print('正在制作第%d份食物' % i) time.sleep(1) q.put(i) if __name__ == '__main__': q = queue.Queue() t2 = threading.Thread(target=makeing) t2.start() for i in range(1, 11): t = threading.Thread(target=eating, args=(i,)) t.start()