問題描述:
生產者在生產產品,這些產品將提供給若干個消費者去消費,為了使生產者和消費者能並發執行,在兩者之間設置一個具有多個緩沖區的緩沖池,生產者將它生產的產品放入一個緩沖區中,消費者可以從緩沖區中取走產品進行消費,顯然生產者和消費者之間必須保持同步,即不允許消費者到一個空的緩沖區中取產品,也不允許生產者向一個已經放入產品的緩沖區中再次投放產品。
條件變量解決方案:
- 基於隊列構建一個緩沖區,生產者在隊尾填充,消費者在隊頭獲取。隊列緩沖區作為多個線程的共享資源。
- 由於多個消費者和生產者線程可以並發訪問緩沖區,需要互斥鎖來控制對緩沖區的互斥訪問。
- 隊列空時消費者線程需要等到隊列中存在資源、隊列滿時生產者線程需要等到隊列中有資源被消費。通過使用條件變量來實現線程的阻塞、通知以達到生產、消費線程的同步。
from threading import Lock
from threading import Condition
import threading
class myQueue:
def __init__(self, size):
self.size = size
self.list = list()
self.lock = Lock()
self.notFullCond = Condition(self.lock)
self.notEmptyCond = Condition(self.lock)
def isFull(self):
if self.size == len(self.list):
return True
return False
def isEmpty(self):
if 0 == len(self.list):
return True
return False
def enQueue(self, elem):
self.lock.acquire()
while self.isFull(): #隊列滿時觸發等待notFullCond條件,線程阻塞同時釋放互斥鎖
print('queue is full, waiting...')
self.notFullCond.wait()
print(threading.current_thread().getName() + ' product ' + str(elem))
self.list.append(elem)
#當有資源進入隊列通知所有等待notEmptyCond條件的線程,等釋放互斥鎖后,等待notEmptyCond條件的線程獲取鎖,再次判斷條件
self.notEmptyCond.notify_all()
self.lock.release()
def deQueue(self):
self.lock.acquire()
while self.isEmpty(): #隊列空時觸發等待notEmptyCond條件,線程阻塞同時釋放互斥鎖
print('queue is empty, waiting...')
self.notEmptyCond.wait()
elem = self.list[0]
del(self.list[0])
print(threading.current_thread().getName() + ' consume ' + str(elem))
#當有資源出隊列通知所有等待notFullCond條件的線程,等釋放互斥鎖后,等待notFullCond條件的線程獲取鎖,再次判斷條件
self.notFullCond.notify_all()
self.lock.release()
return elem
信號量解決方案:
- 通過信號量來控制線程的同步,信號量管理可以獲得資源的個數,初始隊列為空,寫信號量資源個數為隊列長度,讀信號量資源個數為0
from threading import Lock from threading import Semaphore import threading class mySemQueue: def __init__(self, size): self.size = size self.list = list() self.lock = Lock() self.writeSem = Semaphore(size)#初始化寫信號量 self.readSem = Semaphore(0) #初始化讀信號量 def enQueue(self, elem): self.writeSem.acquire() #資源入隊申請寫信號量,如果為0則阻塞 self.lock.acquire() #互斥鎖來保證資源的互斥訪問 self.list.append(elem) print(threading.current_thread().getName() + ' product ' + str(elem)) self.lock.release() self.readSem.release() #資源入隊后釋放一個讀信號量,如果其它線程阻塞在這個信號量上,喚醒該線程 def deQueue(self): self.readSem.acquire() #資源出隊申請讀信號量,如果為0則阻塞 self.lock.acquire() elem = self.list[0] del(self.list[0]) print(threading.current_thread().getName() + ' consume ' + str(elem)) self.lock.release() self.writeSem.release() #資源出隊后釋放一個寫信號量,如果其它線程阻塞在這個信號量上,喚醒該線程 return elem
- 測試
from threading import Thread import sys import threading class myThread(Thread): def __init__(self, func): Thread.__init__(self) self.func = func def run(self): print(threading.current_thread().getName() + ' start') self.func() from myThread import myThread from myQueue import myQueue import random import sys def producter(): while True: elem =random.randint(1, 100) que.enQueue(elem) def consumer(): while True: que.deQueue() fp = open('log.txt','w') sys.stdout = fp que = myQueue(10) t1 = myThread(producter) t2 = myThread(consumer) t3 = myThread(consumer) t1.start() t2.start() t3.start()