Python並發編程之線程消息通信機制任務協調(四)


大家好,並發編程 進入第四篇。

本文目錄


  • 前言
  • Event事件
  • Condition
  • Queue隊列
  • 總結

. 前言

前面我已經向大家介紹了,如何使用創建線程,啟動線程。相信大家都會有這樣一個想法,線程無非就是創建一下,然后再start()下,實在是太簡單了。

可是要知道,在真實的項目中,實際場景可要我們舉的例子要復雜的多得多,不同線程的執行可能是有順序的,或者說他們的執行是有條件的,是要受控制的。如果僅僅依靠前面學的那點淺薄的知識,是遠遠不夠的。

那今天,我們就來探討一下如何控制線程的觸發執行。

要實現對多個線程進行控制,其實本質上就是消息通信機制在起作用,利用這個機制發送指令,告訴線程,什么時候可以執行,什么時候不可以執行,執行什么內容。

經過我的總結,線程中通信方法大致有如下三種:

  • threading.Event
  • threading.Condition
  • queue.Queue

先拋出結論,接下來我們來一一探討下。


. Event事件

Python提供了非常簡單的通信機制 Threading.Event,通用的條件變量。多個線程可以等待某個事件的發生,在事件發生后,所有的線程都會被激活

關於Event的使用也超級簡單,就三個函數

event = threading.Event()

# 重置event,使得所有該event事件都處於待命狀態
event.clear()

# 等待接收event的指令,決定是否阻塞程序執行
event.wait()

# 發送event指令,使所有設置該event事件的線程執行
event.set()

舉個例子來看下。

import time
import threading


class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event

def run(self):
print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
# 等待event.set()后,才能往下執行
self.event.wait()
print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))


threads = []
event = threading.Event()

# 定義五個線程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]

# 重置event,使得event.wait()起到阻塞作用
event.clear()

# 啟動所有線程
[t.start() for t in threads]

print('等待5s...')
time.sleep(5)

print('喚醒所有線程...')
event.set()

執行一下,看看結果

Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018

等待5s...

喚醒所有線程...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018

可見在所有線程都啟動(start())后,並不會執行完,而是都在self.event.wait()止住了,需要我們通過event.set()來給所有線程發送執行指令才能往下執行。

. Condition

Condition和Event 是類似的,並沒有多大區別。

同樣,Condition也只需要掌握幾個函數即可。

cond = threading.Condition()

# 類似lock.acquire()
cond.acquire()

# 類似lock.release()
cond.release()

# 等待指定觸發,同時會釋放對鎖的獲取,直到被notify才重新占有瑣。
cond.wait()

# 發送指定,觸發執行
cond.notify()

舉個網上一個比較趣的捉迷藏的例子來看看

import threading, time

class Hider(threading.Thread):
def __init__(self, cond, name):
super(Hider, self).__init__()
self.cond = cond
self.name = name

def run(self):
time.sleep(1) #確保先運行Seeker中的方法
self.cond.acquire()

print(self.name + ': 我已經把眼睛蒙上了')
self.cond.notify()
self.cond.wait()
print(self.name + ': 我找到你了哦 ~_~')
self.cond.notify()

self.cond.release()
print(self.name + ': 我贏了')

class Seeker(threading.Thread):
def __init__(self, cond, name):
super(Seeker, self).__init__()
self.cond = cond
self.name = name

def run(self):
self.cond.acquire()
self.cond.wait()
print(self.name + ': 我已經藏好了,你快來找我吧')
self.cond.notify()
self.cond.wait()
self.cond.release()
print(self.name + ': 被你找到了,哎~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

通過cond來通信,阻塞自己,並使對方執行。從而,達到有順序的執行。
看下結果

hider:   我已經把眼睛蒙上了
seeker: 我已經藏好了,你快來找我吧
hider: 我找到你了 ~_~
hider: 我贏了
seeker: 被你找到了,哎~~~

. Queue隊列

終於到了我們今天的主角了。

從一個線程向另一個線程發送數據最安全的方式可能就是使用 queue 庫中的隊列了。創建一個被多個線程共享的 Queue 對象,這些線程通過使用put()get() 操作來向隊列中添加或者刪除元素。

同樣,對於Queue,我們也只需要掌握幾個函數即可。

from queue import Queue
# maxsize默認為0,不受限
# 一旦>0,而消息數又達到限制,q.put()也將阻塞
q = Queue(maxsize=0)

# 阻塞程序,等待隊列消息。
q.get()

# 獲取消息,設置超時時間
q.get(timeout=5.0)

# 發送消息
q.put()

# 等待所有的消息都被消費完
q.join()

# 以下三個方法,知道就好,代碼中不要使用

# 查詢當前隊列的消息個數
q.qsize()

# 隊列消息是否都被消費完,True/False
q.empty()

# 檢測隊列里消息是否已滿
q.full()

函數會比之前的多一些,同時也從另一方面說明了其功能更加豐富。

我來舉個老師點名的例子。

from queue import Queue
from threading import Thread
import time

class Student(Thread):
def __init__(self, name, queue):
super().__init__()
self.name = name
self.queue = queue

def run(self):
while True:
# 阻塞程序,時刻監聽老師,接收消息
msg = self.queue.get()
# 一旦發現點到自己名字,就趕緊答到
if msg == self.name:
print("{}:到!".format(self.name))


class Teacher:
def __init__(self, queue):
self.queue=queue

def call(self, student_name):
print("老師:{}來了沒?".format(student_name))
# 發送消息,要點誰的名
self.queue.put(student_name)


queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明", queue=queue)
s2 = Student(name="小亮", queue=queue)
s1.start()
s2.start()

print('開始點名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')

運行結果如下

開始點名~
老師:小明來了沒?
小明:到!
老師:小亮來了沒?
小亮:到!

. 總結

學習了以上三種通信方法,我們很容易就能發現EventCondition 是threading模塊原生提供的模塊,原理簡單,功能單一,它能發送 TrueFalse 的指令,所以只能適用於某些簡單的場景中。

Queue則是比較高級的模塊,它可能發送任何類型的消息,包括字符串、字典等。其內部實現其實也引用了Condition模塊(譬如putget函數的阻塞),正是其對Condition進行了功能擴展,所以功能更加豐富,更能滿足實際應用。



 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM