python線程(threading )的理解


 

threading --- 基於線程的並行

官方文檔:threading --- 基於線程的並發 — Python 3.9.9 文檔

 

 

CPython implementation detail: 在 CPython 中,由於存在全局解釋器鎖, 同一時刻只有一個線程可以執行 Python 代碼(雖然某些性能導向的庫可能會去除此限制)。 如果你想讓你的應用更好地利用多核心計算機的計算資源,推薦你使用multiprocessing或concurrent.futures.ProcessPoolExecutor但是,如果你想要同時運行多個 I/O 密集型任務,則多線程仍然是一個合適的模型。

再來引入一個概念:

  並行(parallelism):   是同一時刻,每個線程都在執行。

  並發(concurrency):是同一時刻,只有一個線程執行,然后交替執行(再加上電腦運行速度快)。所以從一個宏觀的角度來看,似乎每個線程都在執行了。

可以知道python線程是並發的。

 

 關於線程Threading的方法(獲取線程的某種屬性)。

  active_count():它會獲得,執行這個方法時,還存活的Thread()的對象數量。

  enumerate():返回當前所有存活的Thread對象的列表。

  current_thread():返回當前調用者 控制Thread()線程的對象。如果調用者控制的線程對象不是由threading創建,則會返回一個功能受限的虛擬線程對象。

  get_ident():返回當前線程的“線程標識符”。它是一個非零整數。

  get_native_id():返回內核分配給當前線程的原生集成線程ID。這是一個非負整數。

  main_thread():返回主線程(thread)對象,一般是python解釋器開始時創建的線程。

  

 一、簡介

線程對象:

  官方解釋:

    The Thread class represents an activity that is run in a separate thread of control. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__() and run() methods of this class.

    Thread類表示在單獨的控制線程中運行的活動。指定活動有兩種方法:將可調用對象傳遞給構造函數,或重寫子類中的run()方法。子類中不應重寫任何其他方法(構造函數除外)。換句話說,只重寫這個類的_init__;()和run()方法。

 

一旦線程活動開始,該線程會被認為是 '存活的' 。當它的run()  方法終結了(不管是正常的還是拋出未被處理的異常),就不是'存活的'。 

 

先看看該類的參數有哪些:

class threading.Thread(group=Nonetarget=Nonename=Noneargs=()kwargs={}*daemon=None)

  group:官方的解釋是,為了日后擴展ThreadGroup類實現而保留。(唉,我也不太清楚的)

  target:是要於多線程的函數

  name:是線程的名字

  args :函數的參數,類型是元組()

  kwargs:函數的參數,類型是字典{}

 

為了便於理解,先舉一個小例子,為了方便理解,先簡單了解一下該類的一個方法(函數在類中被稱為方法):

  start():開始線程活動

 

import threading
import time

def threading_test(name):

    print('進程名字---->:',name)
    print( name,'執行的第一步')
    print( name,'執行的第二步')
    print(name, '執行的第三步')


# 實例化對象
obj1 = threading.Thread(target=threading_test ,args=("obj1",))
obj2 = threading.Thread(target=threading_test ,args=("obj2",))

# 開始線程
obj1.start()
obj2.start()

運行結果:

  

會不會疑惑,嗯?就這,不理解啊,和調用普通的函數有什么區別啊?

 

 

但是加上一個一個延時函數就會看的區別的

  例如,把上面的函數修改成以下的再看看

def threading_test(name):

    print('進程名字---->:',name)
    time.sleep(0.1)
    print( name,'執行的第一步')
    time.sleep(0.5)
    print( name,'執行的第二步')
    time.sleep(0.2)
    print(name, '執行的第三步')

運行結果就會變成:

  

 

看到區別了吧,其實在這里兩個對象是並發運行的,而且多運行幾次你會發現每次運行的結果不一樣。是因為兩個是並發的,誰先執行就要看誰搶得執行權。

 

二、Threading.Thread其中常用到的方法 

既然看懂了多線程並發的效果,那么接下來就更加深入的了解其它的方法吧。

 

  start():開始線程活動。這里注意,每個對象只能執行一次,不信你可以試試,看他會不會拋出RuntimeError這個異常。

  run() :表示線程的方法,在線程被cpu調度后,就會自動執行這個方法。

         但是如果你在自定義的類中想方法run和父類不一樣,可以重寫。

  join() :等待,直到線程結束。

  setName():給線程設置名字

  getName():獲取線程名字

  is_alive():返回線程是否存活(True或者False)

  setDaemon():設置守護線程(True或者False),必須在start()之前設置,不然會報錯。

  isDaemon() :是否是線程守護,默認是False。

 

 

介紹完之后親自嘗試一下看看效果:

  先看看join()

為了清晰的認識這個方法的作用,我把上面的代碼略作改變

import threading
import time

def threading_test(name):

    print('進程名字---->:',name)
    time.sleep(0.1)
    print( name,'執行的第一步')
    time.sleep(0.5)
    print( name,'執行的第二步')
    time.sleep(0.2)
    print(name, '執行的第三步')


# 實例化對象
obj1 = threading.Thread(target=threading_test ,args=("obj1",))
obj2 = threading.Thread(target=threading_test ,args=("obj2",))

# 開始線程
obj1.start()
obj2.start()


print('<-----主線程結束:--------->')

運行結果:

  

 

注意主線程結束出現的位置,后面還會考到。

稍改一下代碼:

# 開始線程
obj1.start()

# 加入join()之后會等調用該方法的線程對象執行完之后才會往下執行代碼
obj1.join()
obj2.start()

print('<-----主線程結束:--------->')

 

運行結果:

  

 

看到了沒,在obj1執行完之后,才會往下執行,但是主線程與obj2好像也是並發的。多運行幾次你會發現,obj2與主線程的執行會有不同的結果。

為了更加清晰的了解,再稍改一下。

# 開始線程
obj1.start()

obj2.start()
# 把join()放在obj2之后
# 加入join()之后會等調用該方法的線程對象執行完之后才會往下執行代碼
obj1.join()

print('<-----主線程結束:--------->')

運行結果:

  

 可以看出,obj1與obj2是並發的允許,執行完obj1.join()后才往下執行

obj1.join()

 

 為靈魂級別的理解這次改到有點大不過放心,代碼簡單。

 
         
import threading
import time

def threading_test(name,n):

    print('進程名字---->:',name)
    time.sleep(1)
    print(name,'進程正在執行')
    time.sleep(n)
    print(name,'這個進程結束')



# 實例化對象
obj1 = threading.Thread(target=threading_test ,args=("obj1",1))
obj2 = threading.Thread(target=threading_test ,args=("obj2",3))

# 開始線程
obj1.start()

obj2.start()
# 把join()放在obj2之后
# 加入join()之后會等調用該方法的線程對象執行完之后才會往下執行代碼
obj1.join()
print('<-----主線程結束:--------->')
運行結果:
  

 

 
         

 這個時候完全看懂了吧,join()在obj2之后,所以obj1與obj2時並發的,但是由於obj2有一個延時大於obj1,所以在obj1執行完之后,obj2還在執行。

 主程序結束這條語句只是等待join()對象的obj1結束才會執行,並不等obj2。你可以把他們兩個延時的參數調換一下,看看結果。

 

 

接下來看看關於setName和getName的用法吧

 

 

# 實例化對象,為了方便就不加參數和函數了,就看看效果。
obj1 = threading.Thread()
obj2 = threading.Thread()
obj3 = threading.Thread()

# 開始線程
obj1.start()
obj2.start()
obj2.setName('你的名字叫做喂')
obj3.start()
time.sleep(5)
print('獲取線程名字',obj1.getName())
print('獲取線程名字',obj2.getName())
print('獲取線程名字',obj3.getName())

print('<-----主線程結束:--------->')

 

 運行結果:

  

 

 

 可以看到,它是有默認的名字的,而且有規律,Thread-N ,如果對Thread類還有印象的話就會知道里面有一個餐宿name,如果你沒有賦值就會從1到N不斷地給它命名

為Thread-N ,而setName只是改變調用這個方法的線程名(這里就是覆蓋掉了默認的名字),並不會改變其它線程的N值。但是如果你在實例化對象時就給name賦值,會對其后面的N值影響。當然初始化順序也會影響N值。

 
         
# 實例化對象,為了方便就不加參數和函數了,就看看效果。
obj1 = threading.Thread()
obj2 = threading.Thread(name='obj2') #注意這里,Thread-2會跳過去而不是被覆蓋

# 初始化順序影響N的值
obj4 = threading.Thread() 
obj3 = threading.Thread()

# 開始線程
obj1.start()

obj2.start()
obj2.setName('你的名字叫做喂')
obj3.start()
time.sleep(5)
print('獲取線程名字',obj1.getName())
print('獲取線程名字',obj2.getName())
print('獲取線程名字',obj3.getName())
print('獲取線程名字',obj4.getName())

print('<-----主線程結束:--------->')
 

運行結果:

  

 

為了省事,我把下面的一起分析了。

  active_count(),current_thread(),get_ident(),enumerate(),main_thread()

這里要注意有一個隱藏線程就是執行整個代碼的主線程。

# 實例化對象
obj1 = threading.Thread(target=test,args=())
obj2 = threading.Thread(target=test,args=())
obj3 = threading.Thread(target=test,args=())
print('在start方法之前有幾個線程活動',threading.active_count(),' 具體是:')
print(threading.enumerate())
obj1.start()
obj2.start()
obj3.start()
print('在全部start方法之后有幾個存活',threading.active_count(), ' 具體是:')
print(threading.enumerate())
print('返回當前線程的“線程標識符”:',threading.get_ident())
print('返回當前調用者 控制Thread線程的對象:',threading.current_thread())
print('返回主線程(thread)對象:',threading.main_thread())

 

運行結果:     

 

 

 可以看到,主線程在運行代碼時就會開啟,而自己利用threading創建的線程只有start()之后才會‘存活’。而且在主線程里面執行其中的一些方法都會指向主線程。

 

再改一下代碼,

def test(name,n):
    time.sleep(n)
    print('<-------------------------------------------------------------------------->')
    print('對象',name,'在延時',n,'秒之后的一些信息是:')
    print('現在還存活的有', threading.active_count(), '個 具體是:')
    print(threading.enumerate())
    print('返回當前線程的“線程標識符”:', threading.get_ident())
    print('返回當前調用者 控制Thread線程的對象:', threading.current_thread())
    print('返回主線程(thread)對象:', threading.main_thread())


# 實例化對象
obj1 = threading.Thread(target=test,args=('obj1',1))
obj2 = threading.Thread(target=test,args=('obj2',8))
obj1.start()
obj2.start()
time.sleep(4)

運行結果:

  

 

 

 可以看到了get_ident與current_thread,就是獲取當前的情況,main_thread()仍舊不變。這里需要注意,在執行obj2時由於延時8秒,所以obj1已經執行結束了。

 setDaemon()守護線程:

def test(name,n):
    time.sleep(n)
    print('<-------------------------------------------------------------------------->')
    print('對象',name,'在延時',n,'秒之后是否還能繼續:')

# 實例化對象
obj1 = threading.Thread(target=test,args=('obj1',1))
obj2 = threading.Thread(target=test,args=('obj2',8))
obj1.setDaemon(False)
obj2.setDaemon(False)
obj1.start()
obj2.start()
time.sleep(4)

print('主代碼運行結束')

都設置為False的運行結果:

  

 都設置為True的運行結果:

 

 

從這兩個結果應該能看出:當他們都設置為True時,會隨着主線程的結束而退出程序,所以會把還沒執行完的其它線程也結束。

但是有沒有想過其中有一個為True,一個為False會是怎么樣的一個結果。有這樣的一個結論:當沒有存活的非守護線程時,整個Python程序才會退出。

 

這里把obj1為True,obj2為False

 

把兩個秒數對換一下(或者Ture與False對換也行)

 

 看出來了,如果主線程結束,還有非守護線程在執行,程序就不會退出,就能繼續執行。

 

三、關於threading其它類的知識。

引入:回想一下上面,各個線程是並發。而且每一次運行的結果是是不可預知的,是隨機的。這在某些情況是很不好的,有些線程會有先后的關系。

  例如:同時copy多個文件,雖然讀文件是可以同時讀所需要copy的文件,寫也是一樣的;但是對於一個文件而言,只能先讀取,才能寫。

  所以有了同步,互斥等關系。

 

為了下面的知識方便講解:

講一下線程的知識:(鏈接:(23條消息) 線程的5種狀態詳解_老貓的博客-CSDN博客_線程狀態 )

1.新建狀態(New)

  通過:threading.Thread()

2.就緒狀態(Runnable)

  當線程對象調用start()方法即啟動了線程,start()方法創建線程運行的系統資源,並調度線程運行run()方法。當start()方法返回后,線程就處於就緒狀態。

  處於就緒狀態的線程並不一定立即運行run()方法,線程還必須同其他線程競爭CPU時間,只有獲得CPU時間才可以運行線程。

3.運行狀態(Running)

  當線程獲得CPU時間后,它才進入運行狀態,真正開始執行run()方法。

4. 阻塞狀態(Blocked)

        線程運行過程中,可能由於各種原因進入阻塞狀態:
        1>線程通過調用sleep方法進入睡眠狀態;
        2>線程調用一個在I/O上被阻塞的操作,即該操作在輸入輸出操作完成之前不會返回到它的調用者;
        3>線程試圖得到一個鎖,而該鎖正被其他線程持有;
        4>線程在等待某個觸發條件;
        ......           
        所謂阻塞狀態是正在運行的線程沒有運行結束,暫時讓出CPU,這時其他處於就緒狀態的線程就可以獲得CPU時間,進入運行狀態。

  注意:與阻塞對應的是喚醒,由什么事件阻塞,就應由什么事件喚醒,例如由於sleep阻塞,喚醒則是因為sleep結束。

 

 

 

4. 死亡狀態(Dead)

  有兩個原因會導致線程死亡:

    1)run方法正常退出而自然死亡。

    2)一個未捕獲的異常終止了run方法而使線程猝死。

  

其實第二部分一直在講Threading.Thread這里類

1、threading.Lock 這個類用於鎖對象。

我先拋出一個官方的結論(很重要記住啊)。實現原始鎖對象的類。一旦一個線程獲得一個鎖,會阻塞隨后嘗試獲得鎖的線程,直到它被釋放;任何線程都可以釋放它。

 

原始鎖是一個在鎖定時不屬於特定線程的同步基元組件。原始鎖處於 "鎖定" 或者 "非鎖定" 兩種狀態之一。它被創建時為非鎖定狀態。他有兩個基本方法。

acquire()和release()

 

acquire(blocking=Truetimeout=- 1):

  可以阻塞或非阻塞地獲得鎖(這里可以理解為我為了獲得鎖需不需要暫停的等)。

  blocking:默認為True (為阻塞),False(為非阻塞)。

  tiemout:只要無法獲得鎖,將最多阻塞 timeout 設定的秒數。只在要鎖定的情況才有用。默認為-1

  如果已經獲得鎖則返回True,如果超時則返回False

release():釋放鎖(解鎖),由鎖定轉化為非鎖定。注意不能對未鎖定的對象進行解鎖否則會報錯。

looked():返回是否處於鎖定狀態。 

 

import threading
import time


# 實例化鎖對象
look=threading.Lock()
# 鎖定
flag=look.acquire()
print("flag=",flag)
print("look.locked()=",look.locked())

flag=look.acquire(timeout=2) #這里可以解釋為,我最多只等你兩秒,否則就返回False print("flag=",flag) print("look.locked()1=",look.locked()) # 對鎖定的對象解鎖可以 look.release() print("look.locked()2=",look.locked()) print('看看下面是否出錯') look.release()

 

運行結果:

 

 再看一下阻塞和非阻塞這個問題

threading.Thread()

# 實例化鎖對象,初始化為非鎖定
look=threading.Lock()
# 非阻塞獲取鎖,由於處於非鎖定,所以可以鎖定。
flag0=look.acquire(blocking=False)
print("flag0=",flag0)


# 不阻塞,立即取申請鎖,且不成功則返回False(不阻塞的會 由於timeout這個參數會出錯)
flag=look.acquire(blocking=False)
print("flag=",flag)
# 阻塞,默認timeout=-1 它可以無限地等待
# 最多等3秒,不成功則返回False
flag1=look.acquire(blocking=True,timeout=3)
print("flag1=",flag1)
look.release()

 

 

 

 

 下面介紹多線程的另一種的創建方法:繼承父類,可以重寫run方法。

import threading
import time

# 繼承threading.Thread這個類
class BankAccount(threading.Thread):
    def __init__(self, user,sleep_time):
        # 重構run函數必須要寫
        super(BankAccount, self).__init__()
        # 用戶
        self.user=user# 睡眠時間
        self.sleep_time=sleep_time

    def run(self):
        time.sleep(1)
        print(self.user,'開始')



# 實例化對象
obj1 = BankAccount('obj1',100,6)
obj2 = BankAccount('obj2',100,5)
obj3 = BankAccount('obj3',100,4)
obj4 = BankAccount('obj4',100,3)
obj5 = BankAccount('obj5',100,2)
obj6 = BankAccount('obj6',100,1)


obj_list=[obj1,obj2,obj3,obj4,obj5,obj6]
# 開始
for i in range(6):
    obj_list[i].start()

未加鎖定的情況:

  

 

 加鎖之后:

    def run(self):
        look.acquire()
        time.sleep(1)
        print(self.user,'開始')
        look.release()

運行結果:

  

 

 注意由於BankAccount繼承於threading.Thread這個類重寫了run()這個方法,它在start()之后就會自動run()。obj1先啟動obj1所以先鎖定。等obj1運行到解鎖,才會讓obj2對look這個對象上鎖 ......。

 

再來一個有趣一點的代碼吧,也就是改一下的啦。

import threading
import time

# 繼承threading.Thread這個類
class BankAccount(threading.Thread):
    def __init__(self, user, sleep_time):
        # 重構run函數必須要寫
        super(BankAccount, self).__init__()
        # 用戶
        self.user=user
        # 睡眠時間
        self.sleep_time=sleep_time

    def run(self):
        start=time.time()

        # 鎖定
        look.acquire()
        time.sleep(1)
        print(self.user,'開始')

        # 釋放鎖
        look.release()
        end=time.time()
        print(self.user, '鎖運行時間為:', end - start)

        time.sleep(self.sleep_time)
        print(self.user,'鎖結束之后')



# 實例化對象
obj1 = BankAccount('obj1',100,6)
obj2 = BankAccount('obj2',100,5)
obj3 = BankAccount('obj3',100,4)
obj4 = BankAccount('obj4',100,3)
obj5 = BankAccount('obj5',100,2)
obj6 = BankAccount('obj6',100,1)


obj_list=[obj1,obj2,obj3,obj4,obj5,obj6]
# 實例化鎖對象
look=threading.Lock()
# 開始
for i in range(6):
    obj_list[i].start()

 

 

運行結果:

  

 

 

 

  分析一下:每個給鎖的運行時間不一樣,通過time.sleep(self.sleep_time)使得 print(self.user,'鎖結束之后') 這條語句它們再同一起跑線,由於這6個對象屬於多線程並發。所以由obj1,obj2,obj3的這種情況。

 

 

 

2、再來介紹一個相似的:遞歸鎖對象threading.RLoock這個類。

都是有兩個方法:acquire(),release(),用法差不多。但是沒有looked()

來講一下區別:

  遞歸鎖對象:對於同一個線程同一個鎖對象可以上鎖再上鎖(相當於多把鎖),每個線程解鎖只能有開啟的線程關閉。

    一個線程對象獲得了幾次鎖,必須解幾次鎖,才會讓其他線程能鎖定。

  

  鎖對象 : 同一個線程只能有一把鎖,每個線程可以由其它線程解鎖。

 

來看看效果

首先時鎖對象:

import threading
import time


def test(name):
    if name=='obj1':
        # obj1線程鎖定
        look.acquire()
    if name=='obj2':
        # # obj2線程解鎖
        look.release()
    time.sleep(1)
    print(name,'運行中')


# 實例化遞歸鎖對象
Rlook=threading.RLock()

# 實例化鎖對象
look=threading.Lock()

# 實例化線程對象
obj1=threading.Thread(target=test,args=("obj1",))
obj2=threading.Thread(target=test,args=("obj2",))

# 列表
obj_list=[obj1,obj2]

for obj in obj_list:
    obj.start()

 

運行結果:

  

 

 

 遞歸鎖對象(略改一下):

def test(name):
    if name=='obj1':
        # obj1線程鎖定
        Rlook.acquire()
    if name=='obj2':
        # obj2線程解鎖
        Rlook.release()
    time.sleep(1)
    print(name,'運行中')

 

 

報錯:

  

 

 

   在遞歸鎖對象中obj2沒上鎖,不能解鎖

  

再略改下:

def test(name):
    if name=='obj1':
        Rlook.acquire()
        Rlook.acquire(timeout=2)
        Rlook.release()
    if name=='obj2':
        Rlook.acquire() # 對於同一個鎖對象,obj1兩個鎖只解了一個,所以obj2不能上鎖。會在這里給你卡死
        print('我是obj2')
    time.sleep(1)
    print(name,'運行中')

運行結果:

  

 

再改一下:

def test(name):
    if name=='obj1':
        Rlook.acquire()
        Rlook.acquire(timeout=2)
        Rlook.release()
        Rlook.release()
    if name=='obj2':
        Rlook.acquire() 
        print('我是obj2')
        Rlook.release()
    time.sleep(1)
    print(name,'運行中')

 

 

運行結果:

  

 

 

3、條件對象threading.Condition這個類:

方法有:

  acquire(*arg):

    請求底層鎖。此方法調用底層鎖的相應方法,返回值是底層鎖相應方法的返回值。

  release():

    釋放底層鎖。此方法調用底層鎖的相應方法。沒有返回值。

  wait(timeout=None) :

    等待直到被通知或發生超時。如果線程在調用此方法時沒有獲得鎖,將會引發RuntimeError異常。

    這個方法釋放底層鎖,然后阻塞,直到在另外一個線程中調用同一個條件變量的 notify() 或 notify_all() 喚醒它,或者直到可選的超時發生。

      一旦被喚醒或者超時,它重新獲得鎖並返回。

    當底層鎖是個 RLock ,不會使用它的 release() 方法釋放鎖,因為當它被遞歸多次獲取時,實際上可能無法解鎖。相反,使用了RLock  類的內部接口,

      即使多次遞歸獲取它也能解鎖它。 然后,在重新獲取鎖時,使用另一個內部接口來恢復遞歸級別。

    返回 True ,除非提供的 timeout 過期,這種情況下返回 False

  wait_for(predicatetimeout=None) :

    等待,直到條件計算為真。 predicate 應該是一個可調用對象而且它的返回值可被解釋為一個布爾值。可以提供 timeout 參數給出最大等待時間。

    這個實用方法會重復地調用 wait() 直到滿足判斷式或者發生超時。返回值是判斷式最后一個返回值,而且如果方法發生超時會返回 False 。

忽略超時功能,調用此方法大致相當於編寫:

while not predicate(): cv.wait()

因此,規則同樣適用於wait() :鎖必須在被調用時保持獲取,並在返回時重新獲取。 隨着鎖定執行判斷式。

  notify() :

    默認喚醒一個等待這個條件的線程。如果調用線程在沒有獲得鎖的情況下調用這個方法,會引發 RuntimeError異常。

       這個方法喚醒最多 n 個正在等待這個條件變量的線程;如果沒有線程在等待,這是一個空操作。

       當前實現中,如果至少有 n 個線程正在等待,准確喚醒 n 個線程。但是依賴這個行為並不安全。未來,優化的實現有時會喚醒超過 n 個線程。

    注意:被喚醒的線程並沒有真正恢復到它調用的 wait() ,直到它可以重新獲得鎖。 因為 notify()不釋放鎖,其調用者才應該這樣做。

  notify_all() :

    喚醒所有正在等待這個條件的線程。這個方法行為與 notify() 相似,但並不只喚醒單一線程,而是喚醒所有等待線程。

    如果調用線程在調用這個方法時沒有獲得鎖,會引發RuntimeError 異常。

 

 

import threading
import time


def test_a(name,condition):
    # 申請鎖,如果成功
    if cov.acquire():
        print(name)


# 條件對象鎖
cov = threading.Condition()
obj1 = threading.Thread(target=test_a,args=('obj1',12))
obj1.start()

obj2 = threading.Thread(target=test_a,args=('obj2',10))
obj2.start()

運行結果:

  

 

 

 稍改一下:

  

def test_a(name,condition):
    # 申請鎖,如果成功
    if cov.acquire():
        print(name)
        cov.release()
運行結果:
  

可以看出來在一個線程獲得鎖,就會阻止其它線程獲得。

 

再修改一下:

def test_a(name,condition):
    # 申請鎖,如果成功
    if cov.acquire():
        #
        if condition>11:
            # 進入阻塞狀態,並釋放鎖讓其它線程可以活得鎖
            cov.wait()

        print(name)
        cov.release()



# 條件對象鎖
cov = threading.Condition()
obj1 = threading.Thread(target=test_a,args=('obj1',12))
obj1.start()

obj2 = threading.Thread(target=test_a,args=('obj2',10))
obj2.start()

運行結果:
  

 

可以看出:obj1的條件滿足就進入阻塞狀態,並把鎖釋放。obj2可以獲得鎖所以可以繼續往下運行,但是obj1一直每喚醒,會卡死。

 

再稍改一下:

def test_a(name,condition):
    # 申請鎖,如果成功
    if cov.acquire():        if condition>11:
            # 進入阻塞狀態,並釋放鎖讓其它線程可以活得鎖
            cov.wait()
        # 喚醒進入阻塞的狀態
        cov.notify()
        print(name)
        cov.release()

運行結果:
  

 可以看出obj2的notify 把進入到obj1阻塞狀態的線程喚醒,並運行結束

 

def test_a(name,condition):
    # 申請鎖,如果成功
    if cov.acquire():
        #
        if condition>11:
            # 進入阻塞狀態,並釋放鎖讓其它線程可以活得鎖
            cov.wait()
        # 喚醒進入阻塞的狀態,只有obj2才喚醒,由於多了一個對象也滿足條件,所以只釋放了一個,還有一個沒釋放,也會卡死。
        if name=="obj2":
            cov.notify()
        print(name)
        cov.release()




# 條件對象鎖
cov = threading.Condition()
obj1 = threading.Thread(target=test_a,args=('obj1',12))
obj1.start()

# 加入一個新對象
obj3 = threading.Thread(target=test_a,args=('obj3',13))
obj3.start()

obj2 = threading.Thread(target=test_a,args=('obj2',10))
obj2.start()

運行結果:
  

把上面的notify(), 改成notify(2)可以喚醒兩個線程。所以剛好可以全部喚醒。運行結果:

 

 
         

 換成cov.notify_all()喚醒全部。運行結果如上。

 

注意:喚醒之后,它會把進入阻塞狀態的線程先隨機挑出一個線程鎖定,如果不是放鎖。另一個線程就會無法鎖定,卡死。多運行幾次你會發現obj1與obj3的先后不確定。




再變一下:

if name=="obj2":
  cov.notify()
變成:
# 喚醒進入阻塞的狀態,誰都可以喚醒一個,obj2不會進入阻塞,可以釋放一個線程,釋放之后。喚醒的繼續往下執行,也會釋放另一個。也會上面的結果。
cov.notify()

 下面再介紹他的最后一個方法:

wait_for(predicate, timeout=None)

predicate為一個可調用對象,返回值可以解釋為布爾值。

 python可調用對象有七種:

  1、自定義函數

  2、內置函數

  3、內置方法

  4、方法

  5、類

  6、類的實例

  7、使用yield關鍵字的函數或方法

具體詳細的可以參考這個:Python 可調用對象 - _Joshua - 博客園 (cnblogs.com)

 

def test_a(name,condition):
    # 申請鎖,如果成功
    if cov.acquire():
        # func為可調用對象的自定義函數, 注意這里是func返回的解釋為False才會wait()
        # 這里需要住喲tmeout=-1並不是無限時間,None
        cov.wait_for(func,timeout=None)
print(name)
        cov.release()

# 自定義函數,測試可調式對象
def func():
   # 0可以解釋為False value
=0 return value # 條件對象鎖 cov = threading.Condition() obj1 = threading.Thread(target=test_a,args=('obj1',12)) obj1.start() 運行結果:
  

 

 
         

 卡死

把上面的

cov.wait_for(func,timeout=NOne)
改成
cov.wait_for(func,timeout=4)
大概四秒之后就會有輸出。

 

改一下:

 
         
class A(threading.Thread):
    def __init__(self,name, condition):
        super(A,self).__init__()
        self.name=name
        self.condition=condition
    def run(slef):
        # 申請鎖,如果成功
        if cov.acquire():
            cov.wait_for(slef.func,timeout=None)
            
            # 喚醒
            cov.notify()
            print(slef.name)
            cov.release()

    # 自定義函數,測試可調式對象,只有obj2才返回1可解釋為True
    def func(slef):
        # python三目運算符
        # 條件為真時的結果 if 判段的條件 else 條件為假時的結果
        value = 1 if(slef.condition<11) else 0
        return value



# 條件對象鎖
cov = threading.Condition()
obj1 = A('obj1',12)
obj1.start()


obj2 = A('obj2',10)
obj2.start()

運行結果:

 

obj1會進入阻塞狀態,但是沒被喚醒,卡死,只輸出obj2。雖然有喚醒,
但由於wait_for(predicatetimeout=None)
while not predicate(): cv.wait()
會一直wait,除非predicate()返回值變了

 

 

 

4、信號量對象:

class threading.Semaphore(value=1)

  可選參數 value 賦予內部計數器初始值,默認值為 1 。如果 value 被賦予小於0的值,將會引發ValueError 異常。

 

方法:

acquire(blocking=Truetimeout=None)

獲取一個信號量。

在不帶參數的情況下調用時:

  • 如果在進入時內部計數器的值大於零,則將其減一並立即返回 True

  • 如果在進入時內部計數器的值為零,則將會阻塞直到被對release()  的調用喚醒。 一旦被喚醒(並且計數器的值大於 0),則將計數器減 1 並返回 True。 每次對release()  的調用將只喚醒一個線程。 線程被喚醒的次序是不可確定的。

當發起調用時將 blocking 設為假值,則不進行阻塞。 如果一個無參數調用將要阻塞,則立即返回 False;在其他情況下,執行與無參數調用時一樣的操作,然后返回 True

當發起調用時如果 timeout 不為 None,則它將阻塞最多 timeout 秒。 請求在此時段時未能成功完成獲取則將返回 False。 在其他情況下返回 True

 

release(n=1)

釋放一個信號量,將內部計數器的值增加 n。 當進入時值為零且有其他線程正在等待它再次變為大於零時,則喚醒那 n 個線程。默認喚醒一個線程。

import threading
import time


def test(name):
   # 獲得一個信號
if sem.acquire(): print(name,'開始') # 信號量對象對象,value為計數初值。 sem=threading.Semaphore(value=3) # 線程對象 obj1=threading.Thread(target=test,args=('obj1',)) obj1.start() obj2=threading.Thread(target=test,args=('obj2',)) obj2.start() obj3=threading.Thread(target=test,args=('obj3',)) obj3.start()

運行結果:

 

 
         

value為3,而線程對象也是三個

 
         

把value改為2:

 
         

 

 

 obj2的時候計數器減為0,就會阻塞(blocking默認為Ture,為阻塞),要遇到release()才會把它喚醒。

 

 

把函數改成:
def test(name):
if sem.acquire():
print(name,'開始')
if name=='obj1':
time.sleep(1)
sem.release(1)

運行結果:

 

 

 release(1),釋放一個信號量,並將計數器值加1,使得obj3能輸出。

 

 

 

class threading.BoundedSemaphore(value=1)

該類實現有界信號量。有界信號量通過檢查以確保它當前的值不會超過初始值。如果超過了初始值,將會引發 ValueError 異常。在大多情況下,信號量用於保護數量有限的資源。如果信號量被釋放的次數過多,則表明出現了錯誤。沒有指定時, value 的值默認為1。

 

信號量通常用於保護數量有限的資源,例如數據庫服務器。在資源數量固定的任何情況下,都應該使用有界信號量。在生成任何工作線程前,應該在主線程中初始化信號量。

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

工作線程生成后,當需要連接服務器時,這些線程將調用信號量的 acquire 和 release 方法:

 

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()
使用有界信號量能減少這種編程錯誤:信號量的釋放次數多於其請求次數。

# 事件對象
ev = threading.Event()

print('is_set()再set()之前的值:',ev.is_set())
ev.set()
print('is_set()再set()之后的值:',ev.is_set())
ev.clear()
print('is_set()再clear()之后的值:',ev.is_set())

運行結果:

 

 

import threading


def test(name):
    ev.wait()
    print(name,'開始')
    # ev.clear()


# 事件對象,默認內部標志為False
ev = threading.Event()

ev.set()

# 線程對象
obj1=threading.Thread(target=test,args=('obj1',))
obj1.start()


obj2=threading.Thread(target=test,args=('obj2',))
obj2.start()
運行結果:

 

 
         

 改一下函數:

def test(name):
ev.wait()
print(name,'開始')
# 把內部標志變成False
ev.clear()

運行結果:

 

 導致obj2會一直阻塞,卡死

 

 



定時器對象

此類表示一個操作應該在等待一定的時間之后運行 --- 相當於一個定時器。與線程一樣,通過調用 start() 方法啟動定時器。而 cancel() 方法可以停止計時器(在計時結束前), 定時器在執行其操作之前等待的時間間隔可能與用戶指定的時間間隔不完全相同。

 
        
import threading
import time

def test(name):
    print(name,'開始')



# 時間對象,3秒之后才會執行函數test()
obj1 = threading.Timer(interval=3,function=test,args=('obj1',))
obj1.start()

如果在三秒之前加入,例如:
time.sleep(2)
obj1.cancel()
就不會執行函數test()

 



柵欄對象

柵欄類提供一個簡單的同步原語,用於應對固定數量的線程需要彼此相互等待的情況。線程調用wait()方法后將阻塞,直到所有線程都調用了wait() 方法。此時所有線程將被同時釋放。

柵欄對象可以被多次使用,但進程的數量不能改變。

 

class threading.Barrier(partiesaction=Nonetimeout=None)

創建一個需要 parties 個線程的柵欄對象。如果提供了可調用的 action 參數,它會在所有線程被釋放時在其中一個線程中自動調用。 timeout 是默認的超時時間,如果沒有在 wait() 方法中指定超時時間的話。

 

wait (timeout=None)

沖出柵欄。當柵欄中所有線程都已經調用了這個函數,它們將同時被釋放。如果提供了 timeout 參數,這里的 timeout 參數優先於創建柵欄對象時提供的 timeout 參數。

函數返回值是一個整數,取值范圍在0到 parties -- 1,在每個線程中的返回值不相同。可用於從所有線程中選擇唯一的一個線程執行一些特別的工作。例如:

i = barrier.wait() if i == 0: # Only one thread needs to print this print("passed the barrier") 

如果創建柵欄對象時在構造函數中提供了 action 參數,它將在其中一個線程釋放前被調用。如果此調用引發了異常,柵欄對象將進入損壞態。

如果發生了超時,柵欄對象將進入破損態。

如果柵欄對象進入破損態,或重置柵欄時仍有線程等待釋放,將會引發 BrokeBarrierError 異常。

reset ()

重置柵欄為默認的初始態。如果柵欄中仍有線程等待釋放,這些線程將會收到BrokeBarrierError異常。

請注意使用此函數時,如果存在狀態未知的其他線程,則可能需要執行外部同步。 如果柵欄已損壞則最好將其廢棄並新建一個。

abort ()

使柵欄處於損壞狀態。 這將導致任何現有和未來對wait()  的調用失敗並引發 BrokeBarrierError。 例如可以在需要中止某個線程時使用此方法,以避免應用程序的死鎖。

更好的方式是:創建柵欄時提供一個合理的超時時間,來自動避免某個線程出錯。

parties

沖出柵欄所需要的線程數量。

n_waiting

當前時刻正在柵欄中阻塞的線程數量。

broken

一個布爾值,值為 True 表明柵欄為破損態。

異常類,是 RuntimeError 異常的子類,在 Barrier 對象重置時仍有線程阻塞時和對象進入破損態時被引發。


代碼:
# 柵欄函數
def Ba_test():
    print('柵欄對象','開始運行')

# 柵欄對象
bar = threading.Barrier(parties=4, action=Ba_test(), timeout=None)

# 一個布爾值,值為 True 表明柵欄為破損態。
print('初始化的柵欄破損態: ',bar.broken)

# 沖出柵欄所需要的線程數量。
print('初始化沖出柵欄所需要的線程數量: ',bar.parties)

# 使柵欄處於損壞狀態。這將導致任何現有和未來對 wait() 的調用失敗
bar.abort()
print('abort后的的柵欄破損態: ',bar.broken)
print('abort后的沖出柵欄所需要的線程數量: ',bar.parties)

# 重置柵欄為默認的初始態.
bar.reset()
print('reset后的柵欄破損態: ',bar.broken)
print('reset后的沖出柵欄所需要的線程數量: ',bar.parties)

運行結果:

 

# 柵欄函數
def Ba_test():
    print('柵欄對象','開始運行')

# 線程函數
def th_test(name):
    bar.wait()
    print(name,'開始')

# 柵欄對象,4個
bar = threading.Barrier(parties=4, action=Ba_test(), timeout=None)


# 四個線程對象都調用wait(),阻塞結束,少一個的話,會阻塞不運行
obj1 = threading.Thread(target=th_test,args=('obj1',))
obj2 = threading.Thread(target=th_test,args=('obj2',))
obj3 = threading.Thread(target=th_test,args=('obj3',))
obj4 = threading.Thread(target=th_test,args=('obj4',))

obj1.start()
obj2.start()
obj3.start()
obj4.start()

運行結果:

 







免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM