1 引言
上一篇博文詳細總結了Python進程的用法,這一篇博文來所以說Python中線程的用法。實際上,程序的運行都是以線程為基本單位的,每一個進程中都至少有一個線程(主線程),線程又可以創建子線程。線程間共享數據比進程要容易得多(輕而易舉),進程間的切換也要比進程消耗CPU資源少。
線程管理可以通過thead模塊(Python中已棄用)和threading 模塊,但目前主要以threading模塊為主。因為更加先進,有更好的線程支持,且 threading模塊的同步原語遠多於thread模塊。另外,thread 模塊中的一些屬性會和 threading 模塊有沖突。故,本文創建線程和使用線程都通過threading模塊進行。
threading模塊提供的類: Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
threading 模塊提供的常用方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
threading 模塊提供的常量:
threading.TIMEOUT_MAX 設置threading全局超時時間。
2 創建線程
無論是用自定義函數的方法創建線程還是用自定義類的方法創建線程與創建進程的方法極其相似的,不過,創建線程時,可以不在“if __name__==”__main__:”語句下進行”。無論是哪種方式,都必須通過threading模塊提供的Thread類進行。Thread類常用屬性和方法如下。
Thread類屬性:
name:線程名
ident:線程的標識符
daemon:布爾值,表示這個線程是否是守護線程
Thread類方法:
__init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None):實例化一個線程對象,需要一個可調用的target對象,以及參數args或者kwargs。還可以傳遞name和group參數。daemon的值將會設定thread.daemon的屬性
start():開始執行該線程
run():定義線程的方法。(通常開發者應該在子類中重寫)
join(timeout=None):直至啟動的線程終止之前一直掛起;除非給出了timeout(單位秒),否則一直被阻塞
isAlive:布爾值,表示這個線程是否還存活(駝峰式命名,python2.6版本開始已被取代)
isDaemon():布爾值,表示是否是守護線程
setDaemon(布爾值):在線程start()之前調用,把線程的守護標識設定為指定的布爾值
在下面兩小節我們分別通過代碼來演示。
2.1 自定義函數的方式創建線程
import os import time import threading def fun(n): print('子線程開始運行……') time.sleep(1) my_thread_name = threading.current_thread().name#獲取當前線程名稱 my_thread_id = threading.current_thread().ident#獲取當前線程id print('當前線程為:{},線程id為:{},所在進程為:{},您輸入的參數為:{}'.format(my_thread_name ,my_thread_id , os.getpid(),n)) print('子線程運行結束……') t = threading.Thread(target=fun , name='線程1',args=('參數1',)) t.start() time.sleep(2) main_thread_name = threading.current_thread().name#獲取當前線程名稱 main_thread_id = threading.current_thread().ident#獲取當前線程id print('主線程為:{},線程id為:{},所在進程為:{}'.format(main_thread_name ,main_thread_id , os.getpid()))
2.2 類的方式創建線程
import os import time import threading class MyThread(threading.Thread): def __init__(self , n , name=None): super().__init__() self.name = name self.n = n def run(self): print('子線程開始運行……') time.sleep(1) my_thread_name = threading.current_thread().name#獲取當前線程名稱 my_thread_id = threading.current_thread().ident#獲取當前線程id print('當前線程為:{},線程id為:{},所在進程為:{},您輸入的參數為:{}'.format(my_thread_name ,my_thread_id , os.getpid(),self.n)) print('子線程運行結束……') t = MyThread(name='線程1', n=1) t.start() time.sleep(2) main_thread_name = threading.current_thread().name#獲取當前線程名稱 main_thread_id = threading.current_thread().ident#獲取當前線程id print('主線程為:{},線程id為:{},所在進程為:{}'.format(main_thread_name ,main_thread_id , os.getpid()))
輸出結果:
子線程開始運行……
當前線程為:線程1,線程id為:11312,所在進程為:4532,您輸入的參數為:1
子線程運行結束……
主線程為:MainThread,線程id為:10868,所在進程為:4532
上述兩塊代碼輸出結果是一樣的(id不一樣),觀察輸出結果可以發現,子線程和主線程所在的進程都是一樣的,證明是在同一進程中的進程。
3 Thread的常用方法和屬性
3.1 守護線程:Deamon
Thread類有一個名為deamon的屬性,標志該線程是否為守護線程,默認值為False,當為設為True是表示設置為守護線程。是否是守護線程有什么區別呢?我們先來看看deamon值為False(默認)情況時:
import os import time import threading def fun(): print('子線程開始運行……') for i in range(6):#運行3秒,每秒輸出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name ,i+1)) print('子線程運行結束……') print('主線程開始運行……') t = threading.Thread(target=fun , name='線程1') print('daemon的值為:{}'.format(t.daemon)) t.start() for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name, i+1)) print('主線程結束運行……')
輸出結果:
主線程開始運行……
daemon的值為:False
子線程開始運行……
MainThread已運行1秒……
線程1已運行1秒……
MainThread已運行2秒……
線程1已運行2秒……
MainThread已運行3秒……
主線程結束運行……
線程1已運行3秒……
線程1已運行4秒……
線程1已運行5秒……
線程1已運行6秒……
子線程運行結束……
代碼中,主線程只需要運行3秒即可結束,但子線程需要運行6秒,從運行結果中可以看到,主線程代碼運行結束后,子線程還可以繼續運行,這就是非守護線程的特征。
再來看看daemon值為True時:
import time import threading def fun(): print('子線程開始運行……') for i in range(6):#運行3秒,每秒輸出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name ,i+1)) print('子線程運行結束……') print('主線程開始運行……') t = threading.Thread(target=fun , name='線程1') t.daemon=True #設置為守護線程 print('daemon的值為:{}'.format(t.daemon)) t.start() for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name, i+1)) print('主線程結束運行……')
輸出結果:
主線程開始運行……
daemon的值為:True
子線程開始運行……
MainThread已運行1秒……
線程1已運行1秒……
MainThread已運行2秒……
線程1已運行2秒……
MainThread已運行3秒……
主線程結束運行……
從運行結果中可以看出,當deamon值為True,即設為守護線程后,只要主線程結束了,無論子線程代碼是否結束,都得跟着結束,這就是守護線程的特征。另外,修改deamon的值必須在線程start()方法調用之前,否則會報錯。
3.2 join()方法
join()方法的作用是在調用join()方法處,讓所在線程(主線程)同步的等待被join的線程(下面的p線程),只有p線程結束。我們嘗試在不同的位置調用join方法,對比運行結果。首先在p線程一開始的位置進行join:
import time import threading def fun(): print('子線程開始運行……') for i in range(6):#運行3秒,每秒輸出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name ,i+1)) print('子線程運行結束……') print('主線程開始運行……') t = threading.Thread(target=fun , name='線程1') t.daemon=True #設置為守護線程 t.start() t.join() #此處進行join for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name, i+1)) print('主線程結束運行……')
輸出結果:
主線程開始運行……
子線程開始運行……
線程1已運行1秒……
線程1已運行2秒……
線程1已運行3秒……
線程1已運行4秒……
線程1已運行5秒……
線程1已運行6秒……
子線程運行結束……
MainThread已運行1秒……
MainThread已運行2秒……
MainThread已運行3秒……
主線程結束運行……
可以看出,等子線程運行完之后,主線程才繼續join下面的代碼。然后在主線程即將結束時進行join:
import time import threading def fun(): print('子線程開始運行……') for i in range(6):#運行3秒,每秒輸出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name ,i+1)) print('子線程運行結束……') print('主線程開始運行……') t = threading.Thread(target=fun , name='線程1') t.daemon=True #設置為守護線程 t.start() for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已運行{}秒……'.format(my_thread_name, i+1)) t.join() print('主線程結束運行……')
輸出結果:
主線程開始運行……
子線程開始運行……
MainThread已運行1秒……
線程1已運行1秒……
MainThread已運行2秒……
線程1已運行2秒……
MainThread已運行3秒……
線程1已運行3秒……
線程1已運行4秒……
線程1已運行5秒……
線程1已運行6秒……
子線程運行結束……
主線程結束運行……
上面代碼中,子線程是設置為守護線程的,如果沒有調用join()方法主線程3秒結束,子線程也會跟着結束,但是從運行結果中我們可以看出,主線程3秒后,陷入等待,等子線程運行完之后,才會繼續下面的代碼。
4 線程間的同步機制
在默認情況在,多個線程之間是並發執行的,這就可能給數據代碼不安全性,例如有一個全局變量num=10,線程1、線程2每次讀取該變量后在原有值基礎上減1。但,如果線程1讀取num的值(num=10)后,還沒來得及減1,CPU就切換去執行線程2,線程2也去讀取num,這時候讀取到的值也還是num=10,然后讓num=9,這是CPU有切換回線程1,因為線程1讀取到的值是原來的num=10,所以做減1運算后,也做出num=9的結果。兩個線程都執行了該任務,但最后的值可不是8。如下代碼所示:
import time import threading def fun(): global num temp = num time.sleep(0.2) temp -= 1 num = temp print('主線程開始運行……') t_lst = [] num =10 # 全局變量 for i in range(10): t = threading.Thread(target=fun) t_lst.append(t) t.start() [t.join() for t in t_lst] print('num最后的值為:{}'.format(num)) print('主線程結束運行……')
輸出結果:
主線程開始運行……
num最后的值為:9
主線程結束運行……
最后結果為9,不是0。這就造成了數據混亂。所以,就有了線程同步機制。
4.1 互斥鎖:Lock
線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源設置一個狀態:鎖定和非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
import time import threading def fun(lock): lock.acquire() global num temp = num time.sleep(0.2) temp -= 1 num = temp lock.release() print('主線程開始運行……') t_lst = [] num =10 # 全局變量 lock = threading.Lock() for i in range(10): t = threading.Thread(target=fun , args=(lock,)) t_lst.append(t) t.start() [t.join() for t in t_lst] print('num最后的值為:{}'.format(num)) print('主線程結束運行……')
輸出結果:
主線程開始運行……
num最后的值為:0
主線程結束運行……
可以看到,最后輸出結果為0,值正確。當然,如果你運行了上述兩塊代碼,你就會發現,使用了鎖之后,代碼運行速度明顯降低,這是因為線程由原來的並發執行變成了串行,不過數據安全性得到保證。
使用Lock的時候必須注意是否會陷入死鎖,所謂死鎖是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。關於死鎖一個著名的模型是“科學家吃面”模型:
import time from threading import Thread from threading import Lock def eatNoodles_1(noodle_lock, fork_lock, scientist): noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) time.sleep(1) print('{} 吃到了面'.format(scientist)) fork_lock.release() noodle_lock.release() print('{} 放下了面'.format(scientist)) print('{} 放下了叉子'.format(scientist)) def eatNoodles_2(noodle_lock, fork_lock, scientist): fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) print('{} 吃到了面'.format(scientist)) noodle_lock.release() print('{} 放下了面'.format(scientist)) fork_lock.release() print('{} 放下了叉子'.format(scientist)) scientist_list1 = ['霍金','居里夫人'] scientist_list2 = ['愛因斯坦','富蘭克林'] noodle_lock = Lock() fork_lock = Lock() for i in scientist_list1: t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i)) t.start() for i in scientist_list2: t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i)) t.start()
輸出結果:
霍金 拿到了面
霍金 拿到了叉子
霍金 吃到了面
霍金 放下了面
霍金 放下了叉子
愛因斯坦 拿到了叉子
居里夫人 拿到了面
霍金吃完后,愛因斯坦拿到了叉子,把叉子鎖住了;居里夫人拿到了面,把面鎖住了。愛因斯坦就想:居里夫人不給我面,我就吃不了面,所以我不給叉子。居里夫人就想:愛因斯坦不給我叉子我也吃不了面,我就不給叉子。所以就陷入了死循環。
為了解決Lock死鎖的情況,就有了遞歸鎖:RLock。
4.2 遞歸鎖:RLock
所謂的遞歸鎖也被稱為“鎖中鎖”,指一個線程可以多次申請同一把鎖,但是不會造成死鎖,這就可以用來解決上面的死鎖問題。
import time from threading import Thread from threading import RLock def eatNoodles_1(noodle_lock, fork_lock, scientist): noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) time.sleep(1) print('{} 吃到了面'.format(scientist)) fork_lock.release() noodle_lock.release() print('{} 放下了面'.format(scientist)) print('{} 放下了叉子'.format(scientist)) def eatNoodles_2(noodle_lock, fork_lock, scientist): fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) print('{} 吃到了面'.format(scientist)) noodle_lock.release() print('{} 放下了面'.format(scientist)) fork_lock.release() print('{} 放下了叉子'.format(scientist)) scientist_list1 = ['霍金','居里夫人'] scientist_list2 = ['愛因斯坦','富蘭克林'] noodle_lock=fork_lock = RLock() for i in scientist_list1: t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i)) t.start() for i in scientist_list2: t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i)) t.start()
上面代碼可以正常運行到所有科學家吃完面條。
RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次
4.3 Condition
Condition可以認為是一把比Lock和RLOK更加高級的鎖,其在內部維護一個瑣對象(默認是RLock),可以在創建Condigtion對象的時候把瑣對象作為參數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的調用內部瑣對象的對應的方法而已。Condition內部常用方法如下:
1)acquire(): 上線程鎖
2)release(): 釋放鎖
3)wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續運行。wait()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。
4)notify(n=1): 通知其他線程,那些掛起的線程接到這個通知之后會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。notify()不會主動釋放Lock。
5)notifyAll(): 如果wait狀態線程比較多,notifyAll的作用就是通知所有線程
需要注意的是,notify()方法、notifyAll()方法只有在占用瑣(acquire)之后才能調用,否則將會產生RuntimeError異常。
用Condition來實現生產者消費者模型:
import threading import time # 生產者 def produce(con): # 鎖定線程 global num con.acquire() print("工廠開始生產……") while True: num += 1 print("已生產商品數量:{}".format(num)) time.sleep(1) if num >= 5: print("商品數量達到5件,倉庫飽滿,停止生產……") con.notify() # 喚醒消費者 con.wait()# 生產者自身陷入沉睡 # 釋放鎖 con.release()
# 消費者 def consumer(con): con.acquire() global num print("消費者開始消費……") while True: num -= 1 print("剩余商品數量:{}".format(num)) time.sleep(2) if num <= 0: print("庫存為0,通知工廠開始生產……") con.notify() # 喚醒生產者線程 con.wait() # 消費者自身陷入沉睡 con.release() con = threading.Condition() num = 0 p = threading.Thread(target=produce , args=(con ,)) c = threading.Thread(target=consumer , args=(con ,)) p.start() c.start()
輸出結果:
工廠開始生產……
已生產商品數量:1
已生產商品數量:2
已生產商品數量:3
已生產商品數量:4
已生產商品數量:5
商品數量達到5件,倉庫飽滿,停止生產……
消費者開始消費……
剩余商品數量:4
剩余商品數量:3
剩余商品數量:2
剩余商品數量:1
剩余商品數量:0
庫存為0,通知工廠開始生產……
已生產商品數量:1
已生產商品數量:2
已生產商品數量:3
已生產商品數量:4
4.4 信號量:Semaphore
鎖同時只允許一個線程更改數據,而信號量是同時允許一定數量的進程更改數據 。繼續用上篇博文中用的的吃飯例子,加入有一下應用場景:有10個人吃飯,但只有一張餐桌,只允許做3個人,沒上桌的人不允許吃飯,已上桌吃完飯離座之后,下面的人才能搶占桌子繼續吃飯,如果不用信號量,肯定是10人一窩蜂一起吃飯:
from threading import Thread import time import random def fun(i): print('{}號顧客上座,開始吃飯'.format(i)) time.sleep(random.random()) print('{}號顧客吃完飯了,離座'.format(i)) if __name__=='__main__': for i in range(20): p = Thread(target=fun, args=(i,)) p.start()
輸出結果:
0號顧客上座,開始吃飯
1號顧客上座,開始吃飯
2號顧客上座,開始吃飯
3號顧客上座,開始吃飯
4號顧客上座,開始吃飯
5號顧客上座,開始吃飯
6號顧客上座,開始吃飯
7號顧客上座,開始吃飯
8號顧客上座,開始吃飯
9號顧客上座,開始吃飯
3號顧客吃完飯了,離座
4號顧客吃完飯了,離座
2號顧客吃完飯了,離座
0號顧客吃完飯了,離座
8號顧客吃完飯了,離座
5號顧客吃完飯了,離座
1號顧客吃完飯了,離座
6號顧客吃完飯了,離座
9號顧客吃完飯了,離座
7號顧客吃完飯了,離座
使用信號量之后:
from threading import Thread import time import random from threading import Semaphore def fun(i , sem): sem.acquire() print('{}號顧客上座,開始吃飯'.format(i)) time.sleep(random.random()) print('{}號顧客吃完飯了,離座'.format(i)) sem.release() if __name__=='__main__': sem = Semaphore(3) for i in range(10): p = Thread(target=fun, args=(i,sem)) p.start()
輸出結果:
0號顧客上座,開始吃飯
1號顧客上座,開始吃飯
2號顧客上座,開始吃飯
2號顧客吃完飯了,離座
3號顧客上座,開始吃飯
0號顧客吃完飯了,離座
4號顧客上座,開始吃飯
1號顧客吃完飯了,離座
5號顧客上座,開始吃飯
3號顧客吃完飯了,離座
6號顧客上座,開始吃飯
5號顧客吃完飯了,離座
7號顧客上座,開始吃飯
4號顧客吃完飯了,離座
8號顧客上座,開始吃飯
8號顧客吃完飯了,離座
9號顧客上座,開始吃飯
9號顧客吃完飯了,離座
6號顧客吃完飯了,離座
7號顧客吃完飯了,離座
4.5 事件:Event
如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時候就可以用threading為我們提供的Event對象,Event對象主要有一下幾個方法:
isSet():返回event的狀態值;
wait():如果 isSet()==False將阻塞線程;
set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
clear():恢復event的狀態值為False。
有如下需求:獲取當前時間的秒數的個位數,如果小於5,設置子線程阻塞,如果大於5則設置子進程非阻塞。代碼如下:
from threading import Event, Thread import time from datetime import datetime def func(e): print('子線程:開始運行……') while True: print('子線程:現在事件秒數是{}'.format(datetime.now().second)) e.wait() # 阻塞等待信號 這里插入了一個Flag 默認為 False time.sleep(1) e = Event() p = Thread(target=func, args=(e,)) p.daemon=True p.start() for i in range(10): s = int(str(datetime.now().second)[-1])#獲取當前秒數的個位數 if s < 5: print('子線程線入阻塞狀態') e.clear() # 使插入的flag為False 線程線入阻塞狀態 else: print('子線程取消阻塞狀態') e.set() # 線程線入非阻塞狀態 time.sleep(1) e.set() print("主線程運行結束……")
輸出結果:
子線程:開始運行……
子線程:現在事件秒數是43
子線程線入阻塞狀態
子線程線入阻塞狀態
子線程取消阻塞狀態
子線程取消阻塞狀態
子線程:現在事件秒數是46
子線程取消阻塞狀態
子線程:現在事件秒數是47
子線程取消阻塞狀態
子線程:現在事件秒數是48
子線程取消阻塞狀態
子線程:現在事件秒數是49
子線程線入阻塞狀態
子線程:現在事件秒數是50
子線程線入阻塞狀態
子線程線入阻塞狀態
主線程運行結束……
4.6 定時器:Timer
如果想要實現每隔一段時間就調用一個函數的話,就要在Timer調用的函數中,再次設置Timer。Timer是Thread類的一個子類。
如果是多長時間后只執行一次:
import threading import time def sayTime(name): print('你好,{}為您報時,現在時間是:{}'.format(name , time.ctime())) if __name__ == "__main__": timer = threading.Timer(2.0, sayTime, ["Jane"]) timer.start() 輸出結果: 你好,Jane為您報時,現在時間是:Thu Dec 6 15:03:41 2018 如果要每個多長時間執行一次: import threading import time def sayTime(name): print('你好,{}為您報時,現在時間是:{}'.format(name , time.ctime())) global timer timer = threading.Timer(3.0, sayTime, [name]) timer.start() if __name__ == "__main__": timer = threading.Timer(2.0, sayTime, ["Jane"]) timer.start()
輸出結果:
你好,Jane為您報時,現在時間是:Thu Dec 6 15:04:30 2018
你好,Jane為您報時,現在時間是:Thu Dec 6 15:04:33 2018
你好,Jane為您報時,現在時間是:Thu Dec 6 15:04:36 2018
你好,Jane為您報時,現在時間是:Thu Dec 6 15:04:39 2018
……
5 進程間的通行
5.1隊列:Queue
python中Queue模塊提供了隊列都實現了鎖原語,是線程安全的,能夠在多線程中直接使用。Queue中的隊列包括以下三種:
1)FIFO(先進先出)隊列, 第一加入隊列的任務, 被第一個取出;
2)LIFO(后進先出)隊列,最后加入隊列的任務, 被第一個取出;
3)PriorityQueue(優先級)隊列, 保持隊列數據有序, 最小值被先取出。
Queue模塊中的常用方法如下:
qsize() 返回隊列的規模
empty() 如果隊列為空,返回True,否則False
full() 如果隊列滿了,返回True,否則False
get([block[, timeout]])獲取隊列,timeout等待時間
get_nowait() 相當get(False)
put(item) 寫入隊列,timeout等待時間,如果隊列已滿再調用該方法會阻塞線程
put_nowait(item) 相當put(item, False)
task_done() 在完成一項工作之后,task_done()函數向任務已經完成的隊列發送一個信號
join() 實際上意味着等到隊列為空,再執行別的操作。
import queue import threading def fun(): while True: try: data = q.get(block = True, timeout = 1) #不設置阻塞的話會一直去嘗試獲取資源 except queue.Empty: print(' {}結束……'.format(threading.current_thread().name)) break print(' {}取得數據:{}'.format(threading.current_thread().name , data)) q.task_done() print(' {}結束……'.format(threading.current_thread().name)) print("主線程開始運行……") q = queue.Queue(5) #往隊列里面放5個數 for i in range(5): q.put(i) for i in range(0, 3): t = threading.Thread(target=fun , name='線程'+str(i)) t.start() q.join() #等待所有的隊列資源都用完 print("主線程結束運行……")
輸出結果:
主線程開始運行……
線程0取得數據:0
線程0結束……
線程0取得數據:1
線程0結束……
線程1取得數據:2
線程0取得數據:3
線程0結束……
線程0取得數據:4
線程0結束……
線程1結束……
主線程結束運行……
線程1結束……
線程2結束……
線程0結束……
6 線程池
在我們上面執行多個任務時,使用的線程方案都是“即時創建, 即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態。一個線程的運行時間可以分為3部分:線程的啟動時間、線程體的運行時間和線程的銷毀時間。在多線程處理的情景中,如果線程不能被重用,就意味着每次創建都需要經過啟動、銷毀和運行3個過程。這必然會增加系統相應的時間,降低了效率。所以就有了線程池的誕生,
由於線程預先被創建並放入線程池中,同時處理完當前任務之后並不銷毀而是被安排處理下一個任務,因此能夠避免多次創建線程,從而節省線程創建和銷毀的開銷,能帶來更好的性能和系統穩定性。
from concurrent.futures import ThreadPoolExecutor import time def func(n): time.sleep(2) print(n) return n*n t = ThreadPoolExecutor(max_workers=5) # 最好不要超過CPU核數的5倍 t_lst = [] for i in range(20): r = t.submit(func , 1)#執行任務,傳遞參數 t_lst.append(r.result())#獲取任務返回結果 t.shutdown()#相當於close() + join() print(t_lst) print('主線程運行結束……')
7 總結
關於Python並發編程中的多線程部分就介紹完了,其中諸多描述略帶倉儲,后續博文中再來補充。
參考資料:
https://www.cnblogs.com/linshuhui/p/9704128.html
https://www.cnblogs.com/yoyoketang/p/8337118.html
https://www.cnblogs.com/chengd/articles/7770898.html