並發編程-進程
相關概念
進程
進程:正則進行的一個過程或者說一個任務,而負責執行任務的則是CPU。進程是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統基礎的結構。早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。
狹義定義:進程是正在運行的程序的實例。
廣義定義:進程是一個具有一定獨功能的程序關於某個數據集合的一次運動活體。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。

第一,進程是一個實體。每一個進程都有它自己的地址空間,一般情況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。 第二,進程是一個“執行中的程序”。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操作系統執行之),它才能成為一個活動的實體,我們稱其為進程。[3] 進程是操作系統中最基本、重要的概念。是多道程序系統出現后,為了刻畫系統內部出現的動態情況,描述系統內部各道程序的活動規律引進的一個概念,所有多道程序設計操作系統都建立在進程的基礎上。

動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
並發性:任何進程都可以同其他進程一起並發執行
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:由於進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度向前推進
結構特征:進程由程序、數據和進程控制塊三部分組成。
多個不同的進程可以包含相同的程序:一個程序在不同的數據集里就構成不同的進程,能得到不同的結果;但是執行過程中,程序不能發生改變。

程序是指令和數據的有序集合,其本身沒有任何運行的含義,是一個靜態的概念。
而進程是程序正在處理機上的一次執行過程,它是一個動態的概念。
程序可以作為一種軟件資料長期存在,而進程是有一定的生命期的。
程序是永久的,進程是暫時的。
注意:同一個程序執行兩次,就會在操作系統中出現兩個進程,所以我們可以同時運行一個軟件,分別做不同的事情也不會混亂。
進程調度
想要多個進程交替運行,操作系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是需要遵行一定的法則,由此就有了進程的調度算法。

先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可以作業調度,也可以作用域進程調度。FCFS算法比較有利於長作業(進程),而不利於短作業(進程)。由此可知,本算法適合於CPU繁忙型作業,而不利於I/O繁忙型作業(進程)。

短作業(進程)優先調度算法(SJ/PF)是指對短作業或者短進程優先調度的算法,該算法既可以用於作業調度,也可用於進程調度。但其對長作業不利;不能保證緊迫性作業(進程)被及時處理;作業的長短只是被估算出來的。

時間片輪轉(Round Robin,RR)法的基本思路是讓每個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,需要將CPU的處理時間分成固定大小的時間片,例如,幾十毫秒至幾百毫秒。如果一個進程在被調度選中之后用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放自己所占有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。 顯然,輪轉法只能用來調度分配一些可以搶占的資源。這些可以搶占的資源可以隨時被剝奪,而且可以將它們再分配給別的進程。CPU是可搶占資源的一種。但打印機等資源是不可搶占的。由於作業調度是對除了CPU之外的所有系統硬件資源的分配,其中包含有不可搶占資源,所以作業調度不使用輪轉法。 在輪轉法中,時間片長度的選取非常重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。如果時間片長度過短,則調度程序搶占處理機的次數增多。這將使進程上下文切換次數也大大增加,從而加重系統開銷。反過來,如果時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所允許最大的進程數來確定的。 在輪轉法中,加入到就緒隊列的進程有3種情況: 一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。 另一種情況是分給該進程的時間片並未用完,只是因為請求I/O或由於進程的互斥與同步關系而被阻塞。當阻塞解除之后再回到就緒隊列。 第三種情況就是新創建進程進入就緒隊列。 如果對這些進程區別對待,給予不同的優先級和時間片從直觀上看,可以進一步改善系統服務質量和效率。例如,我們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞原因分成不同的就緒隊列,每個隊列按FCFS原則排列,各隊列之間的進程享有不同的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片之后,或從睡眠中被喚醒以及被創建之后,將進入不同的就緒隊列。

前面介紹的各種用作進程調度的算法都有一定的局限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,而且如果並未指明進程的長度,則短進程優先和基於進程長度的搶占式調度算法都將無法使用。 而多級反饋隊列調度算法則不必事先知道各種進程所需的執行時間,而且還可以滿足各種類型進程的需要,因而它是目前被公認的一種較好的進程調度算法。在采用多級反饋隊列調度算法的系統中,調度算法的實施過程如下所述。 (1) 應設置多個就緒隊列,並為各個隊列賦予不同的優先級。第一個隊列的優先級最高,第二個隊列次之,其余各隊列的優先權逐個降低。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,為每個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。 (2) 當一個新進程進入內存后,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,便可准備撤離系統;如果它在一個時間片結束時尚未完成,調度程序便將該進程轉入第二隊列的末尾,再同樣地按FCFS原則等待調度執行;如果它在第二隊列中運行一個時間片后仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長作業(進程)從第一隊列依次降到第n隊列后,在第n 隊列便采取按時間片輪轉的方式運行。 (3) 僅當第一隊列空閑時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,才會調度第i隊列中的進程運行。如果處理機正在第i隊列中為某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶占正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。
進程的並行與並發
並行:並行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個線程,四核CPU)
並發:並行是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段后,讓給B,B用完繼續給A,交替使用,目的是提高效率。
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不同的程序在執行,這就要求必須有多個處理器。
並發是從宏觀上,在一個時間段上可以看出是同時執行,比如一個服務器同時處理多個session。
同步異步阻塞非阻塞
狀態介紹
在了解其他概念之前,我們首先要了解進程的幾個狀態。在程序運行的過程中,由於被操作系統的調度算法控制,程序會進入幾個狀態:就緒、運行和阻塞。
(1)就緒(Ready)狀態
當進程已分配到除CPU以為的所有必要的資源,只要獲得處理機便可立即執行,這時進程狀態稱為就緒狀態。
(2)執行/運行(Running)狀態
當進程已獲得處理機,其程序正在處理機上執行,此時的進程狀態稱為執行狀態。
(3)阻塞(Blocked)狀態
正在執行的進程,由於等待某個時間發生而無法執行時,便放棄處理機而處於阻塞狀態。引起進程阻塞的時間有多種,例如:等待I/O完成、申請緩沖區不能滿足、等待信件(信號)等。
同步/異步
同步:所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列
。要么成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。
異步:所謂異步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了
。至於被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列
。

比如我去銀行辦理業務,可能會有兩種方式:
第一種 :選擇排隊等候;
第二種 :選擇取一個小紙條上面有我的號碼,等到排到我這一號時由櫃台的人通知我輪到我去辦理業務了;
第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務情況;
第二種:后者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)往往注冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這里是櫃台的人)通過某種機制(在這里是寫在小紙條上的號碼,喊號)找到等待該事件的人。
阻塞/非阻塞
阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來說的

繼續上面的那個舉例,不論是排隊還是使用號碼等待通知,如果在這個等待的過程中,等待者除了等待消息通知之外不能做其它的事情,那么該機制就是阻塞的,表現在程序中,也就是該程序一直阻塞在該函數調用處不能繼續往下執行。
相反,有的人喜歡在銀行辦理這些業務的時候一邊打打電話發發短信一邊等待,這樣的狀態就是非阻塞的,因為他(等待者)沒有阻塞在這個消息通知上,而是一邊做自己的事情一邊等待。
注意:同步非阻塞形式實際上是效率低下的,想象一下你一邊打着電話一邊還需要抬頭看到底隊伍排到你了沒有。如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,因為打電話是你(等待者)的事情,而通知你則是櫃台(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換。
同步/異步與阻塞/非阻塞
(1)同步阻塞形式
效率最低。拿上面的舉例來說,就是你專心排隊,什么別的事都不做
(2)異步阻塞形式
如果在銀行等待辦理業務的人采用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間里他不能離開銀行去做其它的事情,那么很顯然,這個人被阻塞在了這個等待的操作上面;
異步操作也可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知書時被阻塞。
(3)同步非阻塞形式
實際上是效率低下的。想象一下你一邊打着電話一邊還需要抬頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換
,效率可想而知是低下的。
(4)異步非阻塞形式
效率更高,因為打電話是你(等待者)的事情,而通知你則是櫃台(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換
。
比如說,這個人突然發覺自己煙癮犯了,需要出去抽根煙,於是他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是異步+非阻塞的方式了。
很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現出來
,同樣的,很多人也會把異步和非阻塞混淆,因為異步操作一般都不會在真正的IO操作處被阻塞
。
進場的創建與結束
進程的創建
但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。
而對於通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4中形式創建新的進程
(1)系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前台進程負責與用戶交互,后台運行的進程與用戶無關,運行在后台並且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、web頁面、新聞、打印)
(2)一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)
(3)用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)
(4)一個批處理作業的初始化(只在大型機的批處理系統中應用)
無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用於創建進程的系統調用而創建的。

1. 在UNIX中該系統調用是:fork,fork會創建一個與父進程一模一樣的副本,二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件(在shell解釋器進程中,執行一個命令就會創建一個子進程) 2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的創建,也負責把正確的程序裝入新進程。 關於創建子進程,UNIX和windows 1.相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另外一個進程。 2.不同的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是可以有只讀的共享內存區的。但是對於windows系統來說,從一開始父進程與子進程的地址空間就是不同的。
進程的結束
1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自願,python a.py中a.py不存在)
3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,可以捕捉異常,try...except...)
4. 被其他進程殺死(非自願,如kill -9)
Python程序中的進程操作
multiprocess模塊
python中的多線程無法利用多核優勢,如果想要充分的使用CPU資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python中提供了multiprocess。仔細來說multiprocess不是一個模塊而是python中一個操作、管理進程的包。之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。大致分為四個部分:創建進程部分、進程同步部分、進程池部分、進程之間數據共享。
multiprocess.Process模塊
Process模塊介紹
Process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
1 group參數未使用,值始終為None 2 target表示調用對象,即子進程要執行的任務 3 args表示調用對象的位置參數元組,args=(1,2,'egon',) 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 5 name為子進程的名稱
方法介紹:
1 p.start():啟動進程,並調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖 4 p.is_alive():如果p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹:
1 p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
使用Process模塊創建進程
創建並開啟子進程的兩種方式

import os from multiprocessing import Process def func(name): print('hello', name) print("我是子進程: %d;我的父進程id是:%d" % (os.getpid(), os.getppid())) # os.getpid可以獲取進程的id, os.getppid可以獲取父進程的id if __name__ == '__main__': p = Process(target=func, args=('xiaobai',)) # 此處傳參必須是元組數據類型 p.start() print("我是父進程:%d" % os.getpid()) ''' # 執行結果 我是父進程:12612 hello xiaobai 我是子進程: 5760;我的父進程id是:12612 '''

# 通過繼承Process類的形式開啟進程的方式 import os from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print(os.getpid()) print('%s 正在和女神聊天' % self.name) if __name__ == '__main__': p1 = MyProcess('xiaobai') p2 = MyProcess('xiaohei') p1.start() # start會自動調用run方法 p2.start() # 說明:如果需要傳參,必須寫入到__init__方法里面,且必須加上super().__init__();因為父類Process里面也有__init__方法。
Process對象的join方法

import time from multiprocessing import Process def func(name): print("hello", name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=func, args=('xiaobai',)) p.start() p.join() # 加上join方法后,父進程就會阻塞等待子進程結束而結束。 print("父進程")
Process開啟多進程
多個進程同事運行(注意,子進程的執行順序不是根據自動順序決定的)
import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(1) if __name__ == '__main__': for i in range(10): p = Process(target=func, args=(i,)) p.start()

import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(0.1) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) print("父進程執行中")

import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(0.1) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) p.join() print("父進程執行中")
進程之間的數據隔離問題

from multiprocessing import Process n = 100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就可以了 def work(): global n n = 0 print("子進程內:", n) if __name__ == '__main__': p = Process(target=work) p.start() print("主進程內:", n)
守護進程
守護進程會隨着主進程的結束而結束。
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) if __name__ == '__main__': p=Myprocess('小白') p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行 p.start() time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id print('主')

import time from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止.
socket聊天並發實例

from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程一定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
多進程中的其他方法

from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) if __name__ == '__main__': p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 print(p1.is_alive()) #結果為True is_alive判斷是否還存活 print('開始') time.sleep(0.1) print(p1.is_alive()) #結果為False

import time import random from multiprocessing import Process class Myprocess(Process): def __init__(self,person): self.name=person # name屬性是Process中的屬性,標示進程的名字 super().__init__() # 執行父類的初始化方法會覆蓋name屬性 #self.name = person # 在這里設置就可以修改進程名字了 #self.person = person #如果不想覆蓋進程名,就修改屬性名稱就可以了 def run(self): print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) time.sleep(random.randrange(1,5)) print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) if __name__ == '__main__': p1=Myprocess('小白') p1.start() print(p1.pid) #可以查看子進程的進程id
進程同步(鎖)
鎖:multiprocess.Lock
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 執行結果 """ 0: 14316 is running 1: 9900 is running 2: 10056 is running 1: 9900 is done 2: 10056 is done 0: 14316 is done """

import os import time import random from multiprocessing import Process, Lock def work(lock, n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work, args=(lock, i)) p.start() # 執行結果 """ 0: 15276 is running 0: 15276 is done 1: 6360 is running 1: 6360 is done 2: 14776 is running 2: 14776 is done """
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,但卻保證了數據的安全。
實例:模擬搶票為例,來看看數據安全的重要性

#文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬並發100個客戶端搶票 p=Process(target=task) p.start()

#文件db的內容為:{"count":5} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬並發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
# 加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據的安全性 雖然可以文件共享數據實現進程間通信,但問題是: 1、效率低(共享數據基於文件,而文件是硬盤上的數據) 2、需要自己加鎖處理 # 因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題,這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可擴展性。