進程 (process)
進程是對各種資源管理的集合,包含對各種資源的調用、內存的管理、網絡接口的調用
進程要操作 CPU 必須先啟動一個線程,啟動一個進程的時候會自動創建一個線程,進程里的第一個線程就是主線程
程序執行的實例
有唯一的進程標識符(pid)
multiprossing 模塊
啟動進程
示例:
import multiprocessing
import time
def process_run(n):
time.sleep(1)
print('process', n)
for i in range(10):
p = multiprocessing.Process(target=process_run, args=(i, ))
p.start()
所有進程都是由父進程啟動的
示例:
import multiprocessing
import os
def show_info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id', os.getpid())
print('\n\n')
def f(name):
show_info('function f')
print(name)
if __name__ == '__main__':
show_info('main process line')
p = multiprocessing.Process(target=f, args=('children process', ))
p.start()
進程間通信
線程間共享內存空間,進程間只能通過其他方法進行通信
Queue
注意這個 Queue 不同於 queue.Queue
Queue type using a pipe, buffer and thread
兩個進程的 Queue 並不是同一個,而是將數據 pickle 后傳給另一個進程的 Queue
用於父進程與子進程之間的通信或同一父進程的子進程之間通信
示例:
from multiprocessing import Process, Queue
def p_put(*args):
q.put(args)
print('Has put %s' % args)
def p_get(*args):
print('%s wait to get...' % args)
print(q.get())
print('%s got it' % args)
q = Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()
輸出結果:
Has put p1
p2 wait to get...
('p1',)
p2 got it
換成 queue 示例:
from multiprocessing import Process
import queue
def p_put(*args):
q.put(args)
print('Has put %s' % args)
def p_get(*args):
print('%s wait to get...' % args)
print(q.get())
print('%s got it' % args)
q = queue.Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()
輸出結果:
Has put p1
p2 wait to get...
由於父進程啟動子進程時是復制一份,所以每個子進程里也有一個空的隊列,但是這些隊列數據獨立,所以 get 時會阻塞
Pipe
Pipe(管道) 是通過 socket 進行進程間通信的
所以步驟與建立 socket 連接相似:
建立連接、發送/接收數據(一端發送另一端不接受就會阻塞)、關閉連接
示例:
from multiprocessing import Pipe, Process
def f(conn):
conn.send('send by child')
print('child recv:', conn.recv())
conn.close()
parent_conn, child_conn = Pipe() # 獲得 Pipe 連接的兩端
p = Process(target=f, args=(child_conn, ))
p.start()
print('parent recv:', parent_conn.recv())
parent_conn.send('send by parent')
p.join()
輸出結果:
parent recv: send by child
child recv: send by parent
進程間數據共享
Manager
Manager 實現的是進程間共享數據
支持的可共享數據類型:
list
dict
Value
Array
Namespace
Queue queue.Queue
JoinableQueue queue.Queue
Event threading.Event
Lock threading.Lock
RLock threading.RLock
Semaphore threading.Semaphore
BoundedSemaphore threading.BoundedSemaphore
Condition threading.Condition
Barrier threading.Barrier
Pool pool.Pool
示例:
from multiprocessing import Manager, Process
import os
def func():
m_dict['key'] = 'value'
m_list.append(os.getpid())
manager = Manager()
m_dict = manager.dict()
m_list = manager.list()
p_list = []
for i in range(10):
p = Process(target=func)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(m_list)
print(m_dict)
進程鎖
打印時可能會出錯,加鎖可以避免
示例:
from multiprocessing import Lock, Process
def foo(n, l):
l.acquire()
print('hello world', n)
l.release()
lock = Lock()
for i in range(100):
process = Process(target=foo, args=(i, lock))
process.start()
進程池 (pool)
同一時間最多有幾個進程在 CPU 上運行
示例:
from multiprocessing import Pool
import time
import os
def foo(n):
time.sleep(1)
print('In process', n, os.getpid())
return n
def bar(*args):
print('>>done: ', args, os.getpid())
pool = Pool(processes=3)
print('主進程: ', os.getpid())
for i in range(10):
# pool.apply(func=foo, args=(i, ))
pool.apply_async(func=foo, args=(i, ), callback=bar)
print('end')
pool.close()
pool.join()
從程序運行過程中可以看出:同一時間最多只有3個進程在運行,類似於線程中的信號量
主進程在執行 callback 函數
注意
1.
pool.apply(func=foo, args=(i, )) 是串行執行
pool.apply_async(func=foo, args=(i, ), callback=bar) 是並行執行
2.
callback 函數會以 target 函數返回結果為參數,在 target 函數執行結束之后執行
callback 函數是主進程調用的
3.
如果不執行 join,程序會在主進程執行完成之后直接結束,不會等待子進程執行完成
Pool.join() 必須在 Pool.close() 之后執行,否則會報錯:ValueError: Pool is still running
