https://docs.python.org/zh-cn/3/library/multiprocessing.html
1共享內存基本例程
可以使用
Value
或Array
將數據存儲在共享內存映射中。例如,以下代碼:
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr))p.deamon
=
True
#伴隨主進程關閉而關閉
p.start() print(num.value) print(arr[:])2-1不同種類數據共享例程
https://blog.csdn.net/lechunluo3/article/details/79005910
Manager管理的共享數據類型有:Value、Array、dict、list、Lock、Semaphore等等,同時Manager還可以共享類的實例對象。
實例代碼:from multiprocessing import Process,Manager def func1(shareList,shareValue,shareDict,lock): with lock: shareValue.value+=1 shareDict[1]='1' shareDict[2]='2' for i in xrange(len(shareList)): shareList[i]+=1 if __name__ == '__main__': manager=Manager() list1=manager.list([1,2,3,4,5]) dict1=manager.dict()#存str類型數據 array1=manager.Array('i',range(10)) value1=manager.Value('i',1) lock=manager.Lock() proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)] for p in proc: p.start() for p in proc: p.join() print list1 print dict1 print array1 print value1結果
[21, 22, 23, 24, 25] {1: '1', 2: '2'} array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) Value('i', 21)2-2關於str類型數組共享內存的使用
from multiprocessing import Process,Manager #1初始化共享內存 manager=Manager() str_msg=manager.dict()#存str類型數據 str_msg[1]='0' #用幾個必須預先初始化 否則后面無法訪問 str_msg[2]='0' #用幾個必須預先初始化 否則后面無法訪問 #2線程鎖 保護多個線成對數據控制 lock=manager.Lock() #3要執行的函數 def func1(shareDict,lock): with lock: shareDict[1]=str(2+int(shareDict[1])) shareDict[2]=str(3+int(shareDict[2])) #4開啟次線程 使用的函數+輸入共享數據(str)+線程鎖 proc=Process(target=func1,args=(str_msg,lock)) proc.deamon=True #伴隨主進程關閉 proc.start() #開始 proc.join() #加入線程池 #5主線程開啟 while 1: print(str_msg[1]) print(str_msg[2])
3-1讀兩個串口,數據給共享內存
普通的讀取(學習用實際不用,用類方便)
# -*- coding: utf-8 -* import serial import time from multiprocessing import Process, Value, Array #讀取溫度和濕度 def serial_th(num,arr): ser = serial.Serial('/dev/ttyUSB1', 115200) if ser.isOpen == False: ser.open() # 打開串口 #ser.write(b"Raspberry pi is ready") try: while True: line = str(ser.readline()) fengefu='-' a=line.strip().split(fengefu) # x.strip()#除去每行的換行符 按照:分割 tv = "".join(a[1:2] ).strip() # 去除空格 hv = "".join(a[3:4]).strip() # 去除空格 arr[0]=int(tv) arr[1]=int(hv) #print('t-'+str(arr[0])+"-h-"+str(arr[1])) #time.sleep(0.1) # 軟件延時 except KeyboardInterrupt: ser.close() #讀取溫度和濕度 def serial_lmq29(num,arr): ser = serial.Serial('/dev/ttyUSB0', 115200) if ser.isOpen == False: ser.open() # 打開串口 #ser.write(b"Raspberry pi is ready") try: while True: line = str(ser.readline()) fengefu='-' a=line.strip().split(fengefu) # x.strip()#除去每行的換行符 按照:分割 mq2 = "".join(a[1:2] ).strip() # 去除空格 light = "".join(a[3:4]).strip() # 去除空格 mq9 = "".join(a[5:6]).strip() # 去除空格 #print(mq9) arr[2]=int(mq2) arr[3]=int(light) arr[4]=int(mq9) #print('mq2-'+ str(arr[2]) +'-lihgt-'+str(arr[3])+'-mq9-'+str(arr[4])) #time.sleep(0.1) # 軟件延時 except KeyboardInterrupt: ser.close() num_share = Value('d', 0.0) arr_share = Array('i', range(5)) p_wh = Process(target=serial_th, args=(num_share,arr_share)) p_wh.deamon=True #伴隨主進程關閉而關閉 p_wh.start() p_l29 = Process(target=serial_lmq29, args=(num_share,arr_share)) p_l29.deamon=True p_l29.start() while 1: # 打印共享內存數據 print(arr_share[:])3-2讀兩個串口,數據給共享內存,類的使用
# -*- coding: utf-8 -* import serial import time from multiprocessing import Process, Value, Array class Class_sensor: def __init__(self): pass #讀取溫度和濕度 def serial_th(self,num,arr): ser = serial.Serial('/dev/ttyUSB1', 115200) if ser.isOpen == False: ser.open() # 打開串口 #ser.write(b"Raspberry pi is ready") try: while True: line = str(ser.readline()) fengefu='-' a=line.strip().split(fengefu) # x.strip()#除去每行的換行符 按照:分割 tv = "".join(a[1:2] ).strip() # 去除空格 hv = "".join(a[3:4]).strip() # 去除空格 arr[0]=int(tv) arr[1]=int(hv) #print('t-'+str(arr[0])+"-h-"+str(arr[1])) #time.sleep(0.1) # 軟件延時 except KeyboardInterrupt: ser.close() #讀取溫度和濕度 def serial_lmq29(self,num,arr): ser = serial.Serial('/dev/ttyUSB0', 115200) if ser.isOpen == False: ser.open() # 打開串口 #ser.write(b"Raspberry pi is ready") try: while True: line = str(ser.readline()) fengefu='-' a=line.strip().split(fengefu) # x.strip()#除去每行的換行符 按照:分割 mq2 = "".join(a[1:2] ).strip() # 去除空格 light = "".join(a[3:4]).strip() # 去除空格 mq9 = "".join(a[5:6]).strip() # 去除空格 #print(mq9) arr[2]=int(mq2) arr[3]=int(light) arr[4]=int(mq9) #print('mq2-'+ str(arr[2]) +'-lihgt-'+str(arr[3])+'-mq9-'+str(arr[4])) #time.sleep(0.1) # 軟件延時 except KeyboardInterrupt: ser.close() def class_int(self): self.num_share = Value('d', 0.0) self.arr_share = Array('i', range(5)) p_wh = Process(target=self.serial_th, args=(self.num_share,self.arr_share)) p_wh.deamon=True #伴隨主進程關閉而關閉 p_wh.start() p_l29 = Process(target=self.serial_lmq29, args=(self.num_share,self.arr_share)) p_l29.deamon=True p_l29.start() t = Class_sensor()#類的初始化 t.class_int()#串口初始化 while 1: # 打印共享內存數據 print(t.arr_share[:])
3-3字符類型的共享內存串口解析獲取
''' 函數作用: 1開啟其一個進程 2開啟一個串口 3串口數據解析 4開辟共享內存 str 接收溫度 濕度 mq2數據 (字符類型) 5主進程創建類調用這些數據 ''' # -*- coding: utf-8 -* import serial import time from multiprocessing import Process,Manager class Class_sensor: def __init__(self): pass #讀取溫度和濕度和MQ2煙霧火焰 def serial_wsmq2(self,arr,clock): ser = serial.Serial('/dev/ttyUSB0', 9600) if ser.isOpen == False: ser.open() # 打開串口 #ser.write(b"Raspberry pi is ready") try: while True: line = str(ser.readline()) fengefu='-' a=line.strip().split(fengefu) # x.strip()#除去每行的換行符 按照:分割 wendu = "".join(a[1:2] ).strip() # 去除空格 shidu = "".join(a[3:4]).strip() # 去除空格 mq2 = "".join(a[5:6]).strip() # 去除空格 #print(mq9) with clock: arr[1]=str(wendu) arr[2]=str(shidu) arr[3]=str(mq2) print('溫度-'+ str(arr[1]) +'-濕度-'+str(arr[2])+'-火焰煙霧-'+str(arr[3])) #time.sleep(0.1) # 軟件延時 except KeyboardInterrupt: ser.close() def class_int(self): #1初始化共享內存 self.manager=Manager() self.str_msg=self.manager.dict()#存str類型數據 self.str_msg[1]='0' #用幾個必須預先初始化 否則后面無法訪問 self.str_msg[2]='0' #用幾個必須預先初始化 否則后面無法訪問 self.str_msg[3]='0' #2線程鎖 保護多個線成對數據控制 self.lock=self.manager.Lock() p = Process(target=self.serial_wsmq2, args=(self.str_msg,self.lock)) p.deamon=True #伴隨主進程關閉而關閉 p.start() #共享內存測試 t = Class_sensor()#類的初始化 t.class_int()#串口初始化 print(t.str_msg[1]) #調用數據 print(t.str_msg[2]) #調用數據 print(t.str_msg[3]) #調用數據
3-4 python多進程共享變量,附共享圖像內存實例
https://www.cnblogs.com/banrixianxin/p/6781162.html
import numpy as np import cv2, multiprocessing, sharedmem def show_image(image_in): while 1: cv2.imshow("avi",image_in) cv2.waitKey(1) def aa(images): while 1: for i in range(20): cv2.imshow("a",images[i]) cv2.waitKey(1) if __name__ == '__main__': cap = cv2.VideoCapture('1.avi') assert cap.isOpened(), 'Cannot capture source' _, image = cap.read() shape = np.shape(image) dtype = image.dtype images = multiprocessing.Manager().dict() image_in = sharedmem.empty(shape, dtype) image_in[:] = image.copy() a = multiprocessing.Process(target=show_image,args=(image_in,)) a.start() count = 0 for i in range(20): ret, image = cap.read() if not ret: break image_in[:] = image.copy() images[count] = image_in count += 1 multiprocessing.Process(target=aa, args=(images,)).start() aa.join()
共享內存傳圖
主進程采集圖像
次進程1顯示圖像
次進程2顯示圖像
#!/usr/bin/env python # -*- coding=utf-8 -*- import cv2 import zmq import base64 from multiprocessing import Process,Manager class Class_camtcp: def __init__(self): pass def Get_vedio(self,images,clock): #初期化USB攝像頭 #兩路不卡最大分辨率 #w=600 #h=450 #四路不卡最大分辨率 w=480 h=320 fps=25 self.cap = cv2.VideoCapture(self.VedioId) self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, w) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h) self.cap.set(cv2.CAP_PROP_FPS, fps) # cv2.namedWindow(str(self.VedioId),0); #初始化傳輸 self.contest = zmq.Context() """zmq對象使用TCP通訊協議""" self.footage_socket = self.contest.socket(zmq.PAIR) """zmq對象和視頻接收端建立TCP通訊協議""" self.vedio_url='tcp://%s:'%self.IP+str(self.Port) self.footage_socket.connect(self.vedio_url) images[1]=1 while( self.cap.isOpened()): #USB攝像頭工作時,讀取一幀圖像 ret, frame = self.cap.read() with clock: images[int(self.CamID)]=frame if self.VedioShow==1: #顯示圖像窗口在樹莓派的屏幕上 cv2.imshow(str(self.CamID),frame) #按下q鍵退出 key = cv2.waitKey(1) if key & 0x00FF == ord('q'): images[0+int(self.CamNum)]=0 images[1+int(self.CamNum)]=0 images[2+int(self.CamNum)]=0 images[3+int(self.CamNum)]=0 break if images[int(self.CamNum)+int(self.CamID)]==0: break encoded, buffer = cv2.imencode('.jpg', frame) #把轉換后的圖像數據再次轉換成流數據, jpg_as_test = base64.b64encode(buffer) #把內存中的圖像流數據進行base64編碼 self.footage_socket.send(jpg_as_test) #把編碼后的流數據發送給視頻的接收端 # 釋放資源和關閉窗口 self.cap.release() cv2.destroyAllWindows() def class_int(self,IP,Port,VedioId,show,ShareImages,CamID): self.CamNum=4#總相機數目 # 獲取基本信息 self.IP = IP #視頻接受端的IP地址 self.Port=Port #視頻接受端的IP端口 5555 self.VedioId=VedioId #打開的系統設備管理默認分配的視頻編號 0 2 4 6 設備號默認的 實際需要查看lsusb self.VedioShow=show #是否顯示圖像窗口調試 實際應取消減少運算浪費 #ShareImages 共享內存圖像用於獲取該路圖像 self.CamID=CamID#相機編號 人為定義 msg="IP:"+str(self.IP )+" Port:"+str(self.Port)+" CamId:"+str(self.VedioId )+" CamShow:"+str(self.VedioShow) print(msg ) #創建共享內存變量 self.lock = Manager().Lock()#創建共享內存容器 #self.ShareImages=Manager().dict()#存str類型數據 #創建線程 p = Process(target=self.Get_vedio, args=(ShareImages,self.lock)) p.deamon=True #伴隨主進程關閉而關閉 p.start() pass Sever_Ip= '192.168.137.1' CamNum=4#總相機數目 showimg=0#是否打開每路畫面查看調試 ShareImages=Manager().dict()#存str類型數據 img = cv2.imread("timg.jpg", 1) ShareImages[0]=img #初始化共享內存圖像 ShareImages[0+CamNum]=1#用於控制相機是否開啟 ShareImages[1]=img#初始化共享內存圖像 ShareImages[1+CamNum]=1#用於控制相機是否開啟 ShareImages[2]=img#初始化共享內存圖像 ShareImages[2+CamNum]=1#用於控制相機是否開啟 ShareImages[3]=img#初始化共享內存圖像 ShareImages[3+CamNum]=1#用於控制相機是否開啟 t0 = Class_camtcp()#類的初始化 t0.class_int(Sever_Ip,5559,0,showimg,ShareImages,0)#串口初始化 t1 = Class_camtcp()#類的初始化 t1.class_int(Sever_Ip,5556,2,showimg,ShareImages,1)#串口初始化 t2 = Class_camtcp()#類的初始化 t2.class_int(Sever_Ip,5557,4,showimg,ShareImages,2)#串口初始化 t3 = Class_camtcp()#類的初始化 t3.class_int(Sever_Ip,5558,6,showimg,ShareImages,3)#串口初始化 while True: #如果圖片為空 賦予默認圖像 if ShareImages[0] is None: ShareImages[0]=img if ShareImages[1] is None: ShareImages[1]=img if ShareImages[1] is None: ShareImages[1]=img if ShareImages[1] is None: ShareImages[1]=img cv2.imshow('s', ShareImages[0]) #默認把圖像1顯示在窗口中,簡單判斷圖像是否開啟 key=cv2.waitKey(1) #延時等待,防止出現窗口無響應 if key & 0x00FF == ord('q'):#按q退出關閉所有相機畫面 ShareImages[0+CamNum]=0 ShareImages[1+CamNum]=0 ShareImages[2+CamNum]=0 ShareImages[3+CamNum]=0 break cv2.destroyAllWindows()