1. multiprocessing
Python
實現多進程的模塊最常用的是multiprocessing
,此外還有multiprocess、pathos、concurrent.futures、pp、parallel、pprocess
等模塊。
1.1 multiprocessing.Process
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
參數
group
: 為預留參數target
:子進程要執行的目標函數name
:線程名稱args、kwargs
:參數,args
必須是元組deamon
為bool
值:表示是否為守護進程
實例
# coding=utf-8
import multiprocessing
import time
def run(a):
time.sleep(5)
print(a)
return a * a
if __name__ == '__main__':
p = multiprocessing.Process(target=run, args=(123456,))
p.start() # 運行進程實例
p.join() # 阻塞主進程,當子進程結束后,才會繼續執行主進程
print(123)
1.2 multiprocessing.Pool
創建多個子進程最好是采用進程池 multiprocessing.Pool
multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
參數
processes
:進程數量,如果processes
是None
那么使用os.cpu_count()
返回的數量initializer
: 如果initializer
不是None
,那么每一個工作進程在開始的時候會調用initializer(*initargs)
maxtasksperchild
:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild
默認是None
,意味着只要Pool
存在工作進程就會一直存活context
: 用在制定工作進程啟動時的上下文,一般使用multiprocessing.Pool()
或者一個context
對象的Pool()
方法來創建一個池,兩種方法都適當的設置了context
創建子進程的幾種方式
apply()
:同步阻塞執行,上一個子進程結束后才能進行下一個子進程(不推薦)apply_async()
:異步非阻塞執行,每個子進程都是異步執行的(並行)(推薦)map()
:同步阻塞map_async()
:異步非阻塞imap()
:內存不夠用可以采用此種方式,速度慢於map()
imap_unordered
:imap()
的無序版本(不會按照調用順序返回,而是按照結束順序返回),返回迭代器實例
1.2.1 apply
同步阻塞執行,上一個子進程結束后才能進行下一個子進程
apply(func, args=(), kwds={}, callback=None, error_callback=None)
1.2.2 apply_async
異步非阻塞執行,每個子進程都是異步執行的(並行),異步執行指的是一批子進程並行執行,且子進程完成一個,就新開始一個,而不必等待同一批其他進程完成
# callback 回調,error_back 錯誤回調
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
示例
# coding=utf-8
import multiprocessing
def callback(result):
"""回調函數"""
with open("result.txt", "a+", encoding="utf-8") as f:
f.write(str(result) + "\n")
def run(num):
return num * num
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(1000):
pool.apply_async(run, args=(i,), callback=callback)
# # 如有多個參數,可傳一個 iterable
# pool.apply_async(run, args=([i, 123, 456]), callback=callback)
pool.close()
pool.join()
1.2.3 map
若子進程有返回值,且需集中處理,建議采用此種方式(但是它是同步阻塞的):
# iterable 可迭代類型,將 iterable 中每個元素作為參數應用到 func 函數中,返回 list
map(func, iterable, chunksize=None)
1.2.4 map_async
map
的異步非阻塞版本,返回 MapResult
實例,使用 get()
方法,獲取結果(list
方法):
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
apply_async 與 map_async 對比
# coding=utf-8
import multiprocessing
import time
def run(a):
return a * a
data = []
def my_callback(result):
data.append(result)
if __name__ == '__main__':
st = time.time()
pool = multiprocessing.Pool(6)
# 總耗時:0.4497215747833252
future = pool.map_async(run, range(20000))
print(future.get()) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# # 總耗時:3.019148111343384
# for i in range(20000):
# pool.apply_async(run, args=(i,), callback=my_callback)
#
# print(data)
pool.close()
pool.join()
print(f"總耗時:{time.time() - st}")
結論
map_async
比apply_async
速度快- 若想統一處理結果,
map_async
比apply_async
更方便
1.2.5 imap 和 imap_unordered
內存不夠可以采用 imap
方式,map
的迭代器版本,返回迭代器實例,速度遠慢於 map
,但是堆內存需求小。
imap_unordered
為 imap
的無序版本
imap(func, iterable, chunksize=1)
imap_unordered(func, iterable, chunksize=1)
實例:
# coding=utf-8
import multiprocessing
import time
def run(a):
return a * a
data = []
def my_callback(result):
data.append(result)
if __name__ == '__main__':
st = time.time()
pool = multiprocessing.Pool(6)
# # 總耗時:0.4497215747833252
# future = pool.map_async(run, range(20000))
# print(future.get()) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# # 總耗時:3.019148111343384
# for i in range(20000):
# pool.apply_async(run, args=(i,), callback=my_callback)
#
# print(data)
future = pool.imap(run, range(20000)) # 總耗時:4.171960115432739
print(future)
for i in future:
print(i)
pool.close()
pool.join()
print(f"總耗時:{time.time() - st}") # 總耗時:0.4497215747833252
1.2.6 starmap 和 starmap_async
starmap
可以使子進程活動接收多個參數,而 map
只能接收一個參數:
# 子進程活動 func允許包含多個參數,也即iterable的每個元素也是iterable(其每個元素作為func的參數),返回結果組成的 list
starmap(func, iterable, chunksize=None)
# 異步並行版本,返回 MapResult 實例,get() 方法可以獲取結果組成的 list
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
# 使用方式
pool.starmap_async(f, ((a0, b0), (a1, b1), ...)).get()
1.3 進程間通信(數據共享)
每個進程是相互獨立的,內存無法共享,實現進程間數據共享的方式有:
multiprocessing.Value(typecode_or_type, *args, lock=True)
:共享單個數據,共享內存multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
:共享數組,共享內存multiprocessing.Manager()
:共享進程,支持多種數據結構的數據共享
Manager
支持的類型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value
和Array
,不僅可以在本地進程間共享,甚至可以在多客戶端實現網絡共享,不過 Manager
占用資源較大。
1、共享 dict
:
# coding=utf-8
# 多個進程將數據添加到字典 dd 中
import multiprocessing
def run(d, k, v):
d[k] = v
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
manager = multiprocessing.Manager()
dd = manager.dict()
for i in range(20):
future = pool.apply_async(run, args=(dd, i, i * i))
pool.close()
pool.join()
print(dict(dd))
# 運行結果
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 8: 64, 7: 49, 9: 81, 10: 100, 11: 121, 12: 144, 13: 169, 14: 196, 15: 225, 16: 256, 17: 289, 18: 324, 19: 361}
2、管理隊列,並讓不同的進程可以訪問它:
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
for i in range(20):
pool.apply_async(worker, (i, q))
pool.close()
pool.join()
# coding=utf-8
import multiprocessing
def write(name, que):
que.put("%d is done" % name)
print(f'{name} write done!')
def read(que):
while not que.empty():
val = que.get(True)
print('read===>: ', val)
# while True:
# if not que.empty():
# val = que.get(True)
# print('read===>: ', val)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
for i in range(20):
pool.apply_async(write, (i, q))
p1 = multiprocessing.Process(target=read, args=(q,))
p1.start()
p1.join()
pool.close()
pool.join()
注意:在操作共享對象元素時,除了賦值操作,其他的方法都作用在共享對象的拷貝上,並不會對共享對象生效。比如:
dic['k'] = []; dic['k'].append(x)
,將不會修改dic
的內容
1.4 進程間通信(數據傳遞)
- 隊列
multiprocessing.Queue(maxsize=0)
:建立共享的隊列實例multiprocessing.JoinableQueue(maxsize=0)
:建立可阻塞的隊列實例
- 管道
multiprocessing.Pipe(duplex=True)
:建立一對管道對象,用於在兩個進程之間傳遞數據
2. concurrent.futures 模塊
concurrent.futures
是 3.2
中引入的新模塊,它為異步執行可調用對象提供了高層接口,分為兩類:
ThreadPoolExecutor
:多線程編程ProcessPoolExecutor
:多進程編程
兩者實現了同樣的接口,這些接口由抽象類 Executor
定義;這個模塊提供了兩大類型:
Executor
:執行器,用於管理工作池Future
:管理工作計算出的結果
2.1 concurrent.futures.Executor 類
提供了一系列方法,可以用於異步執行調用,定義的方法有:
# 調用對象執行,fn(*args, **kwargs),返回 Future 對象,可用 future.result() 獲取執行結果
submit(fn, *args, **kwargs)
# 異步執行 func,並支持多次並發調用,返回一個迭代器
# timeout 秒數可以是浮點數或者整數,如果設置為 None 或者不指定,則不限制等待時間
# ProcessPoolExecutor 這個方法將 iterables 划分為多塊,作為獨立的任務提交到進程池(不是 1)可顯著提升性能,ThreadPoolExecutor,chunksize 不起作用
map(func, *iterables, timeout=None, chunksize=1)
# 告訴當執行器 executor 在當前所有等待的 future 對象運行完畢后,應該釋放執行器用到的所有資源
# True 會等待所有 future 執行完畢,且 executor 的資源都釋放完會才會返回,False 會立即返回,executor 的資源會在 future 執行完后釋放
shutdown(wait=True)
2.2 ThreadPoolExecutor
ThreadPoolExecutor + requests 並發執行
# coding=utf-8
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch(req_url):
r = requests.get(req_url)
return r.json()['args']['a']
if __name__ == '__main__':
start = time.time()
numbers = range(12)
url = 'http://httpbin.org/get?a={}'
# submit() 方式
with ThreadPoolExecutor(max_workers=3) as executor:
# task_list = [executor.submit(fetch(url.format(n))) for n in range(12)]
task_list = [executor.submit(fetch, url.format(n)) for n in range(12)]
for future in as_completed(task_list):
print(future.result())
# data = future.result() # 總耗時:2.903249740600586
# print(data)
## map() 方式
# with ThreadPoolExecutor(max_workers=3) as executor:
# future = executor.map(fetch, (url.format(n) for n in range(12)))
#
# for result in future:
# print(result)
print(f'總耗時:{time.time() - start}') # 總耗時:2.630300760269165
實測
submit
未按順序返回結果
2.3 ProcessPoolExecutor
ProcessPoolExecutor
使用進程池來異步執行調用,適合計算密集型任務,方法參數:
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
示例:
# coding=utf-8
from concurrent.futures import ProcessPoolExecutor, as_completed
def fib(n):
if n <= 2:
return 1
return fib(n - 1) + fib(n - 2)
if __name__ == '__main__':
numbers = range(20)
with ProcessPoolExecutor(max_workers=3) as executor:
# # map 方式
# for num, result in zip(numbers, executor.map(fib, numbers)):
# print(f"{num}====>{result}")
# submit 方式
work_dict = {executor.submit(fib, i): i for i in numbers}
for future in as_completed(work_dict):
num = work_dict[future]
try:
data = future.result()
except Exception as e:
print(e)
else:
print(f"fib({num} = {data})")
2.4 Future 類
Future
類封裝了可調用對象的異步執行,由 Executor.submit()
產生,有如下方法:
cancel()
:嘗試取消調用,如果該調用正在執行中,無法取消,本方法返回False
,其他情況下調用會被取消,並返回True
;只有當任務提交了還沒執行才可以通過這種方式取消cancelled()
: 如果調用已經被成功取消,返回True
running()
:如果調用正在執行,無法被取消,則返回True
done()
:如果調用成功被取消或者已經執行完畢,返回 Trueresult(timeout=None)
: 返回調用的返回值。如果調用還沒有完成,則最多等待timeout
秒。如果timeout
秒之后還沒有完成,拋出concurrent.futures.TimeoutError``。timeout
可以為整數或者浮點數。如果不指定或者為None,則不限制等待時間。如果
future 在完成之前被取消了,會拋出CancelledError
異常,如果調用拋出異常,這個方法會拋出同樣的異常。同時它也會阻塞直到任務完成,獲取被取消exception(timeout=None)
:返回被調用拋出的異常,如果調用還沒有執行完畢,則最多等待timeout
秒。如果timeout
秒之后還沒有完成,拋出concurrent.futures.TimeoutError
。timeout
可以為整數或者浮點數。如果不指定或者為None
,則不限制等待時間。
如果future
在完成之前被取消了,會拋出CancelledError
異常,如果調用完成並且沒有拋出異常,返回None
add_done_callback(fn)
:為future
附加可調用對象fn
,當future
運行完畢或者被取消時,它會被用作fn
的唯一參數,並調用fn
。可調用對象按照添加順序依次調用,並且總是在添加時所處進程的一個線程內調用它。如果該可調用對象拋出了屬於Exception
子類的異常,它會被記錄並忽略。如果它拋出了屬於BaseException
子類的異常,該行為未定義。
如果future
已經完成或者已經取消,fn
會被立即調用
通過 add_done_callack() 獲取返回值和捕獲異常
concurrent.futuresthread.ProcessPoolExecutor
線程池中的 worker
引發異常的時候,並不會直接向上拋起異常,而是需要主線程通過調用 concurrent.futures.Future.exception(timeout=None)
方法主動獲取 worker
的異常:
# coding=utf-8
from concurrent.futures import ProcessPoolExecutor, as_completed
def fib(n):
if n <= 2:
return 1
return fib(n - 1) + fib(n - 2)
def call_back(future):
"""
回調(可獲取多進程返回值、錯誤)
:param future: future 對象
:return:
"""
# 獲取錯誤信息
worker_exception = future.exception()
if worker_exception:
print(worker_exception)
# 獲取返回值
print(future.result())
def test(n):
if n % 2 == 0:
n / 0 # 發生異常
return n * 2
if __name__ == '__main__':
numbers = range(20)
with ProcessPoolExecutor(max_workers=3) as executor:
# # map 方式
# for num, result in zip(numbers, executor.map(fib, numbers)):
# print(f"{num}====>{result}")
# submit 方式
# work_dict = {executor.submit(fib, i): i for i in numbers}
# for future in as_completed(work_dict):
# num = work_dict[future]
# try:
# data = future.result()
# except Exception as e:
# print(e)
# else:
# print(f"fib({num} = {data})")
# 其他方法
for i in numbers:
executor.submit(test, i).add_done_callback(call_back)
實測
map()
方式提交的會觸發異常,submit()
方式需要通過add_done_callback()
主動捕獲異常!
參考文章
https://blog.csdn.net/jpch89/article/details/87643972
https://blog.csdn.net/makingLJ/article/details/98084973
3. 實例
3.1 多進程(池)向同一文件寫入數據
# coding=utf-8
"""
Function:回調函數解決多進程向同一文件寫入數據
"""
import multiprocessing
def callback(result):
"""回調函數"""
with open("result.txt", "a+", encoding="utf-8") as f:
f.write(str(result) + "\n")
def run(num):
return num * num
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(1000):
pool.apply_async(run, args=(i,), callback=callback)
pool.close()
pool.join()