線程池的原理:
線程池是預先創建線程的一種技術。線程池在還沒有任務到來之前,
創建一定數量的線程,放入空閑隊列中。這些線程都是處於睡眠狀態,
即均為啟動,不消耗CPU,而只是占用較小的內存空間。當請求到來之后,
緩沖池給這次請求分配一個空閑線程,把請求傳入此線程中運行,進行處理。
當預先創建的線程都處於運行狀態,即預制線程不夠,線程池可以自由創建一定數量的新線程,
用於處理更多的請求。當系統比較閑的時候,也可以通過移除一部分一直處於停用狀態的線程。
進程間的通信原理:
OS提供了溝通的媒介供進程之間“對話”用。既然要溝通,如同人類社會的溝通一樣,
溝通要付出時間和金錢,計算機中也一樣,必然有溝通需要付出的成本。
出於所解決問題的特性,OS提供了多種溝通的方式,每種方式的溝通成本也不盡相同,
使用成本和溝通效率也有所不同。我們經常聽到的 管道、消息隊列、共享內存都是OS提供的供進程之間對話的方式。
Process(target, name, args, kwargs)
name:
給
進程取
名字
默認為
Process-1,Process-2.....
p.name 查看進程名
args:
以
元組的形式
給target函數傳參
kwargs:
以
字典的形式
給對應鍵的值傳參
進程對象的其他
常用屬性方法:
p.name p.start() p.join()
p.pid:
獲取創建進程的
pid號
p.is_alive():
判斷進程是處於alive狀態
p.daemon:
默認為Flase 如果
設置為True
主進程結束時
殺死所有子進程
daemon屬性
一定要在start()前設置
設置daemon
為True 一般
不需要加join()
daemon不是真正意義上的守護進程
守護進程:
不受終端控制
后台自動運行
生命周期長
多進程copy一個文件拆分為兩個進行保存
import os from multiprocessing import Process from time import sleep #獲取文件的大小 size = os.path.getsize("./timg.jpeg") # 獲取文件的字節數 # f = open("timg.jpeg",'rb') #復制前半部分 def copy1(img): f = open(img,'rb') # 二進制讀取要復制的文件 n = size // 2 fw = open('1.jpeg','wb') # 二進制創建文件 while True: if n < 1024: # 判斷文件大小是否大於1024字節 如果小於則直接讀取寫入 data = f.read(n) fw.write(data) break data = f.read(1024) # 否則每次循環讀取1024字節並寫入 fw.write(data) n -= 1024 f.close() fw.close() #復制后半部分 def copy2(img): f = open(img,'rb') # 讀取文件必須要每次讀取 如果在父進程中打開文件流對像 # 子進程會通同時調用一個文件流對像 由於文件流對象特性會記錄游標 # 如若先執行后半部復制這前半部會導致讀取不到數據 fw = open('2.jpeg','wb') f.seek(size // 2,0) while True: data = f.read(1024) if not data: break fw.write(data) fw.close() f.close() p1 = Process(target = copy1,args = ('timg.jpeg',)) # 創建子進程並讓子進程分別同時復制 p2 = Process(target = copy2,args = ('timg.jpeg',)) p1.start() p2.start() p1.join() p2.join()
os.path.getsize('./1.txt'):
讀取文件大小
注:
1.如果
多個子進程拷貝同一個
父進程的對象則多個
子進程
使用的是同一個對象(如文件隊形,套接字,隊列,管道。。。)
2.如果在
創建子進程后單獨創建的對象,則多個
子進程各不相同
創建子自定義進程類
1.編寫類
繼承Process
2.在自定義類中
加載父類__init__以獲取父類屬性,
同時可以
自定義新的屬性
3
.重寫run方法 在調用start時自動執行該方法
示例:
from multiprocessing import Process import time class ClockProcess(Process): def __init__(self,value): #調用父類init super().__init__() self.value = value #重寫run方法 def run(self): for i in range(5): time.sleep(self.value) print("The time is {}".format(time.ctime())) p = ClockProcess(2) #自動執行run p.start() p.join()
進程的缺點:
進程在
創建和銷毀的過程中
消耗的
資源相對
較多
進程池技術:
產生原因:
如果有大量的任務需要多進程完成,而調用周期比較短且需要頻繁創建
此時可能產生大量進程頻繁創建銷毀的情況 消耗計算機資源較大
使用方法:
1.
創建進程池,
在池內放入適當數量的進程
2.
將事件封裝成函數。
放入到
進程池
3.事件不斷運行,
直到所有放入進程池事件運行完成
4.
關閉進程池,
回收進程
from multiprocessing import pool
pool(Process)
功能:
創建進程池對象
參數:進程數量
返回值:進程池對象
pool = pool()
pool.apply_async(fun, args, kwds)(異步執行)
功能:
將事件放入進程池內
參數:
fun:要執行的
函數
args:以
元組形式為fun
傳參
kwds:以
字典形式為fun
傳參
返回值:
返回一個事件對象,通過p.
get()函數可以獲取fun的返回值
pool.close():
功能:
關閉進程池,無法再加入新的事件,並等待已有事件結束執行
pool.join()
功能:
回收進程池
pool.apply(fun, args, kwds)(同步執行)
功能:將事件放入進程池內
參數:
fun:要執行的函數
args:以元組形式為fun傳參
kwds:以字典形式為fun傳參
沒有返回值
示例:
from multiprocessing import Pool from time import sleep,ctime def worker(msg): sleep(2) print(msg) return ctime() #創建進程池對象 pool = Pool(processes = 4) result = [] for i in range(10): msg = "hello %d"%i #將事件放入進程池 r = pool.apply_async(func = worker,args = (msg,)) result.append(r) #同步執行 # pool.apply(func = worker,args = (msg,)) #關閉進程池 pool.close() #回收 pool.join() #獲取事件函數返回值 for i in result: print(i.get())
pool.map(func, iter)
功能:
將要執行的
事件放入進程池
參數:
func 要執行的
函數
iter
可迭代對象
示例:
from multiprocessing import Pool import time def fun(n): time.sleep(1) print("執行 pool map事件",n) return n ** 2 pool = Pool(4) #在進程池放入6個事件 r = pool.map(fun,range(6)) # map高階函數 fun和iter執行6次 print("返回值列表:",r) pool.close() pool.join()
進程間的通信(IPC)
由於進程
空間獨立,
資源無法共享,
此時在
進程間通訊就
需要專門的通訊方法
通信方法:
管道、消息隊列、共享內存
信號、信號量、套接字
管道通信:
在內存中
開辟一塊內存空間,
形成管道結構
多個進程使用同一個管道,即可通過
對管道的
讀寫操作進行通訊
multiprocessing --> Pipe
fd1,fd2 = Pipe(duplex=True)
功能:
創建管道
參數:
默認表示
雙向管道
如果設置為
False則為
單向管道
返回值:
倆個管道對象的,分別表示管道的兩端
如果是
雙向管道則均可讀寫
如果是
單向管道則
fd1只讀,
fd2只寫
fd.recv()
功能:從管道
讀取信息
返回值:讀取到的內容
當
管道為空則阻塞
fd.send(data)
功能:
向管道寫入內容
參數:要寫入的內容
當
管道滿時會阻塞
可以
寫入幾乎所有
Python所有數據類型
隊列通信:
在
內存中
開辟隊列結構空間,多個進程可見,
多個進程操作同一個隊列對象可以
實現消息存取工作
在取出時
必須按照存入
順序取出(
先進先出)
q = Queue(maxsize=0)
功能:
創建隊列對象
參數:
maxsize 默認表示根據系統分配空間
儲存消息
如果
傳入一個正整數則
表示最多
存放多少條消息
返回值:隊列對象
q.put(data,[block,timeout])
功能:向隊列
存入消息
參數:
data:存入消息(
支持Python數據類型)
block:默認
True表示當隊
滿時阻塞
設置為
False 則為
非阻塞
timeout:當
block為True是表示
超時檢測
data = q.get([block,timeout])
功能:取出消息
參數:
block:設置為
True 當隊列為
空時阻塞
設置為
False表示
非阻塞
timeout:
當
block為True是表示
超時檢測
q.full()
判斷隊列是否為滿
q.empty() 判斷隊列
是否為空
q.qsize()
獲取隊列中
消息的數量
q.close()
關閉隊列
共享內存通信:
在
內存中開辟一段空間存儲數據對多個進程可見,
每次寫入共享內存中的內容
都會覆蓋之前內容
對內存的
讀操作不會改變內存中的內容
form multiprocessing import Value,Array
shm = Value(ctype,obj)
功能:
共享內存共享
空間
參數:
ctype:字符串 要轉換的c語言的數據類型
obj:共享內存的
初始數據
返回值:返回共享內存對象
shm.value:
表示共享內存
的值
示例:
from multiprocessing import Process,Value import time import random #創建共享內存 money = Value('i',6000) #存錢 def deposite(): for i in range(100): time.sleep(0.05) #對value的修改就是對共享內存的修改 money.value += random.randint(1,200) #花銷 def withdraw(): for i in range(100): time.sleep(0.04) #對value的修改就是對共享內存的修改 money.value -= random.randint(1,200) d = Process(target = deposite) w = Process(target = withdraw) d.start() w.start() d.join() w.join() print(money.value)
shm = Array(ctype,obj)
功能:
開辟共享內存
空間
參數:
ctype:要轉換的數據類型
obj:
要存入共享內容的的數據(
結構化數據)
列表、字符串
表示要存入得內容
要求
數據結構內的
類型相同
整數
表示要
開辟幾個單元的空間
返回值:
返回共享內存對象
可迭代對象
示例:
from multiprocessing import Process,Array import time #創建共享內存 shm = Array('c',b"hello") #字符類型要求是bytes #開辟5個整形單元的共享內存空間 # shm = Array('i',5) def fun(): for i in shm: print(i) shm[0] = b"H" p = Process(target = fun) p.start() p.join() print(shm.value) #從首地址打印字符串 # for i in shm: # print(i)
三種進程間通信區別:
管道通信: 消息隊列: 共享內存:
開辟空間: 內存 內存 內存
讀寫方式: 兩端讀寫 先進先出 每次覆蓋上次內容
單向/雙向
效率: 一般 一般 較快
應用: 多用於父子進程 應用靈活廣泛 復雜,需要同步互斥
