什么是进程和线程?
进程是指计算机中已运行的程序,是系统进行资源分配和调度的基本单位;线程是CPU调度和分派的基本单位,一般来说,进程是线程的容器,一个进程可以包含多个线程。最近因为一个计算时间比较长的程序,接触了Python的多进程计算,Python实现多进程多线程计算还是比较容易的,我用的是Python的multiprocessing模块。
Python的multiprocessing模块实现多进程
multiprocessing模块实现多进程,可以用Process类,也可以用进程池pool(),在这里主要说说我用进程池的一些问题和体会。
multiprocessing安装用pip工具就可以
pip install multiprocessing
在进行计算之前,如果不知道自己的计算机的CPU核数量,可以用multiprocessing下的命令输出
multiprocessing.cpu_count()
一个简单的实现:
import multiprocessing def func(x): y = x * 2 print(multiprocessing.current_process().name,y) if __name__ == "__main__": pool = multiprocessing.Pool(processes=4)# 创建多个进程 for i in range(100): pool.apply_async(func,(i, )) pool.close() pool.join()
代码中的multiprocessing.current_process().name获得计算时的CPU核心名,输出如下,可以看到4个核心都在计算时用到
apply_async是进程池的常用方法之一,类似的方法还有:
apply(self, func, args=(), kwds={}),将func函数提交给进程池处理;
apply_async(self, func, args=(), kwds={}, callback=None,error_callback=None),是apply的一种异步,不会被阻塞;
map(self, func, iterable, chunksize=None),类似于做一个映射;
进程计算结束后,必须用 close() 关闭进程池,join() 是等待进程池中的所有进程执行完毕,必须在close()之后调用。
向函数中传递多个参数
以上程序只向func里传递了一个函数,那么能不能实现多个函数的传递,因为apply等都不支持传递多个函数,所以需要对传递的多个函数进行一定的处理,这里用到了 zip() 函数,相当于把多个变量压缩:
import multiprocessing import numpy as np def func(x,y,z): return x+y,z if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 创建4个进程 x=np.zeros((100,1)) y=np.zeros((100,1)) z=np.zeros((100,1)) result = [] for i in range(100): x[i,0] = i y[i,0] = i z[i,0] = i xx=np.squeeze(x) yy=np.squeeze(y) zz=np.squeeze(z) c=list(zip(xx,yy,zz)) cc =np.array(c) result=pool.starmap(func,c) pool.close() # 关闭进程池 results=np.array(result) print(results)
因为用apply、map等都不能实现,所以这里用了pool的starmap函数,是与map类似,但是能将传递的参数“解压缩”,就满足了我们传递多个参数的需要。
在函数中使用进程池
另外,还在写程序的时候发现一个坑,进程池的定义和关闭必须在主程序下面,在函数中想利用进程池进行多线程的计算,需要在主程序下创建进程池,在函数中就可以直接使用,如果不放在主程序下会报错,而close也需要放在主程序下,不然函数运行一次进程池就关闭了。
import multiprocessing import numpy as np def func(x,y,z): return x+y,z def text(): x = np.zeros((100, 1)) y = np.zeros((100, 1)) z = np.zeros((100, 1)) result = [] for i in range(100): x[i, 0] = i y[i, 0] = i z[i, 0] = i xx = np.squeeze(x) yy = np.squeeze(y) zz = np.squeeze(z) c = list(zip(xx, yy, zz)) result = pool.starmap(func, c) return result if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 创建4个进程 result = text() pool.close() # 关闭进程池 pool.join() results=np.array(result) print(results)