python進程和線程(五)


python的進程

        由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

  multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

 

但在使用這些共享API的時候,我們要注意以下幾點:

  • 在UNIX平台上,當某個進程終結之后,該進程需要被其父進程調用wait,否則進程成為僵屍進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來說,由於只有一個進程,所以不存在此必要性。
  • multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據的不是用戶進程的資源)。
  • 多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。在多進程情況下,由於每個進程有自己獨立的內存空間,以上方法並不合適。此時我們可以通過共享內存和Manager的方法來共享資源。但這樣做提高了程序的復雜度,並因為同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果進程還沒有start(),則PID為None。

window系統下,需要注意的是要想啟動一個子進程,必須加上那句if __name__ == "main",進程相關的要寫在這句下面。

1.進程的調用

         進程調用方式和線程一樣,也分為直接調用和類方法調用:

直接調用:

from multiprocessing import Process
import os

def func(num):
    print ('我是%s'%num)
    print('我的進程號',os.getpid())

if __name__ == '__main__':
    L = []
    for i in range(20):
        p = Process(target=func,args=(i,))
        L.append(p)
        p.start()

    for l in L:
        l.join()

    print('ending...')
View Code

類方法調用:

from multiprocessing import Process
import os

class MyProcess(Process):
    def __init__(self,num):
        super(MyProcess,self).__init__()
        self.num = num

    def run(self):
        print('我是%s'%self.num)
        print('父進程PID號是',os.getppid())
        print('我的pid號是',self.pid)

if __name__ == '__main__':
    L = []
    print('main',os.getpid())
    for i in range(20):
        p = MyProcess(i)
        L.append(p)
        p.start()

    for l in L:
        l.join()
View Code

2.Process類

構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。
實例方法:
  is_alive():返回進程是否在運行。
  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
  start():進程准備就緒,等待CPU調度
  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
  terminate():不管任務是否完成,立即停止工作進程
屬性:
  authkey
  daemon:和線程的setDeamon功能一樣
  exitcode(進程在運行時為None、如果為–N,表示被信號N結束)
  name:進程名字。
  pid:進程號。

3.進程通信

        從一開始講概念我們知道,線程之前是共享進程里面的數據集的,所以線程之間的通信是比較方便的,進程之前沒有這個數據集,那應該怎么通信呢?回想之前的線程有線程隊列,進程是不是也有進程的隊列呢?那肯定是有的:

進程隊列:

import multiprocessing

def Foo(q):
    print(q.get())
    print(q.get())
    print(q.get())


if __name__ == '__main__':
    L = []
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=Foo,args=(queue,))
    p.start()

    queue.put({'name:pengfy'})
    queue.put([1,2,3,4,5])
    queue.put('qaq')

    p.join()   #注意join放的位置
View Code

        這里主進程放置了3三元素到隊列,子進程取到並打印出來了,這就是一次簡單的進程間的通信,這里要注意join的位置。

進程管道:

         管道Pipe()函數返回一個由管道連接的連接對象(類似socket通信里面的conn),默認情況下是雙工(雙向):

#管道,類似socket里面的conn
#
from multiprocessing import Process,Pipe

def connect(conn):

    conn.send([12, {"name":"pengfy"}, 'hello'])
    print(conn.recv())
    conn.close()
    print('son2', id(conn))

if __name__ == "__main__":
    parent_conn,child_conn = Pipe()  #雙向管道
    print('son1',id(child_conn))
    p = Process(target=connect,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    parent_conn.send('孩子你好')
    p.join()
View Code

       看打印的id不一樣,說明用的不是一份數據,是復制過去的。Pipe()返回的兩個連接對象代表管道的兩端。 每個連接對象都有send()和recv()方法(以及其他方法)。 請注意,如果兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。 當然,同時使用管道的不同端的進程不存在損壞的風險

Manages

        上面兩種類型,都是數據的傳輸,其實用的比較多的還是Manages(注意大小寫)。Manager()返回的管理器對象控制一個服務器進程,該進程保存Python對象並允許其他進程使用代理操作它們

View Code

        例子里面只列舉了幾種數據類型,總共支持的有listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray.

4.進程同步

       線程里面我們講過線程同步,通過采用線程鎖可以解決這個問題。那么進程有沒有這個問題呢?肯定是有的,比如說:當進程共用一個資源時,需要同步,比如屏幕,不同步的話打印異常(用python2打印比較容易出現)

from multiprocessing import Process

def func(i):

    print('hello',i)


if __name__ == '__main__':
    L = []
    for i in range(10):
        p = Process(target=func,args=(i,))
        p.start()
        L.append(p)
    for l in L:
        l.join()
View Code

 

       像這種情況,在進程里面也有一把鎖來控制:

# from multiprocessing import Process,Lock
#
# def func(lock,i):
#     lock.acquire()
#     print('hello',i)
#     lock.release()
#
# # def func(lock,i):
# #     with lock:
# #         print('hello',i)
#
# if __name__ == '__main__':
#     lock = Lock()
#     L = []
#     for i in range(10):
#         p = Process(target=func,args=(lock,i,))
#         p.start()
#         L.append(p)
#     for l in L:
#         l.join()
View Code

        這樣怎么運行都不會出現上面那種情況了,不信可以試試。

5.進程池

       進程池是什么?就是一個池子,因為開多個進程比較容易消耗資源,所以需要控制同時執行的進程時,就可以用進程池來控制,進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply(同步接口,一般用不上)
  • apply_async

看個例子:

from multiprocessing import Process,Pool
import time,os

def func(i):
    time.sleep(1)
    print(i)
    print("son",os.getpid())

    return "HELLO %s"%i

def tag(arg):  #默認帶有一個參數,是上面那個子進程的返回值
    print(arg)

if __name__ == '__main__':
    pool = Pool(5)
    print("main pid", os.getpid())
    for i in range(100):
        # pool.apply(func=Foo, args=(i,))  #同步接口
        # pool.apply_async(func=Foo, args=(i,))

        # 回調函數:  就是某個動作或者函數執行成功后再去執行的函數,比如子進程運行完后都要打印log,就可以統一在回調函數里面操作

        pool.apply_async(func=func, args=(i,), callback=tag)

    pool.close()
    pool.join()  # join與close調用順序是固定的

    print('end')
View Code

       里面涉及到一個回調函數的概念,就是某個動作或者函數執行成功后再去執行的函數,這個例子里面看的不明顯,就是在每個進程執行完,都會打印一個數,你完全可以加在你的子進程函數里面去打印嘛,為什么還有用進程函數?當你每個進程,要需要做同一件事情的時候,就可以用回調函數了,這樣消耗更小。

       進程的知識相對簡單,線程和進程都說完了,還有內容嗎?是的,還有一個協程的內容下一篇再說。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM