一、並發編程之多進程
1.multiprocessing模塊介紹
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
2.Process類的介紹
創建進程的類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)
強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹
group參數未使用,值始終為None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'egon',)
kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
name為子進程的名稱
方法介紹
p.start():啟動進程,並調用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹
p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
3.Process類的使用
注意:在windows中Process()必須放到# if name == 'main':下
由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。
如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。
3.1 創建開啟子進程的兩種方式
方式一
from multiprocessing import Process
import time
def task(name):
print(f"{name} is running")
time.sleep(2)
print(f"{name} is gone")
if __name__ == '__main__':
#在windos中,開啟進程必須在__name__ == '__main__'下面
p = Process(target=task,args=("zbb",)) #創建一個進程對象
p.start()
#只是向操作系統發出一個開辟子進程的信號,然后執行下一行
# 這個信號操作系統接收到之后,會從內存中開辟一個子進程空間,
# 然后在將主進程所有數據copy加載到子進程,然后在調用cpu去執行.
# 開辟子進程開銷是很大的.
print("==主開始")
time.sleep(3)
print("主結束")
# ==主開始
# zbb is running
# zbb is gone
# 主結束
方式二(了解不常用)
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self): #必須為run 否則會執行父類的,但是父類的為None
print(f"{self.name} is running")
time.sleep(2)
print(f"{self.name} is gone ")
if __name__ == '__main__':
p = MyProcess("zbb")
p.start()
print("==主")
# ==主
# zbb is running
# zbb is gone
3.2 獲取進程pid
pid是進程在內存中唯一的標識
列如,linux中 kill pid
代碼獲取
from multiprocessing import Process
import os
def task(name):
print(f'子進程:{os.getpid()}')
print(f'主進程:{os.getppid()}')
if __name__ == '__main__':
p = Process(target=task,args=('zbb',)) # 創建一個進程對象
p.start()
print(f'====主{os.getpid()}')
# ====主13548
# 子進程:1832
# 主進程:13548
win命令行獲取pid
linux中獲取
3.3驗證進程之間的空間隔離
子進程和主進程在不同的空間
from multiprocessing import Process
import time
name = '追夢NAN'
def task():
global name
name = 'zbb'
print(f'子進程{name}')
if __name__ == '__main__':
p = Process(target=task) # 創建一個進程對象
p.start()
# print('==主開始')
time.sleep(3)
print(f'主:{name}')
# 子進程zbb
# 主:追夢NAN
3.4 進程對象的join方法
join讓主進程等待子進程結束之后,在執行主進程.
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task,args=('zbb',)) # 創建一個進程對象
p.start()
p.join() #
print('==主開始')
多個子進程使用join
驗證一
from multiprocessing import Process
import time
def task(name,sec):
print(f'{name}is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=task,args=('1',1))
p2 = Process(target=task,args=('2',2))
p3 = Process(target=task,args=('3',3))
p1.start()
p2.start()
p3.start()
p1.join() # join只針對主進程,如果join下面多次join 他是不阻塞的.
p2.join()
p3.join()
print(f'==主{time.time()-start_time}')
# 1is running
# 2is running
# 3is running
# 1 is gone
# 2 is gone
# 3 is gone
# ==主3.186117172241211
驗證2
# 多個子進程使用join
from multiprocessing import Process
import time
def task(name,sec):
print(f'{name}is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=task,args=('1',3))
p2 = Process(target=task,args=('2',2))
p3 = Process(target=task,args=('3',1))
p1.start()
p2.start()
p3.start()
p1.join() #p1就是阻塞 走完周后才走主
print(f'==主1-{time.time() - start_time}')
p2.join()
print(f'==主2-{time.time() - start_time}')
p3.join()
print(f'==主3-{time.time()-start_time}')
# 1is running
# 2is running
# 3is running
# 3 is gone
# 2 is gone
# 1 is gone
# ==主1-3.152977705001831
# ==主2-3.152977705001831
# ==主3-3.152977705001831
優化上面代碼
from multiprocessing import Process
import time
def task(name,sec):
print(f'{name}is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(1,4):
p=Process(target=task,args=("zbb",1))
l1.append(p)
p.start()
for i in l1:
i.join()
print(f'==主{time.time() - start_time}')
print(l1)
# zbbis running
# zbbis running
# zbbis running
# zbb is gone
# zbb is gone
# zbb is gone
# ==主1.1665570735931396
# [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>]
join就是阻塞,主進程有join,主進程下面的代碼一律不執行,直到進程執行完畢之后,在執行.
3.5進程對象的其他屬性(了解)
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task,args=('cc',),name='aaa') # 創建一個進程對象
print(p.name) #給子進程起名字
p.start()
# time.sleep(1)
p.terminate() #殺死子進程
time.sleep(0.5) #先睡一會要不然判斷還是活的
print(p.is_alive())#判斷子進程是否存活
# p.name = 'sb' #改子進程的名字
print('==主開始')
3.6 僵屍進程和孤兒進程(了解)
基於unix環境(linux,macOS)
正常:主進程需要等待子進程結束之后,主進程才結束
主進程時刻監測子進程的運行狀態,當子進程結束之后,一段時間之內,將子進程進行回收.
為什么主進程不在子進程結束后馬上對其回收呢?
1. 主進程與子進程是異步關系.主進程無法馬上捕獲子進程什么時候結束.
2. 如果子進程結束之后馬上再內存中釋放資源,主進程就沒有辦法監測子進程的狀態了.
unix針對於上面的問題,提供了一個機制.
所有的子進程結束之后,立馬會釋放掉文件的操作鏈接,內存的大部分數據,但是會保留一些內容: 進程號,結束時間,運行狀態,等待主進程監測,回收.
所有的子進程結束之后,在被主進程回收之前,都會進入僵屍進程狀態.
一:僵屍進程(有害)
僵屍進程:一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態信息,那么子進程的進程描述符仍然保存在系統中。這種進程稱之為僵死進程。
如果父進程不對僵屍進程進行回收(wait/waitpid),產生大量的僵屍進程,這樣就會占用內存,占用進程pid號.
僵屍進程如何解決???
父進程產生了大量子進程,但是不回收,這樣就會形成大量的僵屍進程,解決方式就是直接殺死父進程,將所有的僵屍進程變成孤兒進程進程,由init進行回收
二:孤兒進程(無害)
孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那么那些子進程將成為孤兒進程。孤兒進程將被init進程(進程號為1)所收養,並由init進程對它們完成狀態收集工作。
4.守護進程
子進程守護着主進程,只要主進程結束,子進程跟着就結束
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
# 在windows環境下, 開啟進程必須在 __name__ == '__main__' 下面
p = Process(target=task,args=('zb',)) # 創建一個進程對象
p.daemon = True # 將p子進程設置成守護進程,只要主進程結束,守護進程馬上結束.
p.start()
time.sleep(1)
print('===主')
5.互斥鎖(進程同步控制)
多個用戶搶占一個資源時,第一個用戶先搶到了,加上鎖,用完之后才給第二個用戶使用
問題
三個同事 同時用一個打印機打印內容.
三個進程模擬三個同事, 輸出平台模擬打印機.
#版本一:
#並發是以效率優先的,但是目前我們的需求: 順序優先.
#多個進程共強一個資源時, 要保證順序優先: 串行,一個一個來.
from multiprocessing import Process
import time
import random
import os
def task1(p):
print(f'{p}開始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印結束了')
def task2(p):
print(f'{p}開始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印結束了')
if __name__ == '__main__':
p1 = Process(target=task1,args=('p1',))
p2 = Process(target=task2,args=('p2',))
p2.start()
p2.join()
p1.start()
p1.join()
#我們利用join 解決串行的問題,保證了順序優先,但是這個誰先誰后是固定的.
#這樣不合理. 你在爭搶同一個資源的時候,應該是先到先得,保證公平.
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import os
def task1(p,lock):
'''
一把鎖不能連續鎖兩次
'''
lock.acquire()
print(f'{p}開始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印結束了')
lock.release()
def task2(p,lock):
lock.acquire()
print(f'{p}開始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印結束了')
lock.release()
if __name__ == '__main__':
mutex = Lock()
p1 = Process(target=task1,args=('p1',mutex))
p2 = Process(target=task2,args=('p2',mutex))
p2.start()
p1.start()
lock與join的區別.
共同點: 都可以把並發變成串行, 保證了順序.
不同點: join人為設定順序,lock讓其爭搶順序,保證了公平性.
6.進程之間的通信
1.基於文件通信
# 搶票系統.
# 先可以查票.查詢余票數. 並發
# 進行購買,向服務端發送請求,服務端接收請求,在后端將票數-1,返回到前端. 串行.
# 當多個進程共強一個數據時,如果要保證數據的安全,必須要串行.
# 要想讓購買環節進行串行,我們必須要加鎖處理.
from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import os
import random
def search():
time.sleep(random.randint(1,3)) # 模擬網絡延遲(查詢環節)
with open('ticket.json',encoding='utf-8') as f1:
dic = json.load(f1)
print(f'{os.getpid()} 查看了票數,剩余{dic["count"]}')
def paid():
with open('ticket.json', encoding='utf-8') as f1:
dic = json.load(f1)
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1,3)) # 模擬網絡延遲(購買環節)
with open('ticket.json', encoding='utf-8',mode='w') as f1:
json.dump(dic,f1)
print(f'{os.getpid()} 購買成功')
def task(lock):
search()
lock.acquire()
paid()
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(6):
p = Process(target=task,args=(mutex,))
p.start()
# 當很多進程共強一個資源(數據)時, 你要保證順序(數據的安全),一定要串行.
# 互斥鎖: 可以公平性的保證順序以及數據的安全.
# 基於文件的進程之間的通信:
# 效率低.
# 自己加鎖麻煩而且很容易出現死鎖.
2.基於隊列通信
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
隊列: 把隊列理解成一個容器,這個容器可以承載一些數據,
隊列的特性: 先進先出永遠保持這個數據.
.
創建隊列的類(底層就是以管道和鎖定的方式實現):
1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
1 maxsize是隊列中允許最大項數,省略則無大小限制。
主要方法:
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常
q.get方法可以從隊列讀取並且刪除一個元素。
同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。
如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
其他方法(了解):
q.cancel_join_thread():不會在進程退出時自動連接后台線程。可以防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后台線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.join_thread():連接隊列的后台線程。此方法用於在調用q.close()方法之后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
from multiprocessing import Queue
q = Queue(3) # maxsize
q.put(1)
q.put('alex')
q.put([1,2,3])
# q.put(5555,block=False) # 當隊列滿了時,在進程put數據就會阻塞.
#
print(q.get())
print(q.get())
print(q.get())
print(q.get(timeout=3)) # 阻塞3秒,3秒之后還阻塞直接報錯.
# print(q.get(block=False)) # 當數據取完時,在進程get數據也會出現阻塞,直到某一個進程put數據.
# block=False 只要遇到阻塞就會報錯.
3.基於管道
管道是有問題的,管道會造成數據的不安全,官方給予的解釋是管道有可能會造成數據損壞。
7.生產者和消費者
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
1.為什么要使用生產者和消費者模式
線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
2.什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
起到緩沖的作用,平衡生產力與消費力,解耦.
from multiprocessing import Process
from multiprocessing import Queue
import time
import random
def producer(q,name):
for i in range(1,6):
time.sleep(random.randint(1,2))
res = f'{i}號包子'
q.put(res)
print(f'生產者{name} 生產了{res}')
def consumer(q,name):
while 1:
try:
food = q.get(timeout=3)
time.sleep(random.randint(1, 3))
print(f'\033[31;0m消費者{name} 吃了{food}\033[0m')
except Exception:
return
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'路人甲'))
p2 = Process(target=consumer,args=(q,'路人乙'))
p1.start()
p2.start()
二、並發編程之多線程
1. threading模塊介紹
multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性,因而不再詳細介紹
官方解釋https://docs.python.org/3/library/threading.html?highlight=threading#
2.開啟線程的兩種方式
from threading import Thread
import time
def task(name):
print(f"{name} is runing!")
time.sleep(2)
print(f"{name} is stop")
t1 =Thread(target=task,args=("mysql",))
t1.start()
print('===主線程') # 線程是沒有主次之分的(為了好記)
第二種了解即可(不常用)
from threading import Thread
import time
class MyThread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print(f'{self.name} is running!')
time.sleep(1)
print(f'{self.name} is stop')
t1 = MyThread("mysql")
t1.start()
print('主')
3.線程vs進程的代碼對比
1.多線程與多進程開啟速度區別
多進程
先執行主進程
from multiprocessing import Process
def work():
print('hello')
if __name__ == '__main__':
#在主進程下開啟線程
t=Process(target=work)
t.start()
print('主線程/主進程')
多線程
先執行子線程
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} is gone')
if __name__ == '__main__':
t1 = Thread(target=task,args=('A',))
t1.start()
print('===主線程')
2.對比pid
進程
from multiprocessing import Process
import time
import os
def task(name):
print(f'子進程: {os.getpid()}')
print(f'主進程: {os.getppid()}')
if __name__ == '__main__':
p1 = Process(target=task,args=('A',)) # 創建一個進程對象
p2 = Process(target=task,args=('B',)) # 創建一個進程對象
p1.start()
p2.start()
print(f'==主{os.getpid()}')
# ==主17444
# 子進程: 8636
# 主進程: 17444
# 子進程: 14200
# 主進程: 17444
線程
資源共享
from threading import Thread
import os
def task():
print(os.getpid())
if __name__ == '__main__':
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
print(f'===主線程{os.getpid()}')
# 18712
# 18712
# ===主線程18712
3.同一進程內線程共享內部數據
from threading import Thread
import os
x = 3
def task():
global x
x = 100
if __name__ == '__main__':
t1 = Thread(target=task)
t1.start()
# t1.join()
print(f'===主線程{x}')
# 同一進程內的資源數據對於這個進程的多個線程來說是共享的.
4.線程的其他方法
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
from threading import currentThread
from threading import enumerate
from threading import activeCount
import os
import time
x = 3
def task():
# print(currentThread())
time.sleep(1)
print('666')
print(123)
if __name__ == '__main__':
t1 = Thread(target=task,name='線程1')
t2 = Thread(target=task,name='線程2')
# name 設置線程名
t1.start()
t2.start()
# time.sleep(2)
print(t1.isAlive()) # 判斷線程是否活着
print(t1.getName()) # 獲取線程名
t1.setName('子線程-1')
print(t1.name) # 獲取線程名 ***
# threading方法
print(currentThread()) # 獲取當前線程的對象
print(enumerate()) # 返回一個列表,包含所有的線程對象
print(activeCount()) # ***返回正在運行的線程數量,
print(f'===主線程{os.getpid()}')
5.join 線程中的用法
join: 阻塞 告知主線程要等待我子線程執行完畢之后再執行主線程
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
t1 = Thread(target=task,args=('A',))
t2 = Thread(target=task,args=('B',))
t1.start()
t1.join()
t2.start()
t2.join()
print(f'===主線程{time.time() - start_time}')
6.守護線程
需要強調的是:運行完畢並非終止運行
#1.對主進程來說,運行完畢指的是主進程代碼運行完畢
#2.對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢
如果守護線程的生命周期小於其他線程,則啊肯定先結束,否則等待其他非守護線程和主線程結束
詳細解釋:
#1 主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,
#2 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。
from threading import Thread
import time
def sayhi(name):
print('你滾!')
time.sleep(2)
print('%s say hello' %name) #主線程結束了不執行
if __name__ == '__main__':
t = Thread(target=sayhi,args=('A',))
# t.setDaemon(True) #必須在t.start()之前設置
t.daemon = True
t.start() # 線程的開啟速度比進程快很多
print('主線程')
# 你滾!
# 主線程
主線程什么時候結束???
守護線程 等待非守護子線程以及主線程結束之后,結束.
from threading import Thread
import time
def foo():
print(123) # 1
time.sleep(3)
print("end123") #等帶時間太長了了,子進程已經執行完了,守護進程掛掉不執行
def bar():
print(456) # 2
time.sleep(1)
print("end456") # 4
t1=Thread(target=foo)
t2=Thread(target=bar)
t1.daemon=True
t1.start()
t2.start()
print("main-------") # 3
# 123
# 456
# main-------
# end456
7.互斥鎖
多線程的同步鎖與多進程的同步鎖是一個道理,就是多個線程搶占同一個數據(資源)時,我們要保證數據的安全,合理的順序。
多個任務公搶一個數據,保證數據的安全的目的,要讓其串行
from threading import Thread
from threading import Lock
import time
import random
x = 100
def task(lock):
lock.acquire()
# time.sleep(random.randint(1,2))
global x
temp = x
time.sleep(0.01)
temp = temp - 1
x = temp
lock.release()
if __name__ == '__main__':
mutex = Lock()
l1 = []
for i in range(100):
t = Thread(target=task,args=(mutex,))
l1.append(t)
t.start()
time.sleep(3)
print(f'主線程{x}')
8.死鎖現象和遞歸鎖
進程也有死鎖與遞歸鎖,進程的死鎖和遞歸鎖與線程的死鎖遞歸鎖同理
from threading import Thread
from threading import Lock
import time
lock_A = Lock()
lock_B = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到了A鎖')
lock_B.acquire()
print(f'{self.name}拿到了B鎖')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到了B鎖')
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到了A鎖')
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
遞歸鎖有一個計數的功能, 原數字為0,上一次鎖,計數+1,釋放一次鎖,計數-1
只要遞歸鎖上面的數字不為零,其他線程就不能搶鎖.
#遞歸鎖可以解決死鎖現象,業務需要多個鎖時,先要考慮遞歸鎖.
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到了A鎖')
lock_B.acquire()
print(f'{self.name}拿到了B鎖')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到了B鎖')
lock_A.acquire()
print(f'{self.name}拿到了A鎖')
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
9.信號量
也是一種鎖, 控制並發數量
同進程的一樣
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。
實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):
from threading import Thread, Semaphore, current_thread
import time
import random
sem = Semaphore(5) #廁所只有五個五位置 出一個進入一個
def task():
sem.acquire()
print(f'{current_thread().name} 廁所')
time.sleep(random.randint(1,3))
sem.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task,)
t.start()
10.Python GIL
GIL全局解釋器鎖
好多自稱大神的說,GIL鎖就是python的致命缺陷,Python不能多核,並發不行等等 .....
這篇文章透徹的剖析了GIL對python多線程的影響,
強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
為什么加鎖?
-
當時都是單核時代,而且cpu價格非常貴.
-
如果不加全局解釋器鎖, 開發Cpython解釋器的程序員就會在源碼內部各種主動加鎖,解鎖,非常麻煩,各種死鎖現象等等.他為了省事兒,直接進入解釋器時給線程加一個鎖.
優點: 保證了Cpython解釋器的數據資源的安全.
缺點: 單個進程的多線程不能利用多核.
Jpython沒有GIL鎖.
pypy也沒有GIL鎖.
現在多核時代, 我將Cpython的GIL鎖去掉行么?
因為Cpython解釋器所有的業務邏輯都是圍繞着單個線程實現的,去掉這個GIL鎖,幾乎不可能.
單個進程的多線程可以並發,但是不能利用多核,不能並行.
多個進程可以並發,並行.
IO密集型
計算密集型
11.GIL與lock鎖的區別
相同點: 都是同種鎖,互斥鎖.
不同點:
GIL鎖全局解釋器鎖,保護解釋器內部的資源數據的安全.
GIL鎖 上鎖,釋放無需手動操作.
自己代碼中定義的互斥鎖保護進程中的資源數據的安全.
自己定義的互斥鎖必須自己手動上鎖,釋放鎖.
詳解聯系
因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這可以說是Python早期版本的遺留問題。
12.驗證計算密集型IO密集型的效率
io密集型:
from threading import Thread
from multiprocessing import Process
import time
import random
def task():
count = 0
time.sleep(random.randint(1,3))
count += 1
if __name__ == '__main__':
#多進程的並發,並行
start_time = time.time()
l1 = []
for i in range(50):
p = Process(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'執行效率:{time.time()- start_time}') # 4.41826057434082
#多線程的並發
start_time = time.time()
l1 = []
for i in range(50):
p = Thread(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'執行效率:{time.time()- start_time}') # 3.0294392108917236
# 對於IO密集型: 單個進程的多線程的並發效率高.
計算密集型:
from threading import Thread
from multiprocessing import Process
import time
def task():
count = 0
for i in range(10000000):
count += 1
if __name__ == '__main__':
#多進程的並發,並行
start_time = time.time()
l1 = []
for i in range(4):
p = Process(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'執行效率:{time.time()- start_time}') # 1.1186981201171875
#多線程的並發
start_time = time.time()
l1 = []
for i in range(4):
p = Thread(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'執行效率:{time.time()- start_time}') # 2.729006767272949
# 總結: 計算密集型: 多進程的並發並行效率高.
13.多線程實現socket通信
無論是多線程還是多進程,如果按照,來一個客戶端請求,我就開一個線程,來一個請求開一個線程,
應該是這樣: 你的計算機允許范圍內,開啟的線程進程數量越多越好.
服務端
import socket
from threading import Thread
def communicate(conn,addr):
while 1:
try:
from_client_data = conn.recv(1024)
print(f'來自客戶端{addr[1]}的消息: {from_client_data.decode("utf-8")}')
to_client_data = input('>>>').strip()
conn.send(to_client_data.encode('utf-8'))
except Exception:
break
conn.close()
def _accept():
server = socket.socket()
server.bind(('127.0.0.1', 8848))
server.listen(5)
while 1:
conn, addr = server.accept()
t = Thread(target=communicate,args=(conn,addr))
t.start()
if __name__ == '__main__':
_accept()
客戶端
import socket
client = socket.socket()
client.connect(('127.0.0.1',8848))
while 1:
try:
to_server_data = input('>>>').strip()
client.send(to_server_data.encode('utf-8'))
from_server_data = client.recv(1024)
print(f'來自服務端的消息: {from_server_data.decode("utf-8")}')
except Exception:
break
client.close()
14 . 進程池,線程池
為什么要將進程池和線程池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures導入就可以直接用他們兩個了
線程池: 一個容器,這個容器限制住你開啟線程的數量,比如4個
,第一次肯定只能並發的處理4個任務,只要有任務完成,線程馬上就會接下一個任務.
以時間換空間.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
def task(n):
print(f'{os.getpid()} 接客')
time.sleep(random.randint(1,3))
if __name__ == '__main__':
# 開啟進程池 (並行(並行+並發))
p = ProcessPoolExecutor() # 默認不寫,進程池里面的進程數與cpu個數相等
for i in range(20): #發布20個任務,cpu個數的進程處理
p.submit(task,i)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
def task(n):
print(f'{os.getpid()} 接客')
time.sleep(random.randint(1,3))
if __name__ == '__main__':
# 開啟進程池 (並行(並行+並發))
p = ThreadPoolExecutor() # 默認不寫,進程池里面的線程數與cpu個數相等
for i in range(20): #發布20個任務,cpu個數的進程處理
p.submit(task,i)
15.阻塞,非阻塞,同步,異步
執行的角度
阻塞:程序運行時,遇到IO、程序掛起CPU被切走
非阻塞:程序沒有遇到IO,程序遇到IO但是通過某種手段,讓cpu強行運行我的程序。
提交任務的角度
同步:提交一個任務,自任務開始運行直到此任務結束(可能有IO),返回一個返回值之后,我再提交下一個任務
異步:一次性提交多個任務,然后我就直接執行下一行代碼
16.同步調用,異步調用
異步調用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}開始任務')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任務結束')
return i
if __name__ == '__main__':
# 異步調用
pool = ProcessPoolExecutor()
for i in range(10):
pool.submit(task,i)
pool.shutdown(wait=True)
# shutdown: 讓我的主進程等待進程池中所有的子進程都結束任務之后,在執行. 有點類似與join.
# shutdown: 在上一個進程池沒有完成所有的任務之前,不允許添加新的任務.
# 一個任務是通過一個函數實現的,任務完成了他的返回值就是函數的返回值.
print('===主')
同步調用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}開始任務')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任務結束')
return i
if __name__ == '__main__':
# 同步調用
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task,i)
# obj是一個動態對象,返回的當前的對象的狀態,有可能運行中,可能(就緒阻塞),還可能是結束了.
# obj.result() 必須等到這個任務完成后,返回了結果之后,在執行下一個任務.
print(f'任務結果:{obj.result()}')
pool.shutdown(wait=True)
print('===主')
17.異步調用+回調函數
瀏覽器工作原理, 向服務端發送一個請求,服務端驗證你的請求,如果正確,給你的瀏覽器返回一個文件,
瀏覽器接收到文件,將文件里面的代碼渲染成你看到的漂亮美麗的模樣.
什么叫爬蟲?
- 利用代碼模擬一個瀏覽器,進行瀏覽器的工作流程得到一堆源代碼.
- 對源代碼進行數據清洗得到我想要數據.
pip install requests
import requests
ret = requests.get('http://www.baidu.com')
if ret.status_code == 200:
print(ret.text)
基於 異步調用回收所有任務的結果我要做到實時回收結果,
並發執行任務每個任務只是處理IO阻塞的,不能增加新得功能.
異步調用 + 回調函數
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests
def task(url):
'''模擬的就是爬取多個源代碼 一定有IO操作'''
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(obj):
'''模擬對數據進行分析 一般沒有IO'''
print(len(obj.result()))
if __name__ == '__main__':
# 開啟線程池,並發並行的執行
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/',
]
pool = ThreadPoolExecutor(4)
for url in url_list:
obj = pool.submit(task, url)
obj.add_done_callback(parse) #回調函數
'''
線程池設置4個線程, 異步發起10個任務,每個任務是通過網頁獲取源碼, 並發執行,
當一個任務完成之后,將parse這個分析代碼的任務交由剩余的空閑的線程去執行,你這個線程繼續去處理其他任務.
如果進程池+回調: 回調函數由主進程去執行.
如果線程池+回調: 回到函數由空閑的線程去執行.
'''
異步 回調是一回事兒?
異步站在發布任務的角度,
站在接收結果的角度: 回調函數 按順序接收每個任務的結果,進行下一步處理.
異步 + 回調:
異步處理的IO類型.
回調處理非IO
18.線程隊列queque
1.先進先出
import queue
#先進先出
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False))#遇到阻塞直接報錯
q.get(timeout=3) #阻塞2秒沒有值直接報錯
2.后進先出,堆棧
q =queue.LifoQueue(4)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(q.get())
print(q.get())
print(q.get())
print(q.get())
3.優先級隊列
q = queue.PriorityQueue(4)
q.put((5,"z"))
q.put((0,"b"))
q.put((-1,"2"))
q.put((-1,"3"))
#同一級別按照accik碼 一般不會存在同一級別的
print(q.get())
print(q.get())
print(q.get())
print(q.get())
19.事件event
開啟兩個線程,一個線程運行到中間的某個階段,觸發另個線程執行.兩個線程增加了耦合性.
from threading import Thread,current_thread,Event
import time
event = Event()
def check():
print(f"{current_thread().name}檢測服務器是否開啟")
time.sleep(3)
# print(event.is_set())#判斷set是否存在
event.set()
# print(event.is_set())#再set之后顯示T
print('服務器已經開啟')
def connect():
print(f'{current_thread().name}等待連接..')
event.wait() #阻塞 知道event.set()執行后
# event.wait(1)#只阻塞1秒,一秒之后執行
print(f"{current_thread().name}連接成功")
t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start()
20.協程
協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
單個cpu: 10個任務,讓你給我並發的執行這個10個任務:
1. 方式一:開啟多進程並發執行, 操作系統切換+保持狀態.
2. 方式二:開啟多線程並發執行,操作系統切換+保持狀態.
3. 方式三:開啟協程並發的執行, 自己的程序 把控着cpu 在3個任務之間來回切換+保持狀態.
對3詳細解釋: 協程他切換速度非常快,蒙蔽操作系統的眼睛,讓操作系統認為cpu一直在運行你這一個線程(協程.)
需要強調的是:
#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
#2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換
優點如下:
#1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
#2. 單線程內就可以實現並發的效果,最大限度地利用cpu
缺點如下:
#1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
總結協程特點:
- 必須在只有一個單線程里實現並發
- 修改共享數據不需加鎖
- 用戶程序里自己保存多個控制流的上下文棧
1.Greenlet
如果我們在單個線程內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然后再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現這20個任務直接的切換
#安裝
pip3 install greenlet
# 切換 +保持狀態(遇到IO不會主動切換)
#真正的協程模塊就是使用greenlet完成的切換
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name) #2
g2.switch('taibai') #3
print('%s eat 2' %name) #6
g2.switch() #7
def play(name):
print('%s play 1' %name) #4
g1.switch() #5
print('%s play 2' %name) #8
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('taibai')#可以在第一次switch時傳入參數,以后都不需要 1
工作中:
一般在工作中我們都是進程+線程+協程的方式來實現並發,以達到最好的並發效果,如果是4核的cpu,一般起5個進程,每個進程中20個線程(5倍cpu數量),每個線程可以起500個協程,大規模爬取頁面的時候,等待網絡延遲的時間的時候,我們就可以用協程去實現並發。 並發數量 = 5 * 20 * 500 = 50000個並發,這是一般一個4cpu的機器最大的並發數。nginx在負載均衡的時候最大承載量就是5w個
單線程里的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。
2.Gevent
#安裝
pip3 install gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
用法
g1=gevent.spawn(func,1,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的,spawn是異步提交任務
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2里面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
最終版本
import gevent
from gevent import monkey
monkey.patch_all() # 打補丁: 將下面的所有的任務的阻塞都打上標記
def eat(name):
print('%s eat 1' %name)
time.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
time.sleep(1)
print('%s play 2' %name)
g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,name='egon')
# g1.join()
# g2.join()
gevent.joinall([g1,g2])