進擊のpython
並發編程——進程池線程池
本小節是對進程池線程池做一個了解同時也對回調函數有一個清晰的認識最后再提一下異步與同步
但是在提到這三個知識點之前,我們有必要基於線程的知識點進行一個練習
目的是為了能夠對以前的知識有個印象,對於接下來學這兩個知識點也有好處
練習
還是選擇套接字來進行練習,在上一部分的時候,基於套接字,我們沒有做到並發的概念
那么現在我們基於多線程來做一個並發的套接字通信:
簡單寫一個套接字通信:
# 服務端
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
while 1:
conn, address = server.accept()
while 1:
try:
msg = conn.recv(1024)
conn.send(msg.upper())
except:
break
conn.close()
server.close()
# 客戶端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while 1:
msg = input()
client.send(msg.encode('utf-8'))
print(client.recv(1024).decode('utf-8'))
conn.close()
client.close()
然后我們嘗試着用多線程進行操作
import socket
from threading import Thread
def A():
server.bind(('127.0.0.1', 8080))
server.listen(5)
while 1:
conn, address = server.accept()
t = Thread(target=B, args=(conn,))
t.start()
def B(conn):
while 1:
try:
msg = conn.recv(1024)
conn.send(msg.upper())
except:
break
conn.close()
server.close()
if __name__ == '__main__':
server = socket.socket()
A()
很好,問題隨之就來了!
線程池進程池
根據上面的代碼,我們看,沒什么問題
但是隨着客戶端開的越來越多,就會導致客戶端的線程開的越來越多
要是成千上萬個呢?是不是電腦就死機了
所以說我們應該有一個辦法,讓服務端開啟的線程控制在一個范圍內,使電腦穩定運行
線程池進程池,我們還是可以鏡像來學,學一個,剩下的就會了
線程池開啟借助模塊里的的一個方法ThreadPoolExecutor
使用方法都是一樣的,畢竟python遵循鴨子類型
先進行實例化,參數是最大可開的線程池pool = ThreadPoolExecutor(3)
接着提交就不再是用Thread了,而是實例化對象的submit
方法,第一個參數是函數,第二個是傳參變量
示例如下:
import os
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
from threading import current_thread
def func():
print('%s is runing' % current_thread().ident)
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
pool = ThreadPoolExecutor(3)
for i in range(10):
t = pool.submit(func)
print('主')
注意調用的模塊concurrent.futures.thread
在線程中有join方法,其實在線程池也有一個類似於join的方法
shutdown()里面有一個wait的默認參數True,代表着等待子線程結束的
你可以試着運行一下下面的代碼,再把注釋打開,看看是什么樣的執行結果
import os
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
from threading import current_thread
def func():
print('%s is runing' % current_thread().ident)
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
pool = ThreadPoolExecutor(3)
for i in range(10):
t = pool.submit(func)
# pool.shutdown()
print('主')
進程池的使用和線程池是一樣的,除了ThreadPoolExecutor→ProcessPoolExecutor
那什么時候用進程池,什么時候用線程池呢?
其實就是跟什么時候用進程什么時候用線程一樣
當I/O密集的時候,用線程池;當計算密集的時候,用進程池
多提一嘴,在代碼的這部分:
for i in range(10):
t = pool.submit(func)
如果覺得麻煩,可以用
map(func,range(10))
但是這里有個陷阱,func函數也需要有一個接受值,可以不用,但是得有
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
from threading import current_thread
def func(n): # 注意這個n
print('%s is runing' % current_thread().ident)
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
pool = ThreadPoolExecutor(3)
pool.map(func, range(5))
print('主')
異步調用
跟異步調用的概念相對的還有同步調用這個概念
同步調用和異步調用其實是提交任務的兩種方式
假設我有一筐包子,然后以一幫人來吃,同步和異步就是兩種操作
同步調用
同步的特點是執行完任務之后就在原地等待,等待程序執行完畢,然后拿到結果在執行下一個任務:
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
def baozi(name):
time.sleep(random.randint(1, 3))
return {"name": name}
def zhanshi(res):
name = res["name"]
print(f'{name}吃了包子!')
if '__main__' == __name__:
t = ThreadPoolExecutor(3)
t1 = t.submit(baozi, '可樂').result()
zhanshi(t1)
t2 = t.submit(baozi, '雪碧').result()
zhanshi(t2)
t3 = t.submit(baozi, '檸檬茶').result()
zhanshi(t3)
所以這個吃包子就是這樣的:可樂過來吃,吃完了,雪碧過來吃,吃完了,檸檬茶,吃完了,程序結束
就可以發現同步調用,其實就變成串行了
異步調用
異步的特點是執行完任務之后不再等待:
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
def baozi(name):
time.sleep(random.randint(1, 3))
return {"name": name}
def zhanshi(res):
name = res["name"]
print(f'{name}吃了包子!')
if '__main__' == __name__:
t = ThreadPoolExecutor(3)
t1 = t.submit(baozi, '可樂')
t2 = t.submit(baozi, '雪碧')
t3 = t.submit(baozi, '檸檬茶')
執行了,也開了線程,但是拿不到結果,線程執行完才會有結果!那誰能知道線程執行是否完畢?
線程本身會知道!進程執行到了最后一行,就代表着線程執行完畢
此時就應該讓他去執行zhanshi()這個函數
那要怎么做呢?可能會有這樣的想法:我直接傳過去就可以了
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
def baozi(name):
time.sleep(random.randint(1, 3))
zhanshi({"name": name}) # 改動在這里
def zhanshi(res):
name = res["name"]
print(f'{name}吃了包子!')
if '__main__' == __name__:
t = ThreadPoolExecutor(3)
t1 = t.submit(baozi, '可樂')
t2 = t.submit(baozi, '雪碧')
t3 = t.submit(baozi, '檸檬茶')
這么做是沒有問題,但是在開發的考慮上來說,這兩個方法的耦合性增強了
不是說耦合增強一定不好,但是,能解耦寫就解耦寫
所以說我們應該有其他的辦法來解決這個問題:回調函數!
回調函數就是在線程執行完畢后自動執行另一個函數的方法add_done_callback()
import random
import time
from concurrent.futures.thread import ThreadPoolExecutor
def baozi(name):
time.sleep(random.randint(1, 3))
return {"name": name}
def zhanshi(res):
res = res.result()
name = res["name"]
print(f'{name}吃了包子!')
if '__main__' == __name__:
t = ThreadPoolExecutor(3)
t1 = t.submit(baozi, '可樂').add_done_callback(zhanshi)
t2 = t.submit(baozi, '雪碧').add_done_callback(zhanshi)
t3 = t.submit(baozi, '檸檬茶').add_done_callback(zhanshi)
當線程執行完了,自動執行add_done_callback()里面的函數,並將結果的對象傳給這個函數
注意!傳的是對象!而不是結果!,所以才會在函數里面有一句res = res.result()
單列出來一塊是為了解決一個問題:有的同學認為同步調用就是阻塞
首先你要知道什么是阻塞?I/O阻塞是吧,程序遇到I/O就會阻塞
那會不會有這種情況,程序的運行的是純計算的,這是不是就不存在I/O阻塞了?
但是同步提交任務,該等結果,還是得等結果!這就不是阻塞了
所以這種說法是不正確的!