使用multiprocessing中的常見問題


在python的解釋器中,CPython是應用范圍最廣的一種,其具有豐富的擴展包,方便了開發者的使用。當然CPython也不是完美的,由於全局解釋鎖(GIL)的存在,python的多線程可以近似看作單線程。為此,開發者推出了multiprocessing,這里介紹一下使用中的常見問題。

環境

>>> import sys >>> print(sys.version) 3.6.0 |Anaconda 4.3.1 (64-bit)| (default, Dec 23 2016, 12:22:00) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]

共享變量

任務能否切分成多個子任務是判斷一個任務能否使用多進程或多線程的重要標准。在任務切分時,不可避免的需要數據通訊,而共享變量是數據通訊的重要方式。在multiprocess中,共享變量有兩種方式:Shared memoryServer process

share memory

multiprocess通過ArrayValue來共享內存

from multiprocessing import Array, Value num = 10 elements = Array("i", [2 * i + 1 for i in range(num)]) val = Value('d', 0.0)

然后就可以將數據同步到Process中。這里舉一個例子,即將elements翻倍,val增加1,首先定義函數

def func(elements, val): for i, ele in enumerate(elements): elements[i] = ele * 2 val.value += 1

再定義Process

from multiprocessing import Process p = Process(target=func, args=(elements, val, )) p.start() # 運行Process p.join() # 等待Process運行結束

最終運行結果

=====Process運行前=======
[elements]:1 3 5 7 9 11 13 15 17 19 [Value]:0.0 =====Process運行后======= [elements]:2 6 10 14 18 22 26 30 34 38 [Value]:1.0

在某些特定的場景下要共享string類型,方式如下:

from ctypes import c_char_p str_val = Value(c_char_p, b"Hello World")

關於Share Memory支持的更多類型,可以查看module-multiprocessing.sharedctypes

Server process

此種方式通過創建一個Server process來管理python object,然后其他process通過代理來訪問這些python object。相較於share memory,它支持任意類型的共享,包括:list、dict、Namespace等。這里以dict和list舉一個例子:

from multiprocessing import Process, Manager def func(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) print("=====Process運行前=======") print(d) print(l) p = Process(target=func, args=(d, l)) p.start() p.join() print("=====Process運行后=======") print(d) print(l)

運行結果如下

=====Process運行前======= {} [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] =====Process運行后======= {1: '1', '2': 2, 0.25: None} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

進程間通訊

眾所周知,並發編程中應該盡量避免共享變量,多進程更是如此。在這種情況下,多進程間的通訊就要用到QueuePipe

Queue

Queue是一種線程、進程安全的先進先出的隊列。使用中,首先定義Queue

from multiprocessing import Queue queue = Queue()

然后將需要處理的數據放入Queue中

elements = [i for i in range(100)] for i in elements: queue.put(i)

然后創建子進程process

from multiprocessing import Process process = Process(target=func, args=(queue, ))

其中func是子進程處理數據的邏輯。

from queue import Empty def func(queue): buff = [] while True: try: ele = queue.get(block=True, timeout=1) buff.append(str(ele)) except Empty: print(" ".join(buff)) print("Queue has been empty.....") break

使用queue.get時,若Queue中沒有數據,則會拋出queue.Empty錯誤。值得注意的是,在使用queue.get()時一定要設置block=Truetimeout,否則它會一直等待,直到queue中放入數據(剛開始用的時候,我一直奇怪為什么程序一直處在等待狀態)。運行結果

=====單進程====== 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 Queue has been empty.....

Pipe

Pipe是一種管道,一端輸入,一端輸出。在multiprocess中,可以通過Pipe()函數來定義,返回sendrecv的connection。使用中,先定義

from multiprocessing import Pipe parent_conn, child_conn = Pipe()

然后一端放入數據,另一端就可以接受數據了

from multiprocessing import Process def f(conn): conn.send([42, None, 'hello']) conn.close() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()

輸出結果

[42, None, 'hello']

另外,值得注意的是,若兩個或更多進程同時從管道一端讀或寫數據,會導致管道中的數據corrupt。為了直觀的理解這種情況,這里舉一個例子,即在主進程將數據放入管道,在子進程從管道中讀出數據,並打印結果。區別之處在於,子進程的數量。首先將數據放入管道:

def func(conn): a = conn.recv() print(a) parent, child = Pipe() child.send("Hello world...")

然后開啟子進程

print("======單進程========") p = Process(target=func, args=(parent, )) p.start() p.join() print("======多進程========") num_process = 2 ps = [Process(target=func, args=(parent, )) for _ in range(num_process)] for p in ps: p.start() for p in ps: p.join()

輸出結果

pipe_corrput

多進程並未按照預想的輸出兩個Hello World,而是處於死鎖的狀態。

例子

關於Queue和Pipe的用法講了這么多,下面做一個小練習,內容是:利用多線程從文件中讀取數據,處理后將數據保存到另外一個文件中。具體方法如下: 
1. 開辟一個子進程,從文件中讀取數據,並將數據存入Queue中 
2. 開辟多個子進程,從Queue中讀取數據,處理數據,並將數據放入管道一端(注意加鎖) 
3. 開辟一個子進程,從管道另一端獲取數據,並將數據寫入文件中

0.導包

from multiprocessing import Process, Array, Queue, Value, Pipe, Lock from queue import Empty import sys

1.讀取數據

def read_file(fin, work_queue): for line in fin: i = int(line.strip("\n")) work_queue.put_nowait(i)

其中work_queue用於連通“讀數據的進程”和“處理數據的進程”。

2.處理數據

def worker_func(work_queue, conn, lock, index): while True: try: ele = work_queue.get(block=True, timeout=0.5) + 1 with lock: conn.send(ele) except Empty: print("Process-{} finish...".format(index)) conn.send(-1) break

從隊列中讀取數據,直到隊列中的數據全被取走。當Queue中不存在數據時,向queue放入終止符(-1),告訴后面的進程,前面的人任務已經完成。

3.寫數據

def write_file(conn, fout, num_workers): record = 0 while True: val = conn.recv() if val == -1: record += 1 else: print(val, file=fout) fout.flush() if record == num_workers: break

當寫進程收到特定數量終止符(-1)時,寫進程就終止了。

4.執行

path_file_read = "./raw_data.txt" path_file_write = "./data.txt" with open(path_file_read) as fin, \ open(path_file_write, "w") as fout: queue = Queue() parent, child = Pipe() lock = Lock() read_Process = Process(target=read_file, args=(fin, queue, )) worker_Process = [Process(target=worker_func, args=(queue, parent, lock, index, )) for index in range(3)] write_Process = Process( target=write_file, args=(child, fout, len(worker_Process), )) read_Process.start() for p in worker_Process: p.start() write_Process.start() print("read....") read_Process.join() print("worker....") for p in worker_Process: p.join() print("write....") write_Process.join()

輸入/輸出

打印錯行

在使用多進程中,你會發現打印的結果發生錯行。這是因為python的print函數是線程不安全的,從而導致錯行。解決方法也很簡單,給print加一把鎖就好了,方式如下

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()

無法打印日志信息

剛開始用多進程時,經常會出現日志信息無法打印的情況。其實問題很簡單。在多進程中,打印內容會存在緩存中,直到達到一定數量才會打印。解決這個問題,只需要加上

import sys sys.stdout.flush() sys.stderr.flush()

例如上面的例子,應該寫成

import sys def f(l, i): l.acquire() try: print('hello world', i) sys.stdout.flush() # 加入flush finally: l.release()

總結

以上就是我在使用multiprocessing遇到的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM