Python並發編程—進程間通信


進程間通信(IPC)

1.必要性: 進程間空間獨立,資源不共享,此時在需要進程間數據傳輸時就需要特定的手段進行數據通信。

2.常用進程間通信方法:管道通信、消息隊列、共享內存、信號量

管道通信(Pipe)

1.通信原理:在內存中開辟管道空間,生成管道操作對象,多個進程使用同一個管道對象進行讀寫即可實現通信

2.實現方法

from multiprocessing import Pipe

fd1,fd2 = Pipe(duplex = True)

  • 功能: 創建管道
  • 參數:默認表示雙向管道
  • 如果為False 表示單向管道
  • 返回值:表示管道兩端的讀寫對象
  •   如果是雙向管道均可讀寫
  •   如果是單向管道fd1只讀 fd2只寫

fd.recv()

  • 功能 : 從管道獲取內容
  • 返回值:獲取到的數據

fd.send(data)

  • 功能: 向管道寫入內容
  • 參數: 要寫入的數據
 1 from multiprocessing import Process,Pipe
 2 import os,time
 3 
 4 # 創建管道對象
 5 # False fd1只能recv fd2只能send
 6 fd1,fd2 = Pipe(False)
 7 
 8 def read():
 9   while True:
10     data = fd1.recv()  # 從管道獲取消息
11     print(data)
12 
13 def write():
14   while True:
15     time.sleep(2)
16     fd2.send({'name':'Lily'}) # 向管道發送內容
17 
18 r = Process(target=read)
19 w = Process(target=write)
20 r.start()
21 w.start()
22 r.join()
23 w.join()
管道通信示例

消息隊列

1.通信原理:在內存中建立隊列模型,進程通過隊列將消息存入,或者從隊列取出完成進程間通信。

2.實現方法

from multiprocessing import Queue

q = Queue(maxsize=0)

  • 功能: 創建隊列對象
  • 參數:最多存放消息個數
  • 返回值:隊列對象

q.put(data,[block,timeout])

  • 功能:向隊列存入消息
  • 參數:data 要存入的內容
  • block 設置是否阻塞 False為非阻塞
  • timeout 超時檢測

q.get([block,timeout])

  • 功能:從隊列取出消息
  • 參數:block 設置是否阻塞 False為非阻塞
  • timeout 超時檢測
  • 返回值: 返回獲取到的內容

q.full() 判斷隊列是否為滿

q.empty() 判斷隊列是否為空

q.qsize() 獲取隊列中消息個數

q.close() 關閉隊列

 1 from multiprocessing import Process, Queue
 2 from time import sleep
 3 from random import randint
 4 
 5 #  創建消息隊列
 6 q = Queue(3)
 7 
 8 
 9 # 請求進程
10 def request():
11   for i in range(10):
12     x = randint(0, 100)
13     y = randint(0, 100)
14     q.put((x, y))
15 
16 
17 # 處理進程
18 def handle():
19   while True:
20     sleep(1)
21     try:
22       x, y = q.get(timeout=5)
23     except:
24       break
25     else:
26       print("%d + %d = %d" % (x, y, x + y))
27 
28 
29 p1 = Process(target=request)
30 p2 = Process(target=handle)
31 p1.start()
32 p2.start()
33 p1.join()
34 p2.join()
消息隊列演示

共享內存

1.通信原理:在內中開辟一塊空間,進程可以寫入內容和讀取內容完成通信,但是每次寫入內容會覆蓋之前內容。

2.實現方法

from multiprocessing import Value,Array

obj = Value(ctype,data)

  • 功能 : 開辟共享內存
  • 參數 : ctype 表示共享內存空間類型 'i' 'f' 'c'
  •      data 共享內存空間初始數據
  • 返回值:共享內存對象

obj.value

  • 對該屬性的修改查看即對共享內存讀寫

obj = Array(ctype,data)

  • 功能: 開辟共享內存空間
  • 參數: ctype 表示共享內存數據類型
  •     data 整數則表示開辟空間的大小,其他數據類型表示開辟空間存放的初始化數據
  • 返回值:共享內存對象

Array共享內存讀寫: 通過遍歷obj可以得到每個值,直接可以通過索引序號修改任意值。

* 可以使用obj.value直接打印共享內存中的字節串
 1 from multiprocessing import Process,Value
 2 import time
 3 from random import randint
 4 
 5 # 創建共享內存
 6 money = Value('i',5000)
 7 
 8 #  修改共享內存
 9 def man():
10   for i in range(30):
11     time.sleep(0.2)
12     money.value += randint(1,1000)
13 
14 def girl():
15   for i in range(30):
16     time.sleep(0.15)
17     money.value -= randint(100,800)
18 
19 m = Process(target = man)
20 g = Process(target = girl)
21 m.start()
22 g.start()
23 m.join()
24 g.join()
25 
26 # 獲取共享內存值
27 print("一月余額:",money.value)
value
 1 from multiprocessing import Process,Array
 2 
 3 # 創建共享內存
 4 # shm = Array('i',[1,2,3])
 5 # shm = Array('i',3)  # 表示開辟三個空間的列表
 6 shm = Array('c',b"hello") #字節串
 7 
 8 def fun():
 9   # 共享內存對象可迭代
10   for i in shm:
11     print(i)
12   shm[0] = b'H'
13 
14 p = Process(target = fun)
15 p.start()
16 p.join()
17 
18 for i in shm:
19   print(i)
20 
21 print(shm.value) # 打印字節串
array

信號量(信號燈集)

1.通信原理:給定一個數量對多個進程可見。多個進程都可以操作該數量增減,並根據數量值決定自己的行為。

2.實現方法

from multiprocessing import Semaphore

sem = Semaphore(num)

  • 功能 : 創建信號量對象
  • 參數 : 信號量的初始值
  • 返回值 : 信號量對象

sem.acquire() 將信號量減1 當信號量為0時阻塞

sem.release() 將信號量加1

sem.get_value()獲取信號量數量

 1 from multiprocessing import Process,Semaphore
 2 from time import sleep
 3 import os
 4 
 5 # 創建信號量 最多允許三個任務同時執行
 6 sem = Semaphore(3)
 7 
 8 # 任務函數
 9 def handle():
10   sem.acquire()  # 想執行必須消耗一個信號量
11   print("%d 執行任務"%os.getpid())
12   sleep(2)
13   print("%d 執行任務完畢"%os.getpid())
14   sem.release() # 增加信號量
15 
16 # 10人想執行
17 for i in range(10):
18   p = Process(target=handle)
19   p.start()
sem

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM