引用:https://zhuanlan.zhihu.com/p/32513483
共享 numpy 數組
需要用到 numpy 時往往是數據量較大的場景,如果直接復制會造成大量內存浪費。共享 numpy 數組則是通過上面一節的 Array 實現,再用 numpy.frombuffer 以及 reshape 對共享的內存封裝成 numpy 數組,代碼如下:
# encoding:utf8 import ctypes import os import multiprocessing import numpy as np NUM_PROCESS = multiprocessing.cpu_count() def worker(index): main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double) main_nparray = main_nparray.reshape(NUM_PROCESS, 10) pid = os.getpid() main_nparray[index, :] = pid return pid if __name__ == "__main__": shared_array_base = multiprocessing.Array( ctypes.c_double, NUM_PROCESS * 10, lock=False) pool = multiprocessing.Pool(processes=NUM_PROCESS) result = pool.map(worker, range(NUM_PROCESS)) main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double) main_nparray = main_nparray.reshape(NUM_PROCESS, 10) print( main_nparray )
運行結果:
===============================================================
多進程共享較大數據,如numpy數組的情況下我們需要使用multiprocessing下面的Value , Array從而實現多進程的共享,但是還有一個重要的問題就是數據的讀寫方式,由於CPython是在語言的數據結構上進行再次包裝的,所以對於數據的讀寫是需要進行翻譯的,也就是說對數據讀寫是需要對Python數據類型下對應的C類型的數據結構進行讀寫的,也正是因為這種數據讀寫方式所以對Python數據進行操作要比對C類型數據進行讀寫操作要慢上很多。
numpy數據的底層同樣也是C類型的數據結構,同時numpy下面的數據操作很多都是可以直接對numpy類型下的底層數據結構來操作的,這樣也就會省掉數據結構轉換的時間花銷,只要不把numpy數據轉為Python類型數據,都是可以在numpy下對底層數據進行直接操作的。
雖然mulprocessing模塊提供了共享數據類型,但是不同進程對共享數據的讀寫本身也會存在數據類型的轉換。
用更直接的話來說,雖然mutprocessing提供了共享數據類型Value和Array,但是不同進程其實也是無法直接對其進行操作的,子進程如果要讀取或寫入共享數據Value和Array就需要將共享數據轉為可以進行操作的Python數據類型或numpy數據類型,否則就難以直接對共享數據進行直接操作,這時候numpy.frombuffer函數就派上用場了,numpy.frombuffer函數可以直接讀取Python數據類型、numpy數據類型和共享數據類型的底層數據類型,即C數據類型,這樣的話使用numpy.frombuffer函數就會省去數據類型轉換這一環節。numpy.frombuffer可以直接讀取共享數據類型Value和Array,因為Value和Array的底層實現就是C數據類型。
下面給出幾種多進程共享數據的讀寫方式代碼,以驗證最佳的多進程大數據量數據的共享方式。
運行環境介紹:
軟件:Ubuntu18.04系統、python3.7.5
硬件:intel i7-8700 cpu,6物理核心12邏輯核心
1. 使用multiprocessing.Value / multiprocessing.Array + numpy.frombuffer方式:
(使用numpy.frombuffer 對數據的讀寫不需要類型的轉換可以直接對數據進行讀寫操作)

import ctypes import time import multiprocessing import numpy as np NUM_PROCESS = multiprocessing.cpu_count() size = 1000000 def worker(index): main_nparray = np.frombuffer(shared_array_base[index], dtype=ctypes.c_double) for i in range(10000): main_nparray[:] = index + i return index if __name__ == "__main__": shared_array_base = [] for _ in range(NUM_PROCESS): shared_array_base.append(multiprocessing.Array("d", size, lock=False)) pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b-a) #print(result) for i in range(NUM_PROCESS): main_nparray = np.frombuffer(shared_array_base[i], dtype=ctypes.c_double) print(main_nparray) print(type(main_nparray)) print(main_nparray.shape) # 73.216189146 # 73.2605750561 # 73.3307318687 # 73.4090409279 # 73.4219110012
運行時間:
73.216189146
73.2605750561
73.3307318687
73.4090409279
73.4219110012
運行過程中各進程CPU使用率情況:
可以看到在運行過程中12個子進程的使用率為100%,而主進程的使用率為0.3%,可以看到使用 multiprocessing.Value / multiprocessing.Array + numpy.frombuffer 的方式各子進程在讀寫共享空間內容基本是不需要太多等待的,可以保證子進程基本以全速進行運行,而且主進程基本不參與計算(cpu利用率:0.3%)。也就是說采用該種方式各子進程操作父進程中的共享數據可以和操作自身進程空間內的數據達到基本一致的速度,為證明給出下面代碼:

import multiprocessing import numpy as np NUM_PROCESS = multiprocessing.cpu_count() size = 1000000 def worker(index): main_nparray = np.zeros(size) for i in range(10000): main_nparray[:] = index + i return index if __name__ == "__main__": pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b-a)
運行時間:
73.0335
73.2103
73.1925
73.1642
73.2643
也就是說使用該種方式,各子進程對父進程中的共享內存的操作其性能與操作自己進程空間下內存基本一致,沒有什么性能損耗。
=======================================================================
2. 使用multiprocessing.Manger方式:
(共享數據,隱式的、自動的進行數據類型的轉換)

from multiprocessing import Process, Manager import multiprocessing import numpy as np import time NUM_PROCESS = multiprocessing.cpu_count() size = 1000000 def worker(index): shared_array_base[index][0] = np.zeros(shape=size) shared_array_base[index][0] += index for i in range(10000): shared_array_base[index][0] += 1 #1246.2459 #1226.7996 #1238.3933 #1241.1819 #print(shared_array_base[index][0]) return index if __name__ == '__main__': shared_array_base = [] manager = Manager() # 字典方式 for _ in range(NUM_PROCESS): shared_array_base.append(manager.dict()) pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b - a) for index in range(NUM_PROCESS): print(shared_array_base[index][0])
運行時間:
1246.2459
1226.7996
1238.3933
1241.1819
1241.2889
運行時個子進程CPU利用率:
可以看到使用Manger的方式雖然可以實現共享內存的操作,但是該種方式需要父進程進行參與,隨着子進程數量的增多父進程的負擔也就越重,同時各子進程需要等待的時間也就越多。從運行時間上可以看到使用Manger的方式對共享內存進行操作整體性能下降了十多倍,其性能遠低於Value/Array+numpy.frombuffer方式。
使用Manger方式對共享內存的操作是需要父進程參與的,這一點和Value/Array方式不同,同時使用Manger方式各子進程需要對共享數據進行類型轉換而這又進一步的影響性能表現。
如果上面代碼修改為:

#encoding:UTF-8 from multiprocessing import Process, Manager import multiprocessing import numpy as np import time NUM_PROCESS = multiprocessing.cpu_count() #NUM_PROCESS = 1 #125.9360 125.2017 118.3942 133.9661 99.3769 118.5580 size = 1000000 def worker(index): shared_array_base[index][0] = np.zeros(shape=size) shared_array_base[index][0] += index for i in range(10000): shared_array_base[index][0][:] = index+i #wrong result ###shared_array_base[index][0] += 1 #1246.2459 #1226.7996 #1238.3933 #1241.1819 #1241.2889 #print(shared_array_base[index][0]) return index if __name__ == '__main__': shared_array_base = [] manager = Manager() # 字典方式 for _ in range(NUM_PROCESS): shared_array_base.append(manager.dict()) pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b - a) for index in range(NUM_PROCESS): print(shared_array_base[index][0])
運行結果:
672.297180891037
[0. 0. 0. ... 0. 0. 0.]
[1. 1. 1. ... 1. 1. 1.]
[2. 2. 2. ... 2. 2. 2.]
[3. 3. 3. ... 3. 3. 3.]
[4. 4. 4. ... 4. 4. 4.]
[5. 5. 5. ... 5. 5. 5.]
[6. 6. 6. ... 6. 6. 6.]
[7. 7. 7. ... 7. 7. 7.]
[8. 8. 8. ... 8. 8. 8.]
[9. 9. 9. ... 9. 9. 9.]
[10. 10. 10. ... 10. 10. 10.]
[11. 11. 11. ... 11. 11. 11.]
運行時CPU使用率:
可以看到:
語句:
shared_array_base[index][0] += index
可以實現對共享內存的寫操作。
語句:
shared_array_base[index][0][:] = index+i #wrong result
並不能實現對共享內存的寫操作。
而即使不對共享內存進行寫操作其運行時間也只是縮減了一半,與Value/Array+numpy.frombuffer方式相比使用Manager的方式操作共享內存即使是只進行讀操作也是一種很耗費父進程計算資源的事情。使用Manager的方式各子進程讀取共享內存中的數據也是需要對共享數據進行格式轉換的,而這部分工作也是需要父進程參與的,因此使用Manager的方式並不能達到較好的性能表現。
如果使用單子進程的話:
代碼:

#encoding:UTF-8 from multiprocessing import Process, Manager import multiprocessing import numpy as np import time #NUM_PROCESS = multiprocessing.cpu_count() NUM_PROCESS = 1 #125.9360 125.2017 118.3942 133.9661 99.3769 118.5580 size = 1000000 def worker(index): shared_array_base[index][0] = np.zeros(shape=size) shared_array_base[index][0] += index for i in range(10000): shared_array_base[index][0] += 1 #1246.2459 #1226.7996 #1238.3933 #1241.1819 #1241.2889 #print(shared_array_base[index][0]) return index if __name__ == '__main__': shared_array_base = [] manager = Manager() # 字典方式 for _ in range(NUM_PROCESS): shared_array_base.append(manager.dict()) pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b - a) for index in range(NUM_PROCESS): print(shared_array_base[index][0])
運行時間:
125.9360
125.2017
118.3942
133.9661
99.3769
118.5580
123.2825
運行時cpu使用率:
這充分說明子進程對父進程的共享內存進行讀寫操作是需要父進程參與的,而這部分需要父進程參與的工作就是共享內存數據的類型轉換工作。隨着子進程數量的增加會加重父進程的負擔,從而導致各子進程均難以獲得較好的性能表現。
=========================================================================
3. 只使用multiprocessing.Value / multiprocessing.Array 方式:
(共享數據,使用中間數據進行操作后再直接賦值給共享數據:因為沒有進行數據類型轉換的共享數據難以直接進行讀寫操作)
代碼:

import numpy as np import multiprocessing import time import ctypes NUM_PROCESS = multiprocessing.cpu_count() size = 1000000 def worker(index): temp = shared_array_base[index] temp2 = np.zeros(size)+index for i in range(100*100): temp2 += 1 temp[:] = temp2[:] ###2129.3716 ###2253.2248 ###2127.4056 ###2128.4252 return index if __name__ == '__main__': shared_array_base = [] for _ in range(NUM_PROCESS): shared_array_base.append(multiprocessing.Array(ctypes.c_double, size, lock=False)) pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b-a) for i in range(NUM_PROCESS): main_nparray = np.frombuffer(shared_array_base[i], dtype=ctypes.c_double) print(main_nparray) print(type(main_nparray)) print(main_nparray.shape)
運行時間:
2129.3716
2253.2248
2127.4056
2128.4252
運行時CPU使用率:
為估計該種形式下讀寫共享數據的耗時,給出下面代碼:

import numpy as np import multiprocessing import time import ctypes NUM_PROCESS = multiprocessing.cpu_count() size = 1000000 def worker(index): temp = shared_array_base[index] temp2 = np.zeros(size)+index for i in range(100*100): temp2 += 1 ###temp[:] = temp2[:] ###2129.3716 ###2253.2248 ###2127.4056 ###2128.4252 return index if __name__ == '__main__': shared_array_base = [] for _ in range(NUM_PROCESS): shared_array_base.append(multiprocessing.Array(ctypes.c_double, size, lock=False)) pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b-a) for i in range(NUM_PROCESS): main_nparray = np.frombuffer(shared_array_base[i], dtype=ctypes.c_double) print(main_nparray) print(type(main_nparray)) print(main_nparray.shape)
運行時間:
75.1454
75.0409
75.0613
74.9661
75.3398
74.6730
從上面的代碼的運行時間上我們可以看出如果我們只使用multiprocessing.Value / multiprocessing.Array而不使用numpy.frombuffer函數的話也難以取得很好的性能表現,其中對共享數據的讀寫耗時大致需要2000秒以上,也就是說對共享數據的讀操作如果不使用numpy.frombuffer的方式直接對數據底層進行操作,而是進行手動的類型轉換,其耗時是巨大的,而如果使用nump.frombuffer的方式對共享數據進行讀寫其耗時是幾乎可以不計的。同時我們也可以看到使用multiprocessing.Value / multiprocessing.Array方式子進程對父進程的共享數據進行讀寫操作是不需要父進程參與的,其中讀寫共享數據時進行的數據類型轉換的工作都是在子進程內進行的。
同時可以看到如果不使用numpy.frombuffer方式直接對共享數據底層操作而是進行數據轉換的話,Manager的方式要比手動轉換性能高。
=================================================
最終結論就是在Python中如果多進程對共享內存操作的話,最佳性能的實現是使用multiprocessing.Value / multiprocessing.Array + numpy.frombuffer方式 。
以上功能使用Structure結構編寫:
import time import ctypes from ctypes import * import multiprocessing import numpy as np NUM_PROCESS = multiprocessing.cpu_count() size = 1000000 class Test(Structure): pass Test._fields_ = [(str(i), c_double*(size)) for i in range(NUM_PROCESS)] data = multiprocessing.Value(Test, lock=False) # all zero init def worker(index): main_nparray = np.frombuffer(data, 'd', size, index*size*sizeof(ctypes.c_double)) for i in range(10000): #print(index, main_nparray) main_nparray[:] = index + i return index pool = multiprocessing.Pool(processes=NUM_PROCESS) a = time.time() result = pool.map(worker, range(NUM_PROCESS)) b = time.time() print(b - a) nparray = np.frombuffer(data, 'd', NUM_PROCESS*size, 0) nparray.resize(NUM_PROCESS, size) print("result:") #print(nparray.shape) print(nparray)