Python多進程原理與實現


Date: 2019-06-04

Author: Sun

1 進程的基本概念

什么是進程?

​ 進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據集、進程控制塊三部分組成。我們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程中所需要使用的資源;進程控制塊用來記錄進程的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標志。

2 父進程和子進程

​ Linux 操作系統提供了一個 fork() 函數用來創建子進程,這個函數很特殊,調用一次,返回兩次,因為操作系統是將當前的進程(父進程)復制了一份(子進程),然后分別在父進程和子進程內返回。子進程永遠返回0,而父進程返回子進程的 PID。我們可以通過判斷返回值是不是 0 來判斷當前是在父進程還是子進程中執行。

​ Python 中同樣提供了 fork() 函數,此函數位於 os 模塊下。

# -*- coding: utf-8 -*-  
__author__ = 'sun'
__date__ = '2018/6/04 下午5:17' 

import os
import time

print("在創建子進程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

pid = os.fork()  #一次調用,兩次返回
if pid == 0:
    print("子進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
else:
    print("父進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    # pid表示回收的子進程的pid
    #pid, result = os.wait()  # 回收子進程資源  阻塞
    time.sleep(5)
    #print("父進程:回收的子進程pid=%d" % pid)
    #print("父進程:子進程退出時 result=%d" % result)

# 下面的內容會被打印兩次,一次是在父進程中,一次是在子進程中。
# 父進程中拿到的返回值是創建的子進程的pid,大於0
print("fork創建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
2.1 父子進程如何區分?

​ 子進程是父進程通過fork()產生出來的,pid = os.fork()

​ 通過返回值pid是否為0,判斷是否為子進程,如果是0,則表示是子進程

​ 由於 fork() 是 Linux 上的概念,所以如果要跨平台,最好還是使用 subprocess 模塊來創建子進程。

2.2 子進程如何回收?

python中采用os.wait()方法用來回收子進程占用的資源

pid, result = os.wait() # 回收子進程資源  阻塞,等待子進程執行完成回收

如果有子進程沒有被回收的,但是父進程已經死掉了,這個子進程就是僵屍進程。

3 Python進程模塊

​ python的進程multiprocessing模塊有多種創建進程的方式,每種創建方式和進程資源的回收都不太相同,下面分別針對Process,Pool及系統自帶的fork三種進程分析。

3.1 fork()
import os
pid = os.fork() # 創建一個子進程
os.wait() # 等待子進程結束釋放資源
pid為0的代表子進程。

缺點:
​ 1.兼容性差,只能在類linux系統下使用,windows系統不可使用;
​ 2.擴展性差,當需要多條進程的時候,進程管理變得很復雜;
​ 3.會產生“孤兒”進程和“僵屍”進程,需要手動回收資源。
優點:
​ 是系統自帶的接近低層的創建方式,運行效率高。

3.2 Process進程

multiprocessing模塊提供Process類實現新建進程

# -*- coding: utf-8 -*-
import os
from multiprocessing  import Process
import time

def fun(name):
	print("2 子進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
	print("hello " + name)

def test():
	print('ssss')

if __name__ == "__main__":
	print("1 主進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
	ps = Process(target=fun, args=('jingsanpang', ))
	print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
	print("3 進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
	print(ps.is_alive())
	ps.start()
	print(ps.is_alive())
	print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
	print("4 進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
	ps.join()
	print(ps.is_alive())
	print("5 進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
	ps.terminate()
	print("6 進程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

特點:
​ 1.注意:Process對象可以創建進程,但Process對象不是進程,其刪除與否與系統資源是否被回收沒有直接的關系。
2.主進程執行完畢后會默認等待子進程結束后回收資源,不需要手動回收資源;join()函數用來控制子進程
​ 結束的順序,其內部也有一個清除僵屍進程的函數,可以回收資源;
3.Process進程創建時,子進程會將主進程的Process對象完全復制一份,這樣在主進程和子進程各有一個 Process對象,但是p.start()啟動的是子進程,主進程中的Process對象作為一個靜態對象存在,不執行。

4.當子進程執行完畢后,會產生一個僵屍進程,其會被join函數回收,或者再有一條進程開啟,start函數也會回收僵屍進程,所以不一定需要寫join函數。
5.windows系統在子進程結束后會立即自動清除子進程的Process對象,而linux系統子進程的Process對象如果沒有join函數和start函數的話會在主進程結束后統一清除。

另外還可以通過繼承Process對象來重寫run方法創建進程

3.3 進程池POOL (多個進程)

進程池:為了避免我們多進程創建,銷毀帶來的開銷,引入的進程池

# -*- coding: utf-8 -*-
__author__ = 'sun'
__date__ = '2018/6/04 下午9:16'

import multiprocessing
import time

def work(msg):
	mult_proces_name = multiprocessing.current_process().name
	print('process: ' + mult_proces_name + '-' + msg)
	

if __name__ == "__main__":
	pool = multiprocessing.Pool(processes=5) # 創建4個進程
	for i in range(20):
		msg = "process %d" %(i)
		pool.apply_async(work, (msg, ))
	pool.close() # 關閉進程池,表示不能在往進程池中添加進程
	pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用
	print("Sub-process all done.")

​ 上述代碼中的pool.apply_async()apply()函數的變體,apply_async()apply()的並行版本,apply()apply_async()的阻塞版本,使用apply()主進程會被阻塞直到函數執行結束,所以說是阻塞版本。apply()既是Pool的方法,也是Python內置的函數,兩者等價。可以看到輸出結果並不是按照代碼for循環中的順序輸出的。

多個子進程並返回值

apply_async()本身就可以返回被進程調用的函數的返回值。上一個創建多個子進程的代碼中,如果在函數func中返回一個值,那么pool.apply_async(func, (msg, ))的結果就是返回pool中所有進程的值的對象(注意是對象,不是值本身)

import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 創建4個進程
    results = []
    for i in range(20):
        msg = "process %d" %(i)
        results.append(pool.apply_async(func, (msg, )))
    pool.close() # 關閉進程池,表示不能再往進程池中添加進程,需要在join之前調用
    pool.join() # 等待進程池中的所有進程執行完畢
    print ("Sub-process(es) done.")

    for res in results:
        print (res.get())

​ 與之前的輸出不同,這次的輸出是有序的。

​ 如果電腦是八核,建立8個進程,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的

4 進程間通信方式

  1. 管道pipe:管道是一種半雙工的通信方式,數據只能單向流動,而且只能在具有親緣關系的進程間使用。進程的親緣關系通常是指父子進程關系。
  2. 命名管道FIFO:有名管道也是半雙工的通信方式,但是它允許無親緣關系進程間的通信。
  3. 消息隊列MessageQueue:消息隊列是由消息的鏈表,存放在內核中並由消息隊列標識符標識。消息隊列克服了信號傳遞信息少、管道只能承載無格式字節流以及緩沖區大小受限等缺點。
  4. 共享存儲SharedMemory:共享內存就是映射一段能被其他進程所訪問的內存,這段共享內存由一個進程創建,但多個進程都可以訪問。共享內存是最快的 IPC 方式,它是針對其他進程間通信方式運行效率低而專門設計的。它往往與其他通信機制,如信號兩,配合使用,來實現進程間的同步和通信。

以上幾種進程間通信方式中,消息隊列是使用的比較頻繁的方式。

(1)管道pipe

import multiprocessing

def foo(sk):
   sk.send('hello father')
   print(sk.recv())

if __name__ == '__main__':
   conn1,conn2=multiprocessing.Pipe()    #開辟兩個口,都是能進能出,括號中如果False即單向通信
   p=multiprocessing.Process(target=foo,args=(conn1,))  #子進程使用sock口,調用foo函數
   p.start()
   print(conn2.recv())  #主進程使用conn口接收
   conn2.send('hi son') #主進程使用conn口發送

(2)消息隊列Queue

Queue是多進程的安全隊列,可以使用Queue實現多進程之間的數據傳遞。

Queue的一些常用方法:

  • Queue.qsize():返回當前隊列包含的消息數量;
  • Queue.empty():如果隊列為空,返回True,反之False ;
  • Queue.full():如果隊列滿了,返回True,反之False;
  • Queue.get():獲取隊列中的一條消息,然后將其從列隊中移除,可傳參超時時長。
  • Queue.get_nowait():相當Queue.get(False),取不到值時觸發異常:Empty;
  • Queue.put():將一個值添加進數列,可傳參超時時長。
  • Queue.put_nowait():相當於Queue.get(False),當隊列滿了時報錯:Full。

案例:

from multiprocessing import Process, Queue
import time

def write(q):
   for i in ['A', 'B', 'C', 'D', 'E']:
      print('Put %s to queue' % i)
      q.put(i)
      time.sleep(0.5)

def read(q):
   while True:
      v = q.get(True)
      print('get %s from queue' % v)


if __name__ == '__main__':
   q = Queue()
   pw = Process(target=write, args=(q,))
   pr = Process(target=read, args=(q,))
   print('write process = ', pw)
   print('read  process = ', pr)
   pw.start()
   pr.start()
   pw.join()
   pr.join()
   pr.terminate()
   pw.terminate()

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據

注:進程間通信應該盡量避免使用共享數據的方式

5 多進程實現生產者消費者

以下通過多進程實現生產者,消費者模式

import multiprocessing
from multiprocessing import Process
from time import sleep
import time


class MultiProcessProducer(multiprocessing.Process):
   def __init__(self, num, queue):
      """Constructor"""
      multiprocessing.Process.__init__(self)
      self.num = num
      self.queue = queue

   def run(self):
      t1 = time.time()
      print('producer start ' + str(self.num))
      for i in range(1000):
         self.queue.put((i, self.num))
      # print 'producer put', i, self.num
      t2 = time.time()

      print('producer exit ' + str(self.num))
      use_time = str(t2 - t1)
      print('producer ' + str(self.num) + ', 
      use_time: '+ use_time)

class MultiProcessConsumer(multiprocessing.Process):
   def __init__(self, num, queue):
      """Constructor"""
      multiprocessing.Process.__init__(self)
      self.num = num
      self.queue = queue

   def run(self):
      t1 = time.time()
      print('consumer start ' + str(self.num))
      while True:
         d = self.queue.get()
         if d != None:
            # print 'consumer get', d, self.num
            continue
         else:
            break
      t2 = time.time()
      print('consumer exit ' + str(self.num))
      print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1))

def main():
   # create queue
   queue = multiprocessing.Queue()
   # create processes
   producer = []
   for i in range(5):
      producer.append(MultiProcessProducer(i, queue))

   consumer = []
   for i in range(5):
      consumer.append(MultiProcessConsumer(i, queue))

   # start processes
   for i in range(len(producer)):
      producer[i].start()

   for i in range(len(consumer)):
      consumer[i].start()

   # wait for processs to exit
   for i in range(len(producer)):
      producer[i].join()

   for i in range(len(consumer)):
      queue.put(None)

   for i in range(len(consumer)):
      consumer[i].join()

   print('all done finish')


if __name__ == "__main__":
   main()

6 總結

​ python中的多進程創建有以下兩種方式:

(1)fork子進程

(2)采用 multiprocessing 這個庫創建子進程

​ 需要注意的是隊列中Queue.Queue是線程安全的,但並不是進程安全,所以多進程一般使用線程、進程安全的multiprocessing.Queue()

​ 另外, 進程池使用 multiprocessing.Pool實現,pool = multiprocessing.Pool(processes = 3),產生一個進程池,pool.apply_async實現非租塞模式,pool.apply實現阻塞模式。

apply_async和 apply函數,前者是非阻塞的,后者是阻塞。可以看出運行時間相差的倍數正是進程池數量。

​ 同時可以通過result.append(pool.apply_async(func, (msg, )))獲取非租塞式調用結果信息的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM