Python之進程


Python 之進程

理論知識

操作系統背景知識

  顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。
  進程的概念起源於操作系統,是操作系統最核心的概念,也是操作系統提供的最古老也是最重要的抽象概念之一。操作系統的其他所有內容都是圍繞進程的概念展開的。
  所以想要真正了解進程,必須事先了解操作系統,點擊進入
  PS:
   即使可以利用的 cpu 只有一個(早期的計算機確實如此),也能保證支持(偽)並發的能力。將一個單獨的 cpu 變成多個虛擬的 cpu (多道技術:時間多路復用和空間多路復用 + 硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。
  必備的理論基礎:

一 操作系統的作用:
1. 隱藏丑陋復雜的硬件接口,提供良好的抽象接口
2. 管理、調度進程,並且將多個進程對硬件的競爭變得有序
二 多道技術:
1. 產生背景:針對單核,實現並發
ps:
  現在的主機一般是多核,那么每個核都會利用多道技術。
  有4個 cpu ,運行於 cpu1 的某個程序遇到 io 阻塞,會等到 io 結束再重新調度,會被調度到4個 cpu 中的任意一個,具體由操作系統調度算法決定。
2. 空間上的復用:如內存中同時有多道程序
3. 時間上的復用:復用一個 cpu 的時間片
強調:
  遇到 io 切,占用 cpu 時間過長也切,核心在於切之前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基於上次切走的位置繼續運行。

什么是進程

  進程( Process )是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。
  狹義定義
    進程是正在運行的程序的實例( an instance of a computer program that is being executed )。
  廣義定義:
    進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。
   進程的概念

第一,進程是一個實體。每一個進程都有它自己的地址空間,一般情況下,包括文本區域( text region )、數據區域( data region )和堆棧( stack region )。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。
第二,進程是一個 “執行中的程序” 。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操作系統執行之),它才能成為一個活動的實體,我們稱其為進程。
進程是操作系統中最基本、重要的概念。是多道程序系統出現后,為了刻畫系統內部出現的動態情況,描述系統內部各道程序的活動規律引進的一個概念,所有多道程序設計操作系統都建立在進程的基礎上。

   操作系統進入進程的概念的原因

從理論角度看,是對正在運行的程序過程的抽象;
從實現角度看,是一種數據結構,目的在於清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。

   進程的特征

動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
並發性:任何進程都可以同其他進程一起並發執行
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:由於進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度向前推進
結構特征:進程由程序、數據和進程控制塊三部分組成。
多個不同的進程可以包含相同的程序:一個程序在不同的數據集里就構成不同的進程,能得到不同的結果;但是執行過程中,程序不能發生改變。

   進程與程序中的區別

程序是指令和數據的有序集合,其本身沒有任何運行的含義,是一個靜態的概念。
而進程是程序在處理機上的一次執行過程,它是一個動態的概念。
程序可以作為一種軟件資料長期存在,而進程是有一定生命期的。
程序是永久的,進程是暫時的。

  注意:
    同一個程序執行兩次,就會在操作系統中出現兩個進程,所以我們可以同時運行一個軟件,分別做不同的事情也不會混亂。

進程調度

  要想多個進程交替運行,操作系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是需要遵循一定的法則,由此就有了進程的調度算法。
   先來先服務調度算法

先來先服務( FCFS )調度算法是一種最簡單的調度算法,該算法既可用於作業調度,也可用於進程調度。 FCFS 算法比較有利於長作業(進程),而不利於短作業(進程)。由此可知,本算法適合於 CPU 繁忙型作業,而不利於 I/O 繁忙型的作業(進程)。

   短作業優先調度算法

短作業(進程)優先調度算法( SJ/PF )是指對短作業或短進程優先調度的算法,該算法既可用於作業調度,也可用於進程調度。但其對長作業不利;不能保證緊迫性作業(進程)被及時處理;作業的長短只是被估算出來的。

   時間片輪轉法

時間片輪轉( Round Robin , RR )法的基本思路是讓每個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,需要將 CPU 的處理時間分成固定大小的時間片,例如,幾十毫秒至幾百毫秒。如果一個進程在被調度選中之后用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放自己所占有的 CPU 而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。
顯然,輪轉法只能用來調度分配一些可以搶占的資源。這些可以搶占的資源可以隨時被剝奪,而且可以將它們再分配給別的進程。 CPU 是可搶占資源的一種。但打印機等資源是不可搶占的。由於作業調度是對除了 CPU 之外的所有系統硬件資源的分配,其中包含有不可搶占資源,所以作業調度不使用輪轉法。
在輪轉法中,時間片長度的選取非常重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。如果時間片長度過短,則調度程序搶占處理機的次數增多。這將使進程上下文切換次數也大大增加,從而加重系統開銷。反過來,如果時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所允許最大的進程數來確定的。
在輪轉法中,加入到就緒隊列的進程有3種情況:
  一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。
  另一種情況是分給該進程的時間片並未用完,只是因為請求 I/O 或由於進程的互斥與同步關系而被阻塞。當阻塞解除之后再回到就緒隊列。
  第三種情況就是新創建進程進入就緒隊列。
如果對這些進程區別對待,給予不同的優先級和時間片從直觀上看,可以進一步改善系統服務質量和效率。例如,我們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞原因分成不同的就緒隊列,每個隊列按 FCFS 原則排列,各隊列之間的進程享有不同的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片之后,或從睡眠中被喚醒以及被創建之后,將進入不同的就緒隊列。

   多級反饋隊列

前面介紹的各種用作進程調度的算法都有一定的局限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,而且如果並未指明進程的長度,則短進程優先和基於進程長度的搶占式調度算法都將無法使用。
而多級反饋隊列調度算法則不必事先知道各種進程所需的執行時間,而且還可以滿足各種類型進程的需要,因而它是目前被公認的一種較好的進程調度算法。在采用多級反饋隊列調度算法的系統中,調度算法的實施過程如下所述。
  1. 應設置多個就緒隊列,並為各個隊列賦予不同的優先級。第一個隊列的優先級最高,第二個隊列次之,其余各隊列的優先權逐個降低。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,為每個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長1倍 …… 第 i+1 個隊列的時間片要比第 i 個隊列的時間片長一倍。
  2. 當一個新進程進入內存后,首先將它放入第一隊列的末尾,按 FCFS 原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,便可准備撤離系統;如果它在一個時間片結束時尚未完成,調度程序便將該進程轉入第二隊列的末尾,再同樣地按 FCFS 原則等待調度執行;如果它在第二隊列中運行一個時間片后仍未完成,再依次將它放入第三隊列 …… 如此下去,當一個長作業(進程)從第一隊列依次降到第 n 隊列后,在第 n 隊列便采取按時間片輪轉的方式運行。
  3. 僅當第一隊列空閑時,調度程序才調度第二隊列中的進程運行;僅當第 1 ~ (i-1) 隊列均空時,才會調度第 i 隊列中的進程運行。如果處理機正在第 i 隊列中為某進程服務時,又有新進程進入優先權較高的隊列(第 1~ (i-1) 中的任何一個隊列),則此時新進程將搶占正在運行進程的處理機,即由調度程序把正在運行的進程放回到第 i 隊列的末尾,把處理機分配給新到的高優先權進程。

進程的並發與並行

   並行
    並行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個線程,四核的 CPU )
   並發
    並發是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核 CPU 資源)同時只能過一個人, A 走一段后,讓給 B , B 用完繼續給 A ,交替使用,目的是提高效率。
   區別
    並行是從微觀上,也就是在一個精確的時間片刻,有不同的程序在執行,這就要求必須有多個處理器。
    並發是從宏觀上,在一個時間段上可以看出是同時執行的,比如一個服務器同時處理多個 session 。

同步/異步/阻塞/非阻塞

狀態介紹

image
  在了解其他概念之前,我們首先要了解進程的幾個狀態。在程序運行的過程中,由於被操作系統的調度算法控制,程序會進入幾個狀態:就緒、運行和阻塞。
  1. 就緒( Ready )狀態

當進程已分配到除 CPU 以外的所有必要的資源,只要獲得處理機便可立即執行,這時的進程狀態稱為就緒狀態。

  2. 執行/運行( Running )

狀態當進程已獲得處理機,其程序正在處理機上執行,此時的進程狀態稱為執行狀態。

  3. 阻塞( Blocked )

狀態正在執行的進程,由於等待某個事件發生而無法執行時,便放棄處理機而處於阻塞狀態。引起進程阻塞的事件可有多種,例如,等待 I/O 完成、申請緩沖區不能滿足、等待信件(信號)等。

image

同步與異步

  所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列。成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。
  所謂異步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列。

比如我去銀行辦理業務,可能會有兩種方式:
  第一種 :選擇排隊等候;
  第二種 :選擇取一個小紙條上面有我的號碼,等到排到我這一號時由櫃台的人通知我輪到我去辦理業務了;
  第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務情況;
  第二種:后者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)往往注冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這里是櫃台的人)通過某種機制(在這里是寫在小紙條上的號碼,喊號)找到等待該事件的人。

阻塞與非阻塞

  阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來說的。

繼續上面的那個例子,不論是排隊還是使用號碼等待通知,如果在這個等待的過程中,等待者除了等待消息通知之外不能做其它的事情,那么該機制就是阻塞的,表現在程序中,也就是該程序一直阻塞在該函數調用處不能繼續往下執行。
相反,有的人喜歡在銀行辦理這些業務的時候一邊打打電話發發短信一邊等待,這樣的狀態就是非阻塞的,因為他(等待者)沒有阻塞在這個消息通知上,而是一邊做自己的事情一邊等待。
注意:
  同步非阻塞形式實際上是效率低下的,想象一下你一邊打着電話一邊還需要抬頭看到底隊伍排到你了沒有。如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,因為打電話是你(等待者)的事情,而通知你則是櫃台(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換。

同步/異步與阻塞/非阻塞

  1. 同步阻塞形式
      效率最低。拿上面的例子來說,就是你專心排隊,什么別的事都不做。
  2. 異步阻塞形式
      如果在銀行等待辦理業務的人采用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間里他不能離開銀行做其它的事情,那么很顯然,這個人被阻塞在了這個等待的操作上面;
      異步操作是可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。
  3. 同步非阻塞形式
      實際上是效率低下的。
      想象一下你一邊打着電話一邊還需要抬頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的。
  4. 異步非阻塞形式
      效率更高。
      因為打電話是你(等待者)的事情,而通知你則是櫃台(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換。
      比如說,這個人突然發覺自己煙癮犯了,需要出去抽根煙,於是他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是 異步 + 非阻塞 的方式了。

  很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現出來,同樣的,很多人也會把異步和非阻塞混淆,因為異步操作一般都不會在真正的 I/O 操作處被阻塞。

進程的創建與結束

進程的創建

  但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。
  而對於通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4中形式創建新的進程:

1. 系統初始化(查看進程 linux 中用 ps 命令, windows 中用任務管理器,前台進程負責與用戶交互,后台運行的進程與用戶無關,運行在后台並且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、 web 頁面、新聞、打印)
2. 一個進程在運行過程中開啟了子進程(如 nginx 開啟多進程、os.forksubprocess.Popen等)
3. 用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)
4. 一個批處理作業的初始化(只在大型機的批處理系統中應用)

  無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用於創建進程的系統調用而創建的。
  創建進程

1. 在 UNIX 中該系統調用是: forkfork 會創建一個與父進程一模一樣的副本,二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件(在 shell 解釋器進程中,執行一個命令就會創建一個子進程)
2. 在 windows 中該系統調用是: CreateProcessCreateProcess 既處理進程的創建,也負責把正確的程序裝入新進程。

  關於創建子進程, UNIXwindows

1. 相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另外一個進程。
2. 不同的是:在 UNIX 中,子進程的初始地址空間是父進程的一個副本。
提示:
  子進程和父進程是可以有只讀的共享內存區的。但是對於 windows 系統來說,從一開始父進程與子進程的地址空間就是不同的。

進程的創建

  1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用,發起系統調用正常退出,在 linux 中用 exit ,在 windows 中用 ExitProcess
  2. 出錯退出(自願,python a.pya.py 不存在)
  3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存、1/0 等,可以捕捉異常, try...except...
  4. 被其他進程殺死(非自願,如 kill -9

在 Python 程序中的進程操作

  之前我們已經了解了很多進程相關的理論知識,了解進程是什么應該不再困難了,剛剛我們已經了解了運行中的程序就是一個進程。所有的進程都是通過它的父進程來創建的。因此,運行起來的 python 程序也是一個進程,那么我們也可以在程序中再創建進程。多個進程可以實現並發效果,也就是說,當我們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以我們之前所學的知識,並不能實現創建進程這個功能,所以我們就需要借助 python 中強大的模塊。

multiprocess 模塊

  仔細說來, multiprocess 不是一個模塊而是 python 中一個操作、管理進程的包。 之所以叫 multi 是取自 multiple 的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分、進程同步部分、進程池部分、進程之間數據共享。

multiprocess.Process 模塊

process 模塊介紹

   process 模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。

Process([group [, target [, name [, args [, kwargs]]]]])  # 由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)
'''
強調:
    1. 需要使用關鍵字的方式來指定參數
    2. args 指定的為傳給 target 函數的位置參數,是一個元組形式,必須有逗號

參數介紹:
    1. group 參數未使用,值始終為 None
    2. target 表示調用對象,即子進程要執行的任務
    3. args 表示調用對象的位置參數元組,args=(1, 2, 'egon')
    4. kwargs 表示調用對象的字典,kwargs={'name':'egon', 'age':18}
    5. name 為子進程的名稱
'''

  方法介紹

p.start()          # 啟動進程,並調用該子進程中的 p.run()
p.run()            # 進程啟動時運行的方法,正是它去調用 target 指定的函數,我們自定義類的類中一定要實現該方法
p.terminate()      # 強制終止進程 p ,不會進行任何清理操作,如果 p 創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果 p 還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive()       # 如果 p 仍然運行,返回 True
p.join([timeout])  # 主線程等待 p 終止(強調:是主線程處於等的狀態,而 p 是處於運行的狀態)。 timeout 是可選的超時時間,需要強調的是, p.join 只能 join 住 start 開啟的進程,而不能 join 住 run 開啟的進程

  屬性介紹

p.daemon    # 默認值為 False ,如果設為 True ,代表 p 為后台運行的守護進程,當 p 的父進程終止時, p 也隨之終止,並且設定為 True 后, p 不能創建自己的新進程,必須在 p.start() 之前設置
p.name      # 進程的名稱
p.pid       # 進程的 pid
p.exitcode  # 進程在運行時為 None 、如果為 –N ,表示被信號 N 結束(了解即可)
p.authkey   # 進程的身份驗證鍵,默認是由 os.urandom() 隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

  在Windows中使用 process 模塊的注意事項

在 Windows 操作系統中由於沒有 fork ( linux 操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將 process() 直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用 if __name__ ==‘__main__’ 判斷保護起來, import 的時候,就不會遞歸運行了。

使用 process 模塊創建進程

  在一個 python 進程中開啟子進程, start 方法和並發效果。
  在 python 進程中啟動第一個子進程

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    print('我是子進程')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('執行主進程的內容了')

'''
('hello', 'bob')
我是子進程
執行主進程的內容了
'''

  join 方法

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)
    print('我是子進程')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    # p.join()
    print('我是父進程')

  查看主進程和子進程號

import os
from multiprocessing import Process


def f(x):
    print('子進程id :', os.getpid(), '父進程id :', os.getppid())
    return x * x


if __name__ == '__main__':
    print('主進程id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()

  進階,多個進程同時運行(注意,子進程的執行順序不是根據啟動順序決定的)
  多進程同時運行

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)

'''
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')

幾乎同時輸出
'''

  多進程同時運行,再談 join 方法(1)

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父進程在執行')

'''
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
父進程在執行

一個個打印出來,類似於同步執行的過程
'''

  多進程同時運行,再談 join 方法(2)

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p in p_lst]
    print('父進程在執行')
'''
父進程在執行
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')
('hello', 'bob')

使用join時,父進程等在子進程結束后再結束;不使用join時,主進程不等待子進程。
'''

  除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式
  通過繼承Process類開啟進程

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' % self.name)

p1 = MyProcess('wupeiqi')
p2 = MyProcess('yuanhao')
p3 = MyProcess('nezha')

p1.start()  # start 會自動調用 run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主線程')

  進程之間的數據隔離問題
  進程之間的數據隔離問題

from multiprocessing import Process


def work():
    global n
    n = 0
    print('子進程內: ', n)


if __name__ == '__main__':
    n = 100
    p = Process(target=work)
    p.start()
    print('主進程內: ', n)

'''
('主進程內: ', 100)
('子進程內: ', 0)
'''

守護進程

  會隨着主進程的結束而結束。
  主進程創建守護進程
    其一,守護進程會在主進程代碼執行結束后就終止;
    其二,守護進程內無法再開啟子進程,否則拋出異常: AssertionError: daemonic processes are not allowed to have children

  注意
    進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止。
  守護進程的啟動

import os
import time
from multiprocessing import Process


class Myprocess(Process):
    def __init__(self, person):
        super().__init__()
        self.person = person

    def run(self):
        print(os.getpid(), self.name)
        print('%s正在和女主播聊天' % self.person)


p = Myprocess('哪吒')
p.daemon = True  # 一定要在 p.start() 前設置,設置 p 為守護進程,禁止 p 創建子進程,並且父進程代碼執行結束, p 即終止運行
p.start()
time.sleep(10)  # 在 sleep 時查看進程 id 對應的進程 ps -ef|grep id
print('主')

  主進程代碼執行結束守護進程立即結束

from multiprocessing import Process


def foo():
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1 = Process(target=foo)
p2 = Process(target=bar)

p1.daemon = True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")  # 打印該行則主進程代碼結束,則守護進程 p1 應該被終止.可能會有 p1 任務執行的打印信息123,因為主進程打印 main---- 時, p1 也執行了,但是隨即被終止.

socket 聊天並發實例

  使用多進程實現 socket 聊天並發- server

from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg:
                break
            conn.send(msg.upper())
        except Exception as e:
            break


if __name__ == '__main__':  # windows 下 start 進程一定要寫到這下面
    while True:
        conn, client_addr = server.accept()
        p = Process(target=talk, args=(conn, client_addr))
        p.start()

  使用多進程實現 socket 聊天並發- client

from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue

    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg.decode('utf-8'))

多進程中的其他方法

  進程對象的其他方法:terninateis_alive

import time
import random
from multiprocessing import Process


class Myprocess(Process):
    def __init__(self, person):
        self.name = person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' % self.name)
        time.sleep(random.randrange(1, 5))
        print('%s還在和網紅臉聊天' % self.name)


p1 = Myprocess('哪吒')
p1.start()

p1.terminate()  # 關閉進程,不會立即關閉,所以 is_alive 立刻查看的結果可能還是存活
print(p1.is_alive())  # 結果為 True

print('開始')
print(p1.is_alive())  # 結果為 False

  進程對象的其他屬性:pidname

class Myprocess(Process):
    def __init__(self, person):
        self.name = person  # name 屬性是 Process 中的屬性,標示進程的名字
        super().__init__()  # 執行父類的初始化方法會覆蓋 name 屬性
        # self.name = person  # 在這里設置就可以修改進程名字了
        # self.person = person  # 如果不想覆蓋進程名,就修改屬性名稱就可以了

    def run(self):
        print('%s正在和網紅臉聊天' % self.name)
        # print('%s正在和網紅臉聊天' % self.person)
        time.sleep(random.randrange(1, 5))
        print('%s正在和網紅臉聊天' % self.name)
        # print('%s正在和網紅臉聊天' % self.person)


 p1 = Myprocess('哪吒')
 p1.start()
 print(p1.pid)  # 可以查看子進程的進程 id

進程同步控制

鎖 —— multiprocess.Lock

  通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管並發編程讓我們能更加充分的利用 I/O 資源,但是也給我們帶來了新的問題。
  當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
  多進程搶占資源輸出

import os
import time
import random
from multiprocessing import Process


def work(n):
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' % (n, os.getpid()))


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work, args=(i,))
        p.start()

  使用鎖維護執行順序

# 由並發變成了串行,犧牲了運行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lock


def work(lock, n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=work, args=(lock, i))
        p.start()

  上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
  接下來,我們以模擬搶票為例,來看看數據安全的重要性。
  多進程同時搶購余票

# 文件 db 的內容為:{"count":1}
# 注意一定要用雙引號,不然 json 無法識別
# 並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
import time,json,random
from multiprocessing import Process,Lock


def search():
    dic = json.load(open('db'))
    print('\033[43m剩余票數%s\033[0m' % dic['count'])


def get():
    dic = json.load(open('db'))
    time.sleep(0.1)  # 模擬讀數據的網絡延遲
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db', 'w'))
        print('\033[43m購票成功\033[0m')


def task():
    search()
    get()


if __name__ == '__main__':
    for i in range(100):  # 模擬並發100個客戶端搶票
        p = Process(target=task)
        p.start()

  使用鎖來保證數據安全

# 文件 db 的內容為:{"count":5}
# 注意一定要用雙引號,不然 json 無法識別
# 並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
import time,json,random
from multiprocessing import Process,Lock


def search():
    dic = json.load(open('db'))
    print('\033[43m剩余票數%s\033[0m' % dic['count'])


def get():
    dic = json.load(open('db'))
    time.sleep(random.random())  # 模擬讀數據的網絡延遲
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.random())  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db', 'w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')


def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(100):  # 模擬並發100個客戶端搶票
        p = Process(target=task, args=(lock,))
        p.start()

  加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
  雖然可以用文件共享數據實現進程間通信,但問題是:

1. 效率低(共享數據基於文件,而文件是硬盤上的數據)
2. 需要自己加鎖處理

  因此我們最好找尋一種解決方案能夠兼顧:

1. 效率高(多個進程共享一塊內存的數據)
2. 幫我們處理好鎖問題。

  這就是 mutiprocessing 模塊為我們提供的基於消息的 IPC 通信機制:隊列和管道。
  隊列和管道都是將數據存放於內存中
  隊列又是基於(管道 + 鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
  我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

進程間通信 —— 隊列( multiprocess.Queue

進程間通信

  IPC(Inter-Process Communication)

隊列

概念介紹

  創建共享的進程隊列, Queue 是多進程安全的隊列,可以使用 Queue 實現多進程之間的數據傳遞。

Queue([maxsize])
 
# 創建共享的進程隊列。
# 參數 :
#     maxsize 是隊列中允許的最大項數。如果省略此參數,則無大小限制。
# 底層隊列使用管道和鎖定實現。

  方法介紹

Queue([maxsize]) 
# 創建共享的進程隊列。 maxsize 是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 

# Queue 的實例 q 具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
# 返回 q 中的一個項目。如果 q 為空,此方法將阻塞,直到隊列中有項目可用為止。 
# block 用於控制阻塞行為,默認為 True . 如果設置為 False ,將引發 Queue.Empty 異常(定義在 Queue 模塊中)。 
# timeout 是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發 Queue.Empty 異常。

q.get_nowait( ) 
# 同 q.get(False) 方法。

q.put(item [, block [,timeout ] ] ) 
# 將 item 放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。
# block 控制阻塞行為,默認為 True 。如果設置為 False ,將引發 Queue.Empty 異常(定義在 Queue 庫模塊中)。
# timeout 指定在阻塞模式中等待可用空間的時間長短。超時后將引發 Queue.Full 異常。

q.qsize() 
# 返回隊列中目前項目的正確數量。
# 此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。
# 在某些系統上,此方法可能引發 NotImplementedError 異常。

q.empty() 
# 如果調用此方法時 q 為空,返回 True 。
# 如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
# 如果 q 已滿,返回為 True . 
# 由於線程的存在,結果也可能是不可靠的(參考 q.empty()方法)。

  其他方法(了解)

q.close() 
# 關閉隊列,防止隊列中加入更多數據。
# 調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。
# 如果 q 被垃圾收集,將自動調用此方法。
# 關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在 get() 操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.join_thread() 
# 連接隊列的后台線程。此方法用於在調用 q.close() 方法后,等待所有隊列項被消耗。
# 默認情況下,此方法由不是 q 的原始創建者的所有進程調用。
# 調用 q.cancel_join_thread() 方法可以禁止這種行為。

q.cancel_join_thread() 
# 不會在進程退出時自動連接后台線程。這可以防止 join_thread() 方法阻塞。
代碼實例

  單看隊列用法

'''
multiprocessing 模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
'''
from multiprocessing import Queue

q = Queue(3)

# put, get, put_nowait, get_nowait, full, empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)  # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。
            # 如果隊列中的數據一直不被取走,程序就會永遠停在這里。

try:
    q.put_nowait(3)  # 可以使用 put_nowait ,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except:  # 因此我們可以用一個 try 語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
    print('隊列已經滿了')

# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續 put 了。
print(q.full())  # 滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get())  # 同 put 方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。

try:
    q.get_nowait(3)  # 可以使用 get_nowait ,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except:  # 因此我們可以用一個 try 語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty())  # 空了

  上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。
  子進程發送數據給父進程

import time
from multiprocessing import Process, Queue


def f(q):
    q.put([time.asctime(), 'from Eva', 'hello'])  # 調用主函數中 p 進程傳遞過來的進程參數 put 函數為向隊列中添加一條數據。


if __name__ == '__main__':
    q = Queue()  # 創建一個 Queue 對象
    p = Process(target=f, args=(q,))  # 創建一個進程
    p.start()
    print(q.get())
    p.join()

  上面是一個 queue 的簡單應用,使用隊列 q 對象調用 get 函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:
  批量生產數據放入隊列再批量獲取結果 x

import os
import time
import multiprocessing


# 向 queue 中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)


# 向 queue 中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print ('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))


# Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = []  # store input processes
    record2 = []  # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

生產者消費者模型

  在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
  為什么要使用生產者和消費者模式
  在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
  什么是生產者消費者模式
  生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
  基於隊列實現生產者消費者模型
  基於隊列實現生產者消費者模型

import time, random, os
from multiprocessing import Process, Queue


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()
    print('主')

  此時的問題是主進程永遠不會結束,原因是:生產者 p 在生產完后就結束了,但是消費者 c 在取空了 q 之后,則一直處於死循環中且卡在 q.get() 這一步。
  解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以 break 出死循環。
  改良版——生產者消費者模型

import time, random, os
from multiprocessing import Process, Queue


def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break  # 收到結束信號則結束
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
    q.put(None)  # 發送結束信號


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()
    print('主')

  注意:
    結束信號 None ,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號
  主進程在生產者生產完畢后發送結束信號None

import time, random, os
from multiprocessing import Process, Queue


def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break  # 收到結束信號則結束
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None)  # 發送結束信號
    print('主')

  但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很 low 的方式去解決
  多個消費者的例子:有幾個消費者就需要發送幾次結束信號

import time, random, os
from multiprocessing import Process, Queue


def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break  # 收到結束信號則結束
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(name, q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        res='%s%s' % (name, i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨頭', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join()  # 必須保證生產者全部生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None)  # 有幾個消費者就應該發送幾次結束信號 None
    q.put(None)  # 發送結束信號
    print('主')

  JoinableQueue([maxsize])
  創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
  方法介紹

# JoinableQueue 的實例 p 除了與 Queue 對象相同的方法之外,還具有以下方法:

q.task_done() 
# 使用者使用此方法發出信號,表示 q.get() 返回的項目已經被處理。
# 如果調用此方法的次數大於從隊列中刪除的項目數量,將引發 ValueError 異常。

q.join() 
# 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用 q.task_done() 方法為止。 

  下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
  JoinableQueue 隊列實現消費者生產者模型

import time, random, os
from multiprocessing import Process, JoinableQueue


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
        q.task_done()  # 向 q.join() 發送一次信號,證明一個數據已經被取走了


def producer(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res='%s%s' % (name, i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
    q.join()  # 生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。


if __name__ == '__main__':
    q = JoinableQueue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨頭', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 開始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主') 
    
    # 主進程等 ---> p1, p2, p3 等 ---> c1, c2
    # p1, p2, p3 結束了,證明 c1, c2 肯定全都收完了 p1, p2, p3 發到隊列的數據
    # 因而 c1, c2 也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。

進程間的數據共享

  展望未來,基於消息傳遞的並發編程是大勢所趨。
  即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。
  這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中。
  但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。
  以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。
  Manager 模塊介紹

進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的
雖然進程間數據獨立,但可以通過 Manager 實現數據共享,事實上 Manager 的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

  Manager 例子

from multiprocessing import Manager, Process, Lock


def work(d, lock):
    with lock:  # 不加鎖而操作共享的數據,肯定會出現數據錯亂
        d['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic= m.dict({'count':100})
        p_l = []

        for i in range(100):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()

        for p in p_l:
            p.join()

        print(dic)

進程池和 multiprocess.Pool 模塊

進程池

  為什么要有進程池?(進程池的概念。)
  在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?
  在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。

multiprocess.Pool 模塊

概念介紹
Pool([numprocess  [, initializer [, initargs]]])  # 創建進程池

  參數介紹

numprocess   # 要創建的進程數,如果省略,將默認使用 cpu_count() 的值
initializer  # 是每個工作進程啟動時要執行的可調用對象,默認為 None
initargs     # 是要傳給 initializer 的參數組

  主要方法

p.apply(func [, args [, kwargs]])  # 在一個池工作進程中執行 func(*args, **kwargs) ,然后返回結果。
'''
需要強調的是:
    此操作並不會在所有池工作進程中並執行 func 函數。
    如果要通過不同參數並發地執行 func 函數,必須從不同線程調用 p.apply() 函數或者使用 p.apply_async()
'''

p.apply_async(func [, args [, kwargs]])  # 在一個池工作進程中執行 func(*args, **kwargs) ,然后返回結果。
'''
此方法的結果是 AsyncResult 類的實例, callback 是可調用對象,接收輸入參數。
當 func 的結果變為可用時,將理解傳遞給 callback 。 callback 禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。
'''
   
p.close()  # 關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成

P.join()  # 等待所有工作進程退出。此方法只能在 close() 或 teminate() 之后調用

  其他方法(了解)

# 方法 apply_async() 和 map_async() 的返回值是 AsyncResul 的實例 obj 。
# 實例具有以下方法:
 
obj.get()            # 返回結果,如果有必要則等待結果到達。timeout 是可選的。如果在指定時間內還沒有到達,將引發異常。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。

obj.ready()          # 如果調用完成,返回 True

obj.successful()     # 如果調用完成且沒有引發異常,返回 True ,如果在結果就緒之前調用此方法,引發異常

obj.wait([timeout])  # 等待結果變為可用。

obj.terminate()      # 立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果 p 被垃圾回收,將自動調用此函數
代碼實例

  進程池和多進程效率對比
  p.map 進程池和進程效率測試


  同步和異步
  進程池的同步調用

import os, time
from multiprocessing import Pool


def work(n):
    print('%s run' % os.getpid())
    time.sleep(3)
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l = []
    for i in range(10):
        res = p.apply(work, args=(i,))  # 同步調用,直到本次任務執行完畢拿到 res ,等待任務 work 執行的過程中可能有阻塞也可能沒有阻塞
                                        # 但不管該任務是否存在阻塞,同步調用都會在原地等着
        res_l.append(res)
    print(res_l)

  進程池的異步調用

import os
import time
import random
from multiprocessing import Pool


def work(n):
    print('%s run' % os.getpid())
    time.sleep(random.random())
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))  # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                              # 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務
                                              # 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束
                                              # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步 apply_async 用法:如果使用異步提交的任務,主進程需要使用 join ,等待進程池內任務都處理完,然后可以用 get 收集結果
    # 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get())  # 使用 get 來獲取 apply_aync 的結果,如果是 apply ,則沒有 get 方法,因為 apply 是同步執行,立刻獲取結果,也根本無需 get

  練習
  server: 進程池版 socket 並發聊天

# Pool 內的進程數默認是 cpu 核數,假設為4(查看方法 os.cpu_count() )
# 開啟6個客戶端,會發現2個客戶端處於等待狀態
# 在每個進程內查看 pid,會發現 pid 使用為4個,即多個客戶端公用4個進程
import os
from socket import *
from multiprocessing import Pool

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn):
    print('進程pid: %s' % os.getpid())
    while True:
        try:
            msg = conn.recv(1024)
            if not msg:
                break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':
    p = Pool(4)
    while True:
        conn, *_ = server.accept()
        p.apply_async(talk, args=(conn,))
        # p.apply(talk, args=(conn, client_addr))  # 同步的話,則同一時間只有一個客戶端能訪問

  client

from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue

    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg.decode('utf-8'))

  發現:
    並發開啟多個客戶端,服務端同一時間只有4個不同的 pid ,只能結束一個客戶端,另外一個客戶端才會進來.
  回調函數
  需要回調函數的場景:
    進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
    我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了 I/O 的過程,直接拿到的是任務的結果。
  使用多進程請求多個 URL 來減少網絡等待浪費的時間

import requests
import json
import os
from multiprocessing import Pool


def get_page(url):
    print('<進程%s> get %s' % (os.getpid(), url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url': url, 'text': respone.text}


def pasrse_page(res):
    print('<進程%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p = Pool(3)
    res_l = []
    for url in urls:
        res = p.apply_async(get_page, args=(url,), callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l])  # 拿到的是 get_page 的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了

'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''

  爬蟲實驗

import re
from urllib.request import urlopen
from multiprocessing import Pool


def get_page(url, pattern):
    response = urlopen(url).read().decode('utf-8')
    return pattern, response


def parse_page(info):
    pattern, page_content = info
    res = re.findall(pattern, page_content)
    for item in res:
        dic={
            'index': item[0].strip(),
            'title': item[1].strip(),
            'actor': item[2].strip(),
            'time': item[3].strip(),
        }
        print(dic)


if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1 = re.compile(regex, re.S)

    url_dic = {
        'http://maoyan.com/board/7': pattern1,
    }

    p = Pool()
    res_l = []
    for url, pattern in url_dic.items():
        res = p.apply_async(get_page, args=(url, pattern), callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

  如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數
  無需回調函數

import time, random, os
from multiprocessing import Pool


def work(n):
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = Pool()

    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        res_l.append(res)

    p.close()
    p.join()  # 等待進程池中所有進程執行完畢

    nums = []
    for res in res_l:
        nums.append(res.get())  # 拿到所有結果
    print(nums)  # 主進程拿到所有的處理結果,可以在主進程中進行統一進行處理

進程池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html

參考資料:
python之路——進程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM