進程的理解:
-
1、系統進行資源分配和調度的基本單位,一個具有一定獨立功能的程序關於某個數據集合的一次運行活動;
-
2、它是一個動態的概念,一個活動的實體;
- 狹義定義:
an instance of a computer program that is being executed
即正在運行的程序的實例化對象。 - 廣義定義:進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動,是操作系統進行資源分配和調度的基本單位,是操作系統動態執行的基本單元。
- 狹義定義:
-
注:其概念的關鍵點在於
1)、進程是一個實體(動態的),具有自己獨立的地址空間,包括:
文本區域(text region):存儲處理器執行的代碼;
數據區域(data region):存儲變量與進程執行期間使用的動態分配的內存;
堆棧(stack region):存儲的是程序執行過程中調用的指令與本地變量;
注:正是由於每個進程是一個獨立的實體,其中以上所述的三個區域,即每個進程的數據區域以及堆棧是獨立的,相互隔離的,所以在多進程中可以保證數據的安全性
2)、編寫完的代碼,沒有運行時,稱為程序,
正在運行的代碼,稱為進程
程序是死的(靜態的),進程是活的(動態的)
-
3、進程的三大狀態
- (1) 就緒(Ready)狀態
進程創建完成即其他所有資源都已分配完畢,等待cpu調度執行時,稱為就緒狀態。 - (2) 執行(Running)狀態
cpu開始執行該進程時稱為執行狀態。 - (3) 阻塞(Blocked)狀態
由於等待某個事件發生而無法執行時,便是阻塞狀態,cpu執行其他進程.例如,等待I/O完成input、申請緩沖區不能滿足等等。
如圖所示
- (1) 就緒(Ready)狀態
CPU 調度進程的方式
- 先來先服務fcfs(first come first server):先來的先執行
- 短作業優先算法:分配的cpu多,先把短的算完
- 時間片輪轉算法:每一個任務就執行一個時間片的時間.然后就執行其他的
- 多級反饋隊列算法
- 越是時間長的,cpu分配的資源越少,優先級靠后
- 越是時間短的,cpu分配的資源越多
創建進程
導入multiprocessing模塊中的Process類
以供后續創建類的時候直接調用
p = Process(target = func, name = process01, args=(5,))
實例化進程對象
Process 類參數介紹
- target = func 表示調用對象,即子進程要執行的任務 func
- args 表示任務 func 的位置參數元組,args=(5, )
- name = process01 為子進程的名稱
Process 類常⽤⽅法
- p.start( ): 啟動進程,並調用該子進程中的p.run( )
- p.run( ): 進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要寫入該方法
- p.terminate( ): 強制終止進程p,不會進行任何清理操作
- p.is_alive( ): 如果p仍然運行,返回True。用來判斷進程是否還在運行
- p.join([timeout]): 主進程等待子進程p終止,timeout是可選的等待時間
# 主進程速度快於子進程,join方法可以使得子進程執行結束后,再繼續執行主進程中的代碼,可以用來同步代碼的一致性
import multiprocessing
def func():
print("發送第一份郵件")
if __name__ == "__main__":
p = multiprocessing.Process(target=func)
p.start()
p.join()
print("發送第二份郵件")
# 發送第一份郵件
# 發送第二份郵件
# 多個子進程配合 join 方法實現異步並發
import multiprocessing
def func(index):
print(f"發送第{index}封郵件")
if __name__ == "__main__":
process_list = []
for i in range(10):
p = multiprocessing.Process(target=func, args=(i, ))
p.start()
process_list.append(p)
# p.join() 程序會變成同步阻塞
for i in process_list:
i.join() # 異步並發
print("主進程發最后一封郵件!")
Process類常⽤屬性
- name: 當前進程實例別名, 默認為Process-N, N為從1開始遞增的整
數- pid: 當前進程實例的ID值
創建進程的兩種方法
# 創建進程的方法一:
# 利用multiprocessing模塊提供一個Process類來創建一個進程對象
from multiprocessing import Process
import time
def func(n):
while n > 0:
print(n)
time.sleep(3)
n -= 1
if __name__ == "__main__":
p = Process(target = func, args=(5,))
p.start()
p.join()
# 創建進程的方法二:
# 創建新的進程可以自定義一個類去繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def run(self):
n = 5
while n > 0:
print(n)
time.sleep(3)
n -= 1
if __name__ == "__main__":
p = ClockProcess()
p.start()
p.join()
守護進程
- 守護 主進程 時,如果主進程執行結束了,意味着守護進程的壽命立刻終止.立刻殺死
- 語法:
- 進程對象.daemon = True 設置當前進程為守護進程
- 必須寫在start( )調用進程之前進行設置
- 默認情況下,主進程會等待所有子進程執行完畢之后,關閉程序,釋放資源。若不等待,子進程並不方便管理,容易造成僵屍進程,在后台不停的占用系統的資源(cpu和內存),不清楚進程的來源。
- 守護主進程即在主進程代碼執行結束之后,無需等待子進程執行,立即殺死程序
import multiprocessing def func(): print("start 當前子進程") print("end 當前子進程") if __name__ == "__main__": p = multiprocessing.Process(target=func) p.daemon = True p.start() print("主進程執行結束 ... ") # 主進程執行結束 ...
多個子進程下,未守護主進程,主進程仍會等待子進程執行結束
- 守護進程的實際用途:監控報活
import time # 監控報活 def alive(): while True: print("給監控服務器發消息, 當前5號服務器功能正常 i am ok ~") time.sleep(1) # 當前服務器正常完成的功能 def func(): time.sleep(5) print("當前5號服務器功能,統計財務報表~") if __name__ == "__main__": p1 = Process(target=func) p2 = Process(target=alive) # 守護p2進程 p2.daemon = True p1.start() p2.start() # 等待p1子進程執行結束之后,下面的主程序的代碼才會放行; p1.join() # 未守護主進程,主進程會默認等待 print("當前服務器狀態:統計財務報表功能異常.....") # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 當前5號服務器功能,統計財務報表~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 當前服務器狀態:統計財務報表功能異常.....
多任務處理方式一:多進程
創建多進程的兩種方式:
# 手動創建
from multiprocessing import Process
num = 1
def run1():
global num
num += 5
print("子進程1運行中,num = %d" % (num))
def run2():
global num
num += 10
print("子進程2運行中,num = %d" % (num))
if __name__ == "__main__":
print("父進程啟動")
p1 = Process(target=run1)
p2 = Process(target=run2)
print("子進程將要執行")
p1.start()
p2.start()
p1.join()
p2.join()
print("子進程結束")
# 借助舊版進程池創建多進程
from multiprocessing import Pool
import random
import time
def work(num):
print(random.random() * num)
time.sleep(3)
if __name__ == "__main__":
# 實例化進程池對象,設置同一時間內最多可以執行的進程數為3個
# 題中的10個任務都由進程池中的這三個進程輪詢執行,不會創建額外 的進程數
# 若不指定則同一時間內可以執行的進程個數默認為cpu邏輯核心數
p = Pool(3)
for i in range(10):
# apply_async 選擇要調用的任務,每次循環出來的任務會用閑下來的子進程去執行
# 使⽤⾮阻塞⽅式調⽤func(並⾏執⾏,阻塞⽅式必須為等待上⼀個進程退出后才能執⾏下⼀個進程), args為傳遞給func的參數列表,kwargs為傳遞給func的關鍵字參數列表;
p.apply_async(work, (i,))
# 進程池關閉之后不會再接受新的請求
p.close()
# 等待進程池中的所有子進程都結束
p.join()
# 多進程中,主進程一般用來等待子進程執行完畢,真正的任務都由子進程中執行
# 借助新版進程池創建多進程
from concurrent.futures import ProcessPoolExecutor
import os
import time
def func(i):
print("任務執行中... start", os.getpid())
time.sleep(10)
print("任務結束... end", i)
return i
# ProcessPoolExecutor 進程池基本使用
"""
默認如果一個進程短時間內可以完成更多的任務,就不會創建額外的新的進程,以節省資源
"""
if __name__ == "__main__":
lst = []
print(os.cpu_count()) # cpu邏輯核心數
# 創建進程池對象
"""進程池中默認最多創建cpu這么多個進程,所有任務全由這幾個進程完成,不會額外創建進程"""
p = ProcessPoolExecutor()
# 異步提交任務
for i in range(10):
res = p.submit(func, i)
lst.append(res)
# 獲取當前進程池返回值
for i in lst:
print(i.result())
# 等待所有子進程執行結束
p.shutdown() # join
print("主程序執行結束....")
進程間通信
進程間數據不共享,他們之間進行數據傳遞即為通信
from multiprocessing import Queue
借助進程隊列
Queue
完成進程間的通信
Queue 的基本使用
- 消息隊列遵循 先進先出 的原則
初始化
- 初始化Queue()對象時(q=Queue()),若括號中沒有指定最⼤可接收
的消息數量, 或數量為負值, 那么就代表可接受的消息數量沒有上限
入隊操作(存數據)
q = Queue()
q.put(item, [block[, timeout]])
:將item消息寫⼊隊列- block 默認值為True
- 如果block 使⽤默認值,且沒有設置timeout(單位秒)時,若消息列隊已經沒有空間可寫⼊,此時程序將被阻塞(停在寫⼊狀態) ,直到從消息列隊騰出空間為⽌,如果設置了True和timeout,則會等待timeout秒,若還沒空間,則拋 出"q.Full"的異常信息
- 如果block值為False, 消息列隊如果出現沒有空間可寫⼊的情況, 則會⽴刻拋出"q.Full"滿了異常
q.put_nowait(item)
: 相當q.put(item, False)
;
出隊操作(取數據)
q.get([block[, timeout]])
:獲取隊列中的⼀條消息, 然后將其從列隊中移除block默認值為True
如果block使⽤默認值,且沒有設置timeout(單位秒),消息列隊如果為空, 此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為⽌,
如果設置了timeout, 則會等待timeout秒, 若還沒讀取到任何消息, 則拋
出"q.Empty"異常如果block值為False,消息列隊如果為空,則會⽴刻拋出“q.Empty”空的異常
q.get_nowait()
:相當q.get(False)
其他操作
- q = Queue()
- q.qsize(): 返回當前隊列包含的消息數量
- q.empty(): 如果隊列為空, 返回True, 反之False
- q.full(): 如果隊列滿了, 返回True,反之False
python代碼實現
from multiprocessing import Queue, Process
import time
def write(q):
for value in ["a", "b", "c"]:
print("開始寫入:", value)
q.put(value)
time.sleep(2)
def read(q):
while True:
if not q.empty():
print("讀取到的是", q.get())
time.sleep(2)
else:
break
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pw.join() #等待接收完畢
pr.start()
pr.join()
print("接受完畢!")
# 三個進程間通信
from multiprocessing import Process
from multiprocessing import Queue
def func1(q1):
q1.put("你好!")
print(f"子進程p1往隊列q1中放入的數據為:你好!")
def func2(q1, q2):
msg = q1.get()
print(f"子進程p2從隊列q1中取出的數據為:{msg}")
q2.put(msg)
print(f"子進程p2往隊列q2中放入的數據為:{msg}")
def func3(q2):
msg = q2.get()
print(f"子進程p3從隊列q2中取出的數據為:{msg}")
if __name__ == "__main__":
q1 = Queue()
q2 = Queue()
p1 = Process(target=func1, args=(q1,))
p2 = Process(target=func2, args=(q1, q2))
p3 = Process(target=func3, args=(q2,))
p1.start()
p2.start()
p3.start()
JoinableQueue 的用法
# put 存儲
# get 獲取
# task_done 隊列計數減1
# join 阻塞
# task_done 配合 join 一起使用
# [1,2,3,4,5]
# 隊列計數5
# put 一次 每存放一個值,隊列計數器加1
# get 一次 通過task_done讓隊列計數器減1
# join 函數,會根據隊列中的計數器來判定是阻塞還是放行
# 如果計數器變量是0,意味着放行,其他情況阻塞;
from multiprocessing import Process,JoinableQueue
jq = JoinableQueue()
# put 會讓隊列計數器加1
jq.put("a")
print(jq.get())
# 通過task_done,讓隊列計數器減1
jq.task_done()
# 只有隊列計數器是0的時,才會放行
jq.join() # 隊列.join
print("finish")
生產者——消費者模型
Queue下的生產者——消費者模型:
# 消費者模型
def consumer(q, name):
while True:
food = q.get()
if food is None:
break
time.sleep(random.uniform(0.1, 1))
print("%s 吃了一個%s" % (name, food))
# 生產者模型
def producer(q, name, food):
for i in range(5):
time.sleep(random.uniform(0.1, 1))
print("%s 生產了 %s%s" % (name, food, i))
q.put(food + str(i))
if __name__ == "__main__":
q = Queue()
# 消費者1
p1 = Process(target=consumer, args=(q, "張三"))
p1.start()
# 消費者2
a2 = Process(target=consumer, args=(q, "李四"))
a2.start()
# 生產者1
p2 = Process(target=producer, args=(q, "王五", "黃金"))
p2.start()
# 生產者2
b2 = Process(target=producer, args=(q, "小明", "鑽石"))
b2.start()
# 在生產完所有的數據之后,在隊列的末尾塞入一個None
p2.join()
b2.join()
# 消費者模型如果獲取的是None,代表停止消費
q.put(None)
q.put(None)
JoinableQueue 下的生產者——消費者模型:
from multiprocessing import Process,JoinableQueue
# 消費者模型
def consumer(q, name):
while True:
food = q.get()
time.sleep(random.uniform(0.1, 1))
print("%s 吃了一個%s" % (name, food))
q.task_done()
# 生產者模型
def producer(q, name, food):
for i in range(5):
time.sleep(random.uniform(0.1, 1))
print("%s 生產了 %s%s" % (name, food, i))
q.put(food + str(i))
if __name__ == "__main__":
q = JoinableQueue()
# 消費者1
p1 = Process(target=consumer, args=(q, "張三"))
p1.daemon = True
p1.start()
# 生產者1
p2 = Process(target=producer, args=(q, "李四", "黃金"))
p2.start()
# 把生產者所有的數據都裝載到隊列中
p2.join()
# 當隊列計數器減到0的時候,會立刻放行
# 必須等待消費者模型中所有的數據都task_done之后,變成0了就代表消費結束.
q.join()
print("程序結束....")
進程池中的進程之間的通信
from multiprocessing import Manager, Pool
import time
def write(q):
for i in "welcome":
print("開始寫入", i)
q.put(i)
def read(q):
time.sleep(2)
for i in range(q.qsize()): # q.qsize()獲取到當前隊列的消息數量!
print("得到消息", q.get())
if __name__ == "__main__":
print("主進程啟動!")
q = Manager().Queue()
po = Pool()
po.apply_async(write, (q,))
po.apply_async(read, (q,))
po.close()
po.join()