並發編程:
-
程序:
- 程序就是一堆文件
-
進程:
-
進程是分配資源的基本單位,為線程提供資源,一個程序可以開啟多個進程
-
-
進程被誰運行:
-
CPU最終運行你的程序
-
操作系統調度作用,將你磁盤上的程序加載到內存,然后交給CPU處理,一個CPU在運行的一個程序,就是開啟了一個進程
-
-
-
操作系統:
-
操作系統定義:
-
操作系統是存在於硬件與軟件之間,管理,協調,控制軟件與硬件的交互
-
-
操作系統的作用:
-
如果沒有操作系統,去寫一個程序,你要完成兩層功能:
- 第一層:你要學會底層硬件:CPU,內存,磁盤是如何工作使用的
- 第二層:去調度這些底層的硬件
-
操作系統兩個作用:
-
1,將一些復雜的硬件操作封裝成簡單的接口,便於使用
-
2,操作系統可以合理的調度分配多個進程與CPU的關系,讓其有序化
-
-
-
操作系統(計算機)的發展史:
-
第一代電子計算機:操作插線與你的程序結合
-
第二代計算機:磁帶存儲,批處理系
-
第三代計算機:集成電路,多道程序系統
-
-
知識點解析:
-
多道技術解決的問題:
-
多道技術是在不同任務間切換執行,由於計算機切換速度非常快,用戶是無感狀態
-
時間復用:
- 利用閑置時間,進行復用,一個進程占用cpu時間太長也會切換
-
空間復用:
- 一個內存可以加載多個進程,提高內存的利用率
-
數據隔離:
-
解決軟件之間的隔離,互不影響
-
-
進程:
-
程序就是一堆代碼
-
進程是分配資源的基本單位,為線程提供資源,一個程序可以開啟多個進程
-
概念:
- 串行:所有的進程有CPU一個一個解決
- 並行:多個CPU,真正的同時運行多個進程
- 並發:單個CPU,同時執行多個進程(來回切換),看起來像是同時運行,空間復用
- 阻塞:遇到IO(recv,input)才會阻塞
- 非阻塞:沒有IO
-
tail -f access.log |grep '404' 執行程序tail,開啟一個子進程,執行程序grep,開啟另外一個子進程,兩個進程之間基於管道'|'通訊,將tail的結果作為grep的輸入。 進程grep在等待輸入(即I/O)時的狀態稱為阻塞,此時grep命令都無法運行
-
進程的創建:
-
什么是開啟多個進程:socket:server,client兩個進程
-
python中,如果一次想開啟多個進程,必須是一個主進程,開啟多個子進程
-
linux,windows:有主進程開啟子進程
-
相同點:主進程開啟子進程,兩個進程都有相互隔離的獨立空間,互不影響
-
不同點:
-
linux:子進程空間的初始數據完全是從主(父)進程copy一份
-
windows:子進程空間初始數據完全是從主(父)進程copy一份,但是有所不同
-
-
創建進程的兩種方法:
-
函數-創建進程:
-
#這樣的實例雖然創建了子進程,但是在生產環境中子進程結束的時間不定 from multiprocessing import Process import time #當前py文件就是主進程,先運行主進程 def task(name): print(f"{name}is running") time.sleep(3) #阻塞 print(f"{name}is done") if __name__ == '__main__': #windows開啟必須寫在mian下面 p = Process(target=task,args=("海洋",)) #target要封裝的內容,對象args一定是個元祖形式 p.start() #子進程 通知操作系統在內存中開辟一個空間,將p這個進程放進去,讓cpu執行 print("___主進程")
-
類-創建進程(了解):
-
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() #放在最上面,必須要繼承父類init self.name = name def run(self): print(f"{self.name}is running") time.sleep(3) #阻塞 print(f"{self.name}is done") if __name__ == '__main__': p = MyProcess("海洋") p.start() print("====主進程")
進程PID:
-
tasklist | findstr pycharm win查看某個進程
-
import os print(os.getpid()) 查看當前的pid
-
import os print(os.getppid()) 查看父進程
進程之間數據隔離:
-
import time from multiprocessing import Process X = 1000 def task(): global x x = 2 if __name__ == '__main__': p1 = Process(target = task,) p1.start() time.sleep(1) print(f"主進程{X}") print(f"主進程{X}") import time from multiprocessing import Process X = 256 #滿足小數據池 def task(): print(f"子進程{id(X)}") if __name__ == '__main__': print(f"主進程{id(X)}") p1 = Process(target = task,) p1.start() time.sleep(1) print()
join方法:
-
join 主進程等待子進程結束之后,在執行
-
join開啟一個進程:
-
from multiprocessing import Process import time def task(name): time.sleep(1) print(f"{name}is running") if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.start() p.join() #告知主進程,p進程結束之后,主進程在結束,join有些阻塞的意思 print("___主進程") # p1.start() # p2.start() #p1,p2,p3三個子進程先后運行順序不定,start只是通知一下操作系統 # p3.start() #操作系統調用cpu先運行誰,誰先執行
-
-
join串行:
-
from multiprocessing import Process import time def task(name,sec): time.sleep(sec) print(f"{name}is running") if __name__ == '__main__': p1 = Process(target=task, args=("海洋",1)) p2 = Process(target=task, args=("俊麗",2)) p3 = Process(target=task ,args=("寶寶",3)) start_time = time.time() p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print(f"主進程{time.time() - start_time}")
-
-
join並發:
-
from multiprocessing import Process import time def task(sec): time.sleep(sec) print(f"is running") if __name__ == '__main__': start_time = time.time() list = [] for i in range(1,4): p = Process(target=task, args=(i,)) p.start() list.append(p) for i in list: i.join() print(f"主進程{time.time() - start_time}")
-
進程對象的其他屬性:
-
屬性:
-
from multiprocessing import Process import time def task(name): print(f"{name}is running") time.sleep(3) print(f"{name}is done") if __name__ == '__main__': p = Process(target=task,args=("海洋",),name="俊麗") #name給進程對象設置name屬性 p.start() # print(p.pid) #獲取到進程號 time.sleep(1) #睡一秒,子進程已經執行完成 p.terminate() #強制結束子進程,強制執行也會有執行時間 #terminate跟start一樣工作原理,都要通知操作系統開啟子進程 #內存終止或者開啟都要需要時間的 time.sleep(1) #睡一秒,讓terminate殺死 print(p.is_alive()) #判斷子進程是否存活,只是查看內存中p子進程是否還運行 print("主進程")
-
-
僵屍進程:
-
init是所有進程的父進程: 僵屍進程,僵屍是什么,死而沒有消失 主進程創建多個短暫周期的子進程,當子進程退出,是需要等待父進程處理,而父進程沒有及時對子進程回收,那么子進程的進程符仍然保存在系統中,這種進程就是僵死進程 什么進程描述符:每一個進程都有描述符,io請求,數據指針 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子進程開始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': for i in range(100): p = Process(target=task, args=("海洋",)) p.start() print(f"___主進程:{os.getpid()}")
-
-
孤兒進程:
-
孤兒進程:孤兒進程是因為主進程的退出,他下面的所有子進程都變成孤兒進程了,init會對孤兒進行回收,釋 放掉占用系統的資源,這種回收也是為了節省內存。 孤兒進程無害,如果僵屍進程掛了,init會對孤兒進程回收,init是所有進程的祖進程,linux中為1,0系統
-
-
守護進程:
-
將一個子進程設置成守護進程,當父進程結束,子進程一定會結束,避免孤兒進程產生,應為回收機制
-
父進程不能創建子進程
-
#守護進程會在主進程代碼執行結束后終止,守護進程內無法在開啟子進程 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子進程開始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.daemon = True #將p子進程設置成守護進程,守護子進程,只要主進程結束 #子進程無論執行與否都馬上結束,daemon,開啟在start上面 p.start() print(f"___主進程:{os.getpid()}")
-
進程之間的通信方式:
-
第一種:基於文件+鎖的形式:效率低,麻煩
-
第二種:基於隊列,推薦的使用形式
-
第三種:基於管道,管道自己加鎖,底層可能會出現數據丟失損壞,隊列和管道都是將數據存放於內存中
互斥鎖:
-
互斥鎖保證了每次只有一個線程進行寫入操作,只有當這個線程解鎖,在運行其他資源,上鎖和解鎖都需要自己添加
-
三台電腦同時調用打印機去打印,開啟三個進程使用互斥鎖,實現公平搶占資源
-
#上鎖: #一定要是同一把鎖:只能按照這個規律,上鎖一次,解鎖一次 #互斥鎖與join區別: #共同點:都是完成了進程之間的串行 #區別:join認為控制進程的串行,互斥鎖是解決搶占的資源,保證公平性 from multiprocessing import Process from multiprocessing import Lock import time import os import random def task1(lock): print("test1") #驗證CPU遇到IO切換 lock.acquire() print("task1 開始打印") time.sleep(random.randint(1,3)) print("task1 打印完成") lock.release() def task2(lock): print("test2") lock.acquire() #上鎖 print("task2 開始打印") time.sleep(random.randint(1,3))#阻塞,cpu切換任務,別的任務都在鎖,回來繼續執行這個程序 print("task2 打印完成") lock.release() #解鎖 def task3(lock): print("test2") lock.acquire() # lock.acquire() #死鎖錯誤示例 print("task3 開始打印") time.sleep(random.randint(1,3)) print("task3 打印完成") lock.release() if __name__ == '__main__': lock = Lock() #一把鎖 p1 = Process(target=task1,args=(lock,)) #三個進程哪個先到先執行 p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start()
-
-
互斥鎖買票示例:
-
#買票系統: #買票之前先要查票,在你查票的同時,100個人也在查看此票 #買票時,你要從服務端獲取到票數,票數>0 ,買票,然后服務端票數減一,中間有網絡延遲 #多進程原則上是不能互相通信的,他們在內存級別是有數據隔離,不代表磁盤上的數據隔離,他們可以共同操作一個文件 #多個進程搶占同一個資源,要想公平按照順序,只能串行 from multiprocessing import Process from multiprocessing import Lock import random import json import time import os def search(): time.sleep(random.random()) #一秒之內 with open("db.json", encoding="utf-8") as f1: dic = json.load(f1) print(f"剩余票數{dic['count']}") def get(): with open("db.json",encoding="utf-8") as f1: dic = json.load(f1) time.sleep(random.randint(1,3)) #時間延遲 if dic['count'] > 0: dic['count'] -= 1 with open("db.json",encoding="utf-8",mode="w") as f1: json.dump(dic,f1) print(f'{os.getpid()}用戶購買成功') else: print("沒票了") def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task,args=(lock,)) p.start() 缺點: 1.操作文件效率低 2.自己加鎖很麻煩,很容易出現死鎖,遞歸鎖
-
隊列:
-
進程之間的通信最好的方式是基於隊列
-
隊列是實現進程之間通信的工具,存在內存中的一個容器,最大的特點是符合先進先出的原則
-
隊列模式:
-
多個進程搶占一個資源:串行,有序以及數據安全,買票
-
多個進程實現並發的效果:生產者消費模型
-
隊列參數:
-
from multiprocessing import Queue q = Queue(3) #可以設置元素個數,當數據已經達到上限,在插入夯住 def func(): print("in func") q.put("海洋") #插入數據 q.put({"count":1}) q.put(func) q.put("333",block=False) #默認為True 當你插入的數據超過最大限度,默認阻塞 # q.put(333,timeout=8) #超過八秒在put不進數據,就會報錯 print(q.get()) print(q.get()) ret = q.get() ret() # q.get() #當你將數據取完,夯住,等待隊列put值,起另一個進程往隊列中插入數據 #q.put() #1,maxsize() #數據量不易過大,精簡的重要數據 #2,put bolck #默認為True阻塞 當你插入的數據超過最大限度,可以設置報錯 #3,put timeout #延時報錯,超過三秒在put不進數據,就會報錯 #get #2,get bolck #取值為空報錯 #3,get timeout #取值超過三秒報錯
搶售模型 (並行示例):
-
#小米:搶手機,預期發售10個,100人去搶 from multiprocessing import Queue from multiprocessing import Process import os def task(q): try: q.put(f'{os.getpid()}') except Exception: return if __name__ == '__main__': q = Queue(10) #創建隊列,可以存放十個人 for i in range(100): p = Process(target=task,args=(q,)) p.start() for i in range(1,11): #數量超過隊列會取 print(f'排名第{i}的用戶:{q.get()}') #獲取隊列中的信息,先進來的先取出來 #利用隊列進行進程之間的通信:簡單,方便,不用自己手動加鎖,隊列自帶阻塞,可持續化取數據
生產者消費者模型(並發示例):
-
利用隊列進行通信,生產者生產數據,消費者獲取數據使用,平衡了生產力和消費力,生產者和消費者是一種解耦合性(通過容器解決),可持續化取數據
-
模型,設計模式,歸一化設計,理論等等,教給你一個編程的思路,以后遇到類似的情況,以后直接調用就即可
-
生產者:生產數據的進程
-
消費者:生產出來的數據進行處理
-
#吃包子:廚師生產包子,不可能直接給你喂到嘴里,放在一個盆里,消費者從盆中取出包子食用 #三個主體:生產者(廚師),容器隊列(盤 緩沖區),消費者(人) #如果沒有容器,生產者與消費者強解耦性,不合理,所以我們要有一個容器,緩沖區平衡了生產力與消費力 # 生產者消費者多應用於並發: from multiprocessing import Queue from multiprocessing import Process import time import random def producer(name,q): for i in range(1,6): time.sleep(random.randint(1,3)) res = f'{i}號包子' q.put(res) print(f'生產者{name}:生產了{res}') def consumer(name,q): while 1: try: time.sleep(random.randint(1, 3)) ret = q.get(timeout = 5) #五秒還吃不到退出 print(f'消費者{name}:吃了{ret}') except Exception: return if __name__ == '__main__': q = Queue() #盆 p1 = Process(target=producer,args=("俊麗",q,)) #生產 p2 = Process(target=consumer,args=("海洋",q,)) #消費 p1.start() p2.start()
線程:
-
進程:進程是分配資源的基本單位,內存中開辟空間,為線程提供資源,一個程序可以開啟多個進程
-
線程:CPU調度的最小單位,執行單位,線程也被稱作為輕量級的進程,動態的
- 主線程是進程空間存活在內存中的一個必要條件
-
開啟QQ:開啟一個進程,在內存中開辟空間加載數據,啟動一個線程執行代碼
-
線程依賴進程的一個進程可以包含多個線程,但是一定有一個主線程,線程才是CPU執行的最小單元
-
進程線程對比:
-
1,開啟多進程開銷非常大,10-100倍,而開啟線程開銷非常小
-
2.開啟多進程速度慢,開啟多線程速度快
-
3.進程之間數據不共享,線程共享數據
-
-
多線程應用場景:
-
並發:一個CPU可以來回切換(線程之間切換),多進程並發,多線程的並發
-
多進程並發:開啟多個進程,並發的執行
-
多線程並發:開啟線程,並發的執行
-
如果遇到並發:多線程居多
-
開啟線程的兩種方式:
-
線程絕對要比進程開啟速度快
-
函數開啟:
-
#先打印海洋,線程要比進程速度快,如果是進程先打印主線程 from threading import Thread def task(name): print(f'{name} is running') if __name__ == '__main__': t = Thread(target=task,args=("海洋",)) t.start() print("主線程") #子進程睡眠3秒,先運行主進程 from threading import Thread import time x = 1000 def task(): time.sleep(3) print('子線程....') def main(): print('111') print('222') print('333') if __name__ == '__main__': t = Thread(target=task) t.start() main()
-
-
類開啟:
-
from threading import Thread class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(f'{self.name} is running') if __name__ == '__main__': t = MyThread("海洋") t.start() print("主線程")
-
-
線程pid:
-
#主線程和子線程pid一樣 from threading import Thread import os def task(): print(f'子線程:{os.getpid()}') if __name__ == '__main__': t = Thread(target=task,) t.start() print(f"主線程:{os.getpid()}")
-
-
線程之間數據共享:
-
from threading import Thread x = 1000 def task(): global x x = 0 if __name__ == '__main__': t = Thread(target=task, ) t.start() t.join() # 告知主線程,等待子線程運行完畢在執行 print(f'主線程:{x}')
-
線程的方法:
-
from threading import Thread import threading import time def task(name): time.sleep(1) print(f'{name} is running') if __name__ == '__main__': for i in range(5): t = Thread(target=task,args=("海洋",)) t.start() #線程對象的方法 # print(t.is_alive()) #判斷線程是否存活 #threading模塊的方法 print(threading.current_thread().name) #返回線程對象.name print(threading.enumerate()) #返回列表,返回的是所有線程對象 print(threading.active_count()) #獲取活躍的線程數量(包括主線程) print("主線程")
守護線程:
-
守護線程必須等待主線程結束才結束,主線程必須等待所有的非守護線程結束才能結束,因為主線程的結束意味着進程的結束,這就是一個守護機制
-
多線程是同一個空間,同一個進程,進程代表,空間,資源,靜態的:
-
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()之前設置 t.start() print('主線程') print(t.is_alive()) #判斷進程是否存在也是主線程 from threading import Thread import time def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(1) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon = True t1.start() t2.start() #t2非守護線程,主線程等待子線程結束 print("main-------")
線程互斥鎖:
-
join:
-
from threading import Thread import time x = 100 def task(name): global x temp = x time.sleep(3) temp -= 1 x = temp if __name__ == '__main__': t = Thread(target=task,args=("海洋",)) t.start() t.join() print(f"主線程{x}") #多個線程搶占一個資源 from threading import Thread import time x = 100 def task(name): global x temp = x time.sleep(3) temp -= 1 x = temp if __name__ == '__main__': tl = [] for i in range(100): t = Thread(target=task,args=("海洋",)) tl.append(t) t.start() for i in tl: i.join() print(f"主進程{x}") #多個線程搶占一個資源
互斥鎖:
-
所有線程串行執行,多個 線程共同搶占一個數據,保證了數據安全:
-
from threading import Thread from threading import Lock import time x = 100 def task(lock): lock.acquire() global x temp = x time.sleep(0.1) temp -= 1 x = temp lock.release() if __name__ == '__main__': lock = Lock() tl = [] for i in range(100): t = Thread(target=task,args=(lock,)) tl.append(t) t.start() for i in tl: i.join() print(f"主線程{x}") #多個線程搶占一個資源,join讓主線程等待子線程執行完成在執行,結果0
線程死鎖現象:
-
多個線程或者進程競爭資源,如果開啟的互斥鎖過多,遇到互相搶鎖造成互相等待情況,程序夯住,
-
還有一種是給同時給一個線程或者進程連續加鎖多次,利用遞歸鎖解決Rlock
-
from threading import Thread from threading import Lock import time lock_A = Lock() lock_B = Lock() class Mtthread(Thread): def run(self): self.f1() self.f2() def f1(self): lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_B.acquire() print(f"{self.name}誰拿到B鎖") lock_B.release() lock_A.release() def f2(self): lock_B.acquire() print(f"{self.name}誰拿到B鎖") time.sleep(1) lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_A.release() lock_B.release() if __name__ == '__main__': t1 = Mtthread() t1.start() t2 = Mtthread() t2.start() t3 = Mtthread() t3.start() print(f"主進程")
遞歸鎖:
-
遞歸鎖上有引用次數,每次引用計數+1,解鎖計數-1,只有計數為0.在運行下個進程
-
#遞歸鎖: #遞歸鎖是一把鎖,鎖上有記錄,只要acquire一次,鎖上就計數一次,acquire2次就計數兩次 #release 1次減一,只要遞歸鎖計數不為0,其他線程不能搶 from threading import Thread from threading import RLock import time lock_A = lock_B = RLock() class Mtthread(Thread): def run(self): # lock_A.acquire() # lock_B.acquire() # print(111) # lock_A.release() # lock_B.release() self.f1() self.f2() def f1(self): lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_B.acquire() print(f"{self.name}誰拿到B鎖") lock_B.release() lock_A.release() def f2(self): lock_B.acquire() print(f"{self.name}誰拿到B鎖") time.sleep(1) lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_A.release() lock_B.release() if __name__ == '__main__': t1 = Mtthread() t1.start() t2 = Mtthread() t2.start() t3 = Mtthread() t3.start() print(f"主進程")
信號量:
-
信號量准許多個線程或者進程同時進入
-
from threading import Thread from threading import current_thread from threading import Semaphore import time import random sm = Semaphore(4) def chi(): sm.acquire() print(f"{current_thread().name}正在吃飯") time.sleep(random.randint(1,3)) sm.release() if __name__ == '__main__': for i in range(20): t = Thread(target=chi) t.start()
GIL鎖:
-
全局解釋器鎖,就是一把互斥鎖,將並發變成串行,同一時刻只能有一個線程進入解釋器,自動加鎖和釋放鎖,犧牲效率保護python解釋器內部數據安全
-
優點:
-
強行加鎖,保證解釋器里面的數據安全
-
-
缺點:
-
多進程可以利用多核,多進程的每個進程里面都有python解釋器程序
-
單進程的多線程不能利用多核,python解釋器內部程序,不支持多線程同時解釋
-
-
討論:
-
python-單核處理IO阻塞的多線程,java多核處理IO阻塞問題,效率差不多
-
單核處理三個IO線程,多核處理三個IO線程,多核快些
-
-
代碼的執行:
-
CPython獨有GIL鎖:
-
將你的py文件當做實參傳送給解釋器傳換成c語言字節碼,在交給虛擬機轉換成010101機器碼,這些代碼都是線程執行,進程進行調度資源
-
-
lpython:交互式解釋器,可以補全代碼
-
Jpython:java語言字節碼,剩下的一樣
-
pypy:動態編譯,JAT技術,執行效率要比Cpython塊,但是技術還有缺陷bug
-
驗證Python開發效率:
-
單核CPU:
-
一核,都是單進程多線程並發快,因為單核開啟多進程也是串行。
-
-
多核CPU:
-
計算密集型:
-
多進程的並行比多線程的並發執行效率高很多(因為不同進程運行在不同核心上,並行執行)
-
-
IO密集型:
-
多線程要比多進程處理速度快,因為進程開銷大,而線程處理其實也是串行,只不過處理速度比進程更快些,線程一次只能處理一個事情(空間復用)
-
開啟150個進程(開銷大,速度慢),執行IO任務耗時長
-
開啟150個線程(開銷小,速度快),執行IO任務耗時短
-
-
-
如果你的任務是io密集型並且任務數量大,用單進程下的多線程處理阻塞效率高
-
計算密集型:
-
from multiprocessing import Process from threading import Thread import time import os # print(os.cpu_count()) def task1(): res = 1 for i in range(1, 100000000): res += i def task2(): res = 1 for i in range(1, 100000000): res += i def task3(): res = 1 for i in range(1, 100000000): res += i def task4(): res = 1 for i in range(1, 100000000): res += i if __name__ == '__main__': # 四個進程 四個cpu 並行 效率 start_time = time.time() p1 = Process(target=task1) p2 = Process(target=task2) p3 = Process(target=task3) p4 = Process(target=task4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(f'主: {time.time() - start_time}') # 10.125909328460693 # 一個進程 四個線程 # start_time = time.time() # p1 = Thread(target=task1) # p2 = Thread(target=task2) # p3 = Thread(target=task3) # p4 = Thread(target=task4) # # p1.start() # p2.start() # p3.start() # p4.start() # # p1.join() # p2.join() # p3.join() # p4.join() # print(f'主: {time.time() - start_time}') # 22.927688121795654
-
-
計算IO密集型:
-
from multiprocessing import Process from threading import Thread import time import os # print(os.cpu_count()) def task1(): res = 1 time.sleep(3) if __name__ == '__main__': # 開啟150個進程(開銷大,速度慢),執行IO任務, 耗時 8.382229089736938 # start_time = time.time() # l1 = [] # for i in range(150): # p = Process(target=task1) # l1.append(p) # p.start() # for i in l1: # i.join() # print(f'主: {time.time() - start_time}') # 開啟150個線程(開銷小,速度快),執行IO任務, 耗時 3.0261728763580322 # start_time = time.time() # l1 = [] # for i in range(150): # p = Thread(target=task1) # l1.append(p) # p.start() # for i in l1: # i.join() # print(f'主: {time.time() - start_time}')
-
GIL鎖和互斥鎖關系:
-
線程計算密集型:
- 當程序執行,開啟100個線程時,第一個線程先要拿到GIL鎖,然后拿到lock鎖,執行代碼,釋放lock鎖,最后釋放GIL鎖
-
線程IO密集型:
-
當程序執行,開啟100個線程時,第一個線程先要拿到GIL鎖,然后拿到lock鎖,遇到阻塞,CPU切走,GIL釋放,第一個線程掛起
-
第二個線程執行,搶到GIL鎖,進入要搶lock,但是lock鎖還沒釋放,阻塞掛起
-
-
自己加互斥鎖,一定要加在處理共享數據的地方,加的范圍不要擴大,范圍過大,影響並發
-
GIL鎖單進程的多線程不能利用多核,不能並行,但是可以並發
-
互斥鎖:
-
GIL自動上鎖解鎖,文件中的互斥鎖Lock,手動上鎖解鎖
-
GIL鎖,保護解釋器的數據安全,互斥鎖是保護的文件的數據安全
-
線程池:
-
線程池在系統啟動時創建了大量的空閑線程,線程執行直接調用線程池中已經開啟好的空閑線程,當線程執行結束,該線程不會死亡,而是將線程變成空閑狀態,放回進程池。
-
線程池提高效率,資源復用
-
進程池:放置進程的一個容器
-
線程池:放置線程的一個容器
-
完成一個簡單的socket通信,服務端必須與一個客戶端交流完畢,並且這個客戶端斷開連接之后,服務端才能接待下一個客戶:
-
#開啟進程池或者線程池: #線程池好還是進程池好:io阻塞或者計算密集型 from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor import time import os import random def task(name): # print(name) print(f"{os.getpid()}准備接客") time.sleep(random.randint(1,3)) if __name__ == '__main__': # p = ProcessPoolExecutor(max_workers=5) #限制進程數量,默認為cpu個數 p = ThreadPoolExecutor() #線程默認是CPU個數的五倍 for i in range(23): p.submit(task,1) #給進程池放置任務啟動,1為傳參
阻塞,非阻塞:
-
程序運行中的狀態,阻塞,運行,就緒
-
阻塞:當你程序遇到IO阻塞掛起,CPU切換,等到IO結束之后再執行
-
非阻塞:程序沒有IO,或者遇到IO通過某種手段讓cpu去執行其他任務,盡可能的占用CPU
同步:
-
任務發出去之后等待,直到這個任務最終結束之后,給我一個返回值,發布下一個任務
-
同步示例:
-
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f"{os.getpid()}is running") time.sleep(1) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) for i in range(10): obj = p.submit(task,) print(obj.result()) #同步等待一個進程內容全部執行完成在執行下一個
異步:
-
將任務發給進程,不管任務如何,直接運行下一個
-
異步示例:
-
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f'{os.getpid()} is running') time.sleep(random.randint(0,2)) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) obj_l1 = [] for i in range(10): obj = p.submit(task,) # 異步發出. obj_l1.append(obj) # time.sleep(3) p.shutdown(wait=True) # 1. 阻止在向進程池投放新任務, # 2. wait = True 十個任務是10,一個任務完成了-1,直至為零.進行下一行. for i in obj_l1: print(i.result())
異步+回調機制:
- 異步發布任務,就不管任務結果
- 回調:
- 回調是你異步發布任務執行完成后,將結果丟給回調函數add_done_callback,回調函數幫你分析結果,進程繼續完成下一個任務
- 回調就是對特定的事件或者條件進行響應
-
爬蟲:游覽器做的事情很簡單:
-
瀏覽器 封裝頭部,發送一個請求--->www.taobao.com ----> 服務器獲取到請求信息,分析正確--->給你返回一個文件,--->游覽器將這個文件的代碼渲染,就成了你看的樣子
-
爬蟲:利用reauests模塊功能模擬游覽器封裝頭,給服務器發送一個請求,騙過服務器之后,服務器也會給你返回一個文件,爬蟲拿到文件,進行數據清洗獲取到你想要的信息
-
-
爬蟲分兩步:
-
第一步:爬取服務器端的文件(IO阻塞)
-
第二部:拿到文件,進行數據分析(非IO,IO極少)
-
-
錯誤版本示例:
-
import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: return response.text def parse(text): print(f'{os.getpid()} 分析結果:{len(text)}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) obj_list = [] for url in url_list: obj = pool.submit(get, url) obj_list.append(obj) pool.shutdown(wait=True) for obj in obj_list: #抓取網頁是串行,輸出的結果 parse(obj.result()) #爬取一個網頁需要2s,並發爬取10個網頁:2.多s. #分析任務: 1s. 10s. 總共12.多秒. # 現在這個版本的過程: # 異步發出10個爬取網頁的任務,然后4個進程並發(並行)的先去完成4個爬取網頁的任務,然后誰先結束,誰進行下一個 # 爬取任務,直至10個任務全部爬取成功. # 將10個爬取結果放在一個列表中,串行的分析. import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: parse(response.text) def parse(text): print(f'{os.getpid()} 分析結果:{len(text)}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) # pool.shutdown(wait=True) print('主') #異步發出10個 爬取網頁+分析 的任務,然后4個進程並發(並行)的先去完成4個爬取網頁+分析 的任務, #然后誰先結束,誰進行下一個 爬取+分析 任務,直至10個爬取+分析 任務全部完成成功.
-
-
正確版本示例:
-
import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') if response.status_code == 200: return response.text def parse(obj): time.sleep(1) print(f'{os.getpid()} 分析結果:{len(obj.result())}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] start_time = time.time() pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) obj.add_done_callback(parse) # 增加一個回調函數 # 現在的進程完成的還是網絡爬取的任務,拿到了返回值之后,結果丟給回調函數add_done_callback, # 回調函數幫助你分析結果 # 進程繼續完成下一個任務. pool.shutdown(wait=True) #阻止發布新的任務,代替join print(f'主: {time.time() - start_time}') # 回調函數是主進程幫助你實現的, 回調函數幫你進行分析任務. 明確了進程的任務: 只有一個網絡爬取. # 分析任務: 回調函數執行了.對函數之間解耦. # 極值情況: 如果回調函數是IO任務,那么由於你的回調函數是主進程做的,所以有可能影響效率. # 回調不是萬能的,如果回調的任務是IO, # 那么異步 + 回調機制 不好.此時如果你要效率只能犧牲開銷,再開一個線程進程池.
-
隊列模式:
-
FIFO 先進先出原則:
-
import queue q = queue.Queue(3) q.put(1) q.put(2) q.put('海洋') print(q.get()) print(q.get()) print(q.get())
-
-
LIFO 棧.-先進后出:
-
import queue q = queue.LifoQueue() q.put(1) q.put(3) q.put('海洋') print(q.get()) print(q.get()) print(q.get())
-
-
優先級隊列:
-
# 需要元組的形式,(int,數據) int 代表優先級,數字越低,優先級越高. import queue q = queue.PriorityQueue(3) q.put((10, '垃圾消息')) q.put((-9, '緊急消息')) q.put((3, '一般消息')) print(q.get()) print(q.get()) print(q.get())
-
事件Event:
-
並發的執行某個任務,多進程多線程,幾乎同時執行,一個線程執行到中間時,通知另一個線程開始執行
-
import time from threading import Thread from threading import current_thread from threading import Event event = Event() # 默認是False def task(): print(f'{current_thread().name} 檢測服務器是否正常開啟....') time.sleep(3) # 先運行task阻塞三秒,在將event修改為True event.set() # 改成了True def task1(): print(f'{current_thread().name} 正在嘗試連接服務器') # event.wait() # 輪詢檢測event是否為True,當其為True,繼續下一行代碼. 阻塞 event.wait(1) # 設置超時時間,如果1s中以內,event改成True,代碼繼續執行. # 設置超時時間,如果超過1s中,event沒做改變,代碼繼續執行. print(f'{current_thread().name} 連接成功') if __name__ == '__main__': t1 = Thread(target=task1,) t2 = Thread(target=task1,) t3 = Thread(target=task1,) t = Thread(target=task) t.start() t1.start() t2.start() t3.start()
協程:
-
協程的本質也是一個線程,而使用協程目的是為了減少系統開銷,協程是我們通過程序來控制任務切換,協程速度比系統更快,最大限度的利用CPU,更加輕量級
-
線程協程的區別:
- 協程沒有鎖,協程又稱微線程
- 線程和協程不同的是,線程是搶占式調度切換,而協程是需要自己調度
- 線程和進程,調度是CPU決定的,而協程就是上帝,在一個線程中規定某個代碼塊的執行順序
-
1,協程切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,更加輕量級
-
2.單線程內就可以實現並發的效果,最大限度的利用CPU
-
3.修改共享的數據不需要加鎖
-
協程就像線程一樣也是在多任務間來回切換
-
在其他語言中,協程的意義不大,多線程即可以解決I/O問題,在python中有GIL鎖,在同一時間只有一個線程在工作,所以一個線程里面IO操作特別多,協程比較適用
-
串行:多個任務執行時,一個任務從開始執行,遇到IO等待,等待IO阻塞結束之后再執行下一個
-
並行:多核多個線程或者進程同時執行,四個CPU同時執行四個任務
-
並發:多個任務看起來是同時執行,CPU在多個任務之間來回切換,遇到IO阻塞,計算密集型執行時間過長
-
並發本質:遇到IO阻塞,計算密集型執行時間過長,保持原來的狀態
-
-
一個線程實現開發:
-
多進程:操作系統控制,多個進程的多個任務切換 + 保持狀態
-
多線程程:操作系統控制,多個線程的多個任務切換 + 保持狀態
-
協程:程序控制一個線程的多個任務的切換以及保持狀態
-
微並發,處理任務不宜過多
-
協程他會調度CPU,如果協程管控的任務中,遇到阻塞,他會快速的(比操作系統快),切換到另一個任務,並且能將上一個任務掛起(保持狀態),讓操作系統以為CPU一直在工作
-
-
-
串行和協程對比:
-
密集型數據串行和協程對比,肯定串行速度快,因為協程運行還要來回切換
-
import time def task1(): res = 1 for i in range(1,100000): res += i def task2(): res = 1 for i in range(1,100000): res -= i start_time = time.time() task1() task2() print(f'串行消耗時間:{time.time()-start_time}') # 串行消耗時間:0.012489557266235352 def task1(): res = 1 for i in range(1, 100000): res += i yield res def task2(): g = task1() res = 1 for i in range(1, 100000): res -= i next(g) start_time = time.time() task2() print(f'協程消耗時間:{time.time() - start_time}') # 協程消耗時間:0.02991938591003418
-
-
開啟協程:
-
遇到gevent阻塞切換:
-
import gevent import time def eat(name): print('%s eat 1' %name) # 1 gevent.sleep(2) #協程識別gevent,可以進行IO切換 # time.sleep(300) #協程不識別切換不了,不可切換 print('%s eat 2' %name) # 4 def play(name): print('%s play 1' %name) # 2 gevent.sleep(1) # time.sleep(3) print('%s play 2' %name) # 3 g1 = gevent.spawn(eat, '海洋') g2 = gevent.spawn(play, name='俊麗') #協程異步發布任務 # g1.join() # g2.join() #或者gevent.joinall([g1,g2]) gevent.joinall([g1,g2]) #主線程等待協程執行完畢 print('主') #5
-
所有IO阻塞都可以切換:
-
import threading from gevent import monkey monkey.patch_all() # 將你代碼中的所有的IO都標識. import gevent # 直接導入即可 import time def eat(): print(f'線程1:{threading.current_thread().getName()}') # 1 print('eat food 1') # 2 time.sleep(3) # 加上mokey就能夠識別到time模塊的sleep了 print('eat food 2') # 6 def play(): print(f'線程2:{threading.current_thread().getName()}') # 3 print('play 1') # 4 time.sleep(1) # 來回切換,直到一個I/O的時間結束,這里都是我們個gevent做得,不再是控制不了的操作系統了。 print('play 2') # 5 g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print(f'主:{threading.current_thread().getName()}') # 7
-