[b0038] python 歸納 (二三)_多進程數據共享和同步_隊列Queue


1  隊列讀寫

# -*- coding: utf-8 -*-
"""
多進程  共享  隊列 multiprocessing.Process
邏輯:
   一個進程往隊列寫數據,一個進程從讀寫讀數據
   寫進程完了后,主進程強行結束讀進程

使用:
    1. 創建隊列 q = multiprocessing.Queue() ,默認無限大小,可以指定大小
    2. 把隊列 q 當參數傳給 子進程 執行代碼, 猜測應該不能通過全局變量的方式訪問
    3. 在子進程中讀寫隊列數據   q.put(<data>)  q.get()

參考:
方法
'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join_thread', 'put', 'put_nowait', 'qsize'

"""

from multiprocessing import Queue, Process
import time

def write(q):
    for i in ['a','b','c','d']:
        time.sleep(2)
        q.put(i)
        print ('put {0} to queue'.format(i))

def read(q):
    while 1:
        time.sleep(2)
        result = q.get()
        print ("get {0} from queue".format(result))

def main():
    q = Queue()

    pw = Process(target=write, args=(q,))
    pr = Process(target=read,  args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()  # 強行終止讀進程

if __name__ == '__main__':
    main()

"""
Out:

put a to queue
get a from queue
put b to queue
get b from queue
put c to queue
get c from queue
put d to queue
get d from queue
"""

 

2 隊列實現生產者、消費者

# -*- coding: utf-8 -*-
"""
多進程 生產者 消費者模型,使用隊列實現 multiprocessing.Queue

邏輯:
    1個生產者,1個消費者在2個不同的進程中操作同一個隊列
    生產者的速度是消費者的3倍
"""
import multiprocessing
import random
import time

# 生產者
class producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)  # 父類構造
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)

            #  往隊列寫數據
            self.queue.put(item)

            print("Process Producer: item %d appended to queue %s " \
                  %(item, self.name))
            time.sleep(1)
            print("The size of queue is %s" \
                  % self.queue.qsize())


# 消費者
class consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)  # 父類構造
        self.queue = queue

    def run(self):
        while True:
            if (self.queue.empty()):
                print("the queue is empty")
                break
            else:
                time.sleep(2)

                # 從隊列讀取數據,隊列為空會阻塞,這做了非空判斷,只有一個進程讀,不會阻塞
                item = self.queue.get()

                print("Process Consumer: item %d poped from by %s " \
                      % (item, self.name))
                time.sleep(1)


if __name__ == '__main__':
    #  多進程共享對列
    queue = multiprocessing.Queue()

    ## 啟動生產者、消費者
    process_producer = producer(queue)
    process_consumer = consumer(queue)
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

"""
Out:
the queue is empty
Process Producer: item 225 appended to queue producer-1
The size of queue is 1
Process Producer: item 101 appended to queue producer-1
The size of queue is 2
Process Producer: item 50 appended to queue producer-1
The size of queue is 3
Process Producer: item 217 appended to queue producer-1
The size of queue is 4
Process Producer: item 75 appended to queue producer-1
The size of queue is 5
Process Producer: item 45 appended to queue producer-1
The size of queue is 6
Process Producer: item 19 appended to queue producer-1
The size of queue is 7
Process Producer: item 157 appended to queue producer-1
The size of queue is 8
Process Producer: item 127 appended to queue producer-1
The size of queue is 9
Process Producer: item 223 appended to queue producer-1
The size of queue is 10
"""

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM