3.協程篇¶
去年微信公眾號就陸陸續續發布了,我一直以為博客也匯總同步了,這幾天有朋友說一直沒找到,遂發現,的確是漏了,所以補上一篇
在線預覽:https://github.lesschina.com/python/base/concurrency/4.並發編程-協程篇.html
示例代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine
多進程和多線程切換之間也是有資源浪費的,相比而言協程更輕量級
3.1.知識回顧¶
1.裝飾器¶
往期文章:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.Python裝飾器
基礎拓展篇已經講的很透徹了,就不再雷同了,貼一個簡單案例,然后擴展說說可迭代
、迭代器
和生成器
% time
from functools import wraps
def log(func):
@wraps(func)
def wrapper(*args,**kv):
print("%s log_info..." % func.__name__)
return func(*args,**kv)
return wrapper
@log
def login_out():
print("已經退出登錄")
def main():
# @wraps(func) 可以使得裝飾前后,方法簽名一致
print(f"方法簽名:{login_out.__name__}")
login_out()
# @wraps能讓你通過屬性 __wrapped__ 直接訪問被包裝函數
login_out.__wrapped__() # 執行原來的函數
if __name__ == '__main__':
main()
2.迭代器¶
往期文章:https://www.cnblogs.com/dotnetcrazy/p/9278573.html#6.Python迭代器
過於基礎的就不說了,簡單說下,然后舉一個OOP
的Demo
:
- 判斷是否可迭代:(能不能for遍歷)
from collections.abc import Iterable
isinstance(xxx, Iterable)
- 判斷是否是迭代器:(能不能
next(xxx)
遍歷)from collections.abc import Iterator
isinstance(xxx, Iterable)
- PS:迭代器是一定可以迭代的
- 可迭代對象轉迭代器:(生成器都是迭代器)
- 把
list、dict、str
等Iterable
變成Iterator
可以使用iter()
函數 eg:iter([])
(節省資源) - PS:生成器都是
Iterator
對象,但list、dict、str雖然是Iterable
,卻不是Iterator
- 把
提醒一下:from collections import Iterable, Iterator # 現在已經不推薦使用了(3.8會棄用)
查看一下typing.py
的源碼就知道了:
# 模仿collections.abc中的那些(Python3.7目前只是過渡的兼容版,沒有具體實現)
def _alias(origin, params, inst=True):
return _GenericAlias(origin, params, special=True, inst=inst)
T_co = TypeVar('T_co', covariant=True) # Any type covariant containers.
Iterable = _alias(collections.abc.Iterable, T_co)
Iterator = _alias(collections.abc.Iterator, T_co)
之前說了個 CSharp 的 OOP Demo,這次來個Python
的,我們來一步步演變:
% time
# 導入相關模塊
from collections.abc import Iterable, Iterator
# from collections import Iterable, Iterator # 現在已經不推薦使用了(3.8會棄用)
# 定義一個Class
class MyArray(object):
pass
# 是否可迭代 False
isinstance(MyArray(),Iterable)
# 是否是迭代器 False
isinstance(MyArray(),Iterator)
# 如果Class里面含有`__iter__`方法就是可迭代的
# 重新定義測試:
class MyArray(object):
def __iter__(self):
pass
# 是否可迭代 False
isinstance(MyArray(),Iterable)
# 是否是迭代器 False
isinstance(MyArray(),Iterator)
這時候依然不是迭代器
這個可以類比C#:
- 能不能foreach就看你遍歷對象有沒有實現IEnumerable,就說明你是不是一個可枚舉類型(enumerator type)
- 是不是個枚舉器(enumerator)就看你實現了IEnumerator接口沒
// 能不能foreach就看你遍歷對象有沒有實現IEnumerable,就說明你是不是一個可枚舉類型
public interface IEnumerable
{
IEnumerator GetEnumerator();
}
// 是不是個枚舉器(enumerator)就看你實現了IEnumerator接口沒
public interface IEnumerator
{
object Current { get; }
bool MoveNext();
void Reset();
}
先看看Python對於的類吧:
# https://github.com/lotapp/cpython3/blob/master/Lib/_collections_abc.py
class Iterable(metaclass=ABCMeta):
__slots__ = ()
@abstractmethod
def __iter__(self):
while False:
yield None
@classmethod
def __subclasshook__(cls, C):
if cls is Iterable:
return _check_methods(C, "__iter__")
return NotImplemented
class Iterator(Iterable):
__slots__ = ()
@abstractmethod
def __next__(self):
'Return the next item from the iterator. When exhausted, raise StopIteration'
raise StopIteration
def __iter__(self):
return self
@classmethod
def __subclasshook__(cls, C):
if cls is Iterator:
return _check_methods(C, '__iter__', '__next__')
return NotImplemented
讀源碼的好處來了==>抽象方法:@abstractmethod(子類必須實現)
,上次漏講了吧~
上面說迭代器肯定可以迭代,說很抽象,代碼太直觀了 (繼承):class Iterator(Iterable)
現在我們來模仿並實現一個Python
版本的迭代器
:
% time
# 先搭個空架子
class MyIterator(Iterator):
def __next__(self):
pass
class MyArray(Iterable):
def __iter__(self):
return MyIterator() # 返回一個迭代器
def main():
# 可迭代 True
print(isinstance(MyArray(), Iterable))
# 迭代器也是可迭代的 True
print(isinstance(MyIterator(), Iterable))
# 是迭代器 True
print(isinstance(MyIterator(), Iterator))
if __name__ == '__main__':
main()
% time
# 把迭代器簡化合並
class MyIterator(Iterator):
def __next__(self):
pass
def __iter__(self):
return self # 返回一個迭代器(現在就是它自己了)
def main():
print(isinstance(MyIterator(), Iterable))
print(isinstance(MyIterator(), Iterator))
if __name__ == '__main__':
main()
% time
# 馬上進入正題了,先回顧一下Fibona
def fibona(n):
a, b = 0, 1
for i in range(n):
a, b = b, a+b
print(a)
# 獲取10個斐波拉契數列
fibona(10)
% time
# 改造成迭代器
from collections.abc import Iterable, Iterator
class FibonaIterator(Iterator):
def __init__(self, n):
self.__a = 0
self.__b = 1
self.__n = n # 獲取多少個
self.__index = 0 # 當前索引
def __next__(self):
if self.__index < self.__n:
self.__index += 1
# 生成下一波
self.__a, self.__b = self.__b, self.__a + self.__b
return self.__a
else:
raise StopIteration # for循環結束條件
def main():
print(FibonaIterator(10))
for i in FibonaIterator(10):
print(i)
if __name__ == "__main__":
main()
3.生成器¶
往期文章:https://www.cnblogs.com/dotnetcrazy/p/9278573.html#5.Python生成器
生成器是啥?看源碼就秒懂了:(迭代器的基礎上再封裝)
class Generator(Iterator):
__slots__ = ()
def __next__(self):
"""從生成器返回下一個item,結束的時候拋出 StopIteration"""
return self.send(None)
@abstractmethod
def send(self, value):
"""將值發送到生成器。返回下一個產生的值或拋出StopIteration"""
raise StopIteration
@abstractmethod
def throw(self, typ, val=None, tb=None):
"""在生成器中引發異常。返回下一個產生的值或拋出StopIteration"""
if val is None:
if tb is None:
raise typ
val = typ()
if tb is not None:
val = val.with_traceback(tb)
raise val
# 現在知道之前close后為啥沒異常了吧~
def close(self):
"""屏蔽異常"""
try:
self.throw(GeneratorExit)
except (GeneratorExit, StopIteration):
pass
else:
raise RuntimeError("generator ignored GeneratorExit")
@classmethod
def __subclasshook__(cls, C):
if cls is Generator:
return _check_methods(C, '__iter__', '__next__',
'send', 'throw', 'close')
return NotImplemented
迭代器的基礎上再封裝了兩個抽象方法send
、throw
和屏蔽異常的方法close
現在用生成器的方式改寫下斐波拉契數列:(列表推導式改成小括號是最簡單的一種生成器)
% time
# 代碼瞬間就簡潔了
def fibona(n):
a = 0
b = 1
for _ in range(n):
a, b = b, a + b
yield a # 加個yiel就變成生成器了
def main():
print(fibona(10))
for i in fibona(10):
print(i)
if __name__ == "__main__":
main()
注意下這幾點:
- generator剛啟動的時候,要么 next(),要么 send(None),不然會引發:
TypeError: can't send non-None value to a just-started generator
- 在一個generator函數中,遇到return或者break則直接拋出StopIteration終止迭代
- 如果沒有則默認執行至函數完畢
- 如果想要拿到返回值,必須捕獲
StopIteration
錯誤,返回值包含在StopIteration
的value
中
def test_send(n):
for i in range(n):
if i==2:
return "i==2"
yield i
g = test_send(5)
while True:
try:
tmp = next(g)
print(tmp)
except StopIteration as ex:
print(ex.value)
break
輸出:
0
1
i==2
其他的也沒什么好說的了,讀完源碼再看看之前講的內容別有一番滋味在心頭
哦~
3.2.概念篇¶
上集回顧:網絡:靜態服務器+壓測
1.同步與異步¶
同步是指一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成。
異步是指不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作。然后繼續執行下面代碼邏輯,只要自己完成了整個任務就算完成了(異步一般使用狀態、通知和回調)
PS:項目里面一般是這樣的:(個人經驗)
- 同步架構:一般都是和錢相關的需求,需要實時返回的業務
- 異步架構:更多是對寫要求比較高時的場景(同步變異步)
- 讀一般都是實時返回,代碼一般都是
await xxx()
- 讀一般都是實時返回,代碼一般都是
- 想象個情景就清楚了:
- 異步:現在用戶寫了篇文章,可以異步操作,就算沒真正寫到數據庫也可以返回:發表成功(大不了失敗提示一下)
- 同步:用戶獲取訂單信息,你如果異步就會這樣了:提示下獲取成功,然后一片空白...用戶不卸載就怪了...
2.阻塞與非阻塞¶
阻塞是指調用結果返回之前,當前線程會被掛起,一直處於等待消息通知,不能夠執行其他業務(大部分代碼都是這樣的)
非阻塞是指在不能立刻得到結果之前,該函數不會阻塞當前線程,而會立刻返回(繼續執行下面代碼,或者重試機制走起)
PS:項目里面重試機制為啥一般都是3次?
- 第一次重試,兩台PC掛了也是有可能的
- 第二次重試,負載均衡分配的三台機器同時掛的可能性不是很大,這時候就有可能是網絡有點擁堵了
- 最后一次重試,再失敗就沒意義了,日記寫起來,再重試網絡負擔就加大了,得不償失了
3.五種IO模型¶
對於一次IO訪問,數據會先被拷貝到內核的緩沖區中,然后才會從內核的緩沖區拷貝到應用程序的地址空間。需要經歷兩個階段:
- 准備數據
- 將數據從內核緩沖區拷貝到進程地址空間
由於存在這兩個階段,Linux產生了下面五種IO模型(以socket為例
)
- 阻塞式IO:
- 當用戶進程調用了
recvfrom
等阻塞方法時,內核進入IO的第1個階段:准備數據(內核需要等待足夠的數據再拷貝)這個過程需要等待,用戶進程會被阻塞,等內核將數據准備好,然后拷貝到用戶地址空間,內核返回結果,用戶進程才從阻塞態進入就緒態 - Linux中默認情況下所有的socket都是阻塞的
- 當用戶進程調用了
- 非阻塞式IO:
- 當用戶進程發出read操作時,如果
kernel
中的數據還沒有准備好,那么它並不會block
用戶進程,而是立刻返回一個error
。 - 用戶進程判斷結果是一個
error
時,它就知道數據還沒有准備好,於是它可以再次發送read操作 - 一旦
kernel
中的數據准備好了,並且又再次收到了用戶進程的system call
,那么它馬上就將數據拷貝到了用戶內存,然后返回 - 非阻塞IO模式下用戶進程需要不斷地詢問內核的數據准備好了沒有
- 當用戶進程發出read操作時,如果
- IO多路復用:
- 通過一種機制,一個進程可以監視多個文件描述符(套接字描述符)一旦某個文件描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作(這樣就不需要每個用戶進程不斷的詢問內核數據准備好了沒)
- 常用的IO多路復用方式有
select
、poll
和epoll
- 信號驅動IO:(之前我們講進程先導篇的時候說過)
- 內核文件描述符就緒后,通過信號通知用戶進程,用戶進程再通過系統調用讀取數據。
- 此方式屬於同步IO(實際讀取數據到用戶進程緩存的工作仍然是由用戶進程自己負責的)
- 異步IO(
POSIX
的aio_
系列函數)- 用戶進程發起read操作之后,立刻就可以開始去做其它的事。內核收到一個異步
IO read
之后,會立刻返回,不會阻塞用戶進程。 - 內核會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,內核會給用戶進程發送一個
signal
告訴它read操作完成了
- 用戶進程發起read操作之后,立刻就可以開始去做其它的事。內核收到一個異步
4.Unix圖示¶
貼一下Unix編程里面的圖:
3.3.IO多路復用¶
開始之前咱們通過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False)
)
import time
import socket
def select(socket_addr_list):
for client_socket, client_addr in socket_addr_list:
try:
data = client_socket.recv(2048)
if data:
print(f"[來自{client_addr}的消息:]\n")
print(data.decode("utf-8"))
client_socket.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
)
else:
# 沒有消息是觸發異常,空消息是斷開連接
client_socket.close() # 關閉客戶端連接
socket_addr_list.remove((client_socket, client_addr))
print(f"[客戶端{client_addr}已斷開連接,當前連接數:{len(socket_addr_list)}]")
except Exception:
pass
def main():
# 存放客戶端集合
socket_addr_list = list()
with socket.socket() as tcp_server:
# 防止端口綁定的設置
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(('', 8080))
tcp_server.listen()
tcp_server.setblocking(False) # 服務端非阻塞
while True:
try:
client_socket, client_addr = tcp_server.accept()
client_socket.setblocking(False) # 客戶端非阻塞
socket_addr_list.append((client_socket, client_addr))
except Exception:
pass
else:
print(f"[來自{client_addr}的連接,當前連接數:{len(socket_addr_list)}]")
# 防止客戶端斷開后出錯
if socket_addr_list:
# 輪詢查看客戶端有沒有消息
select(socket_addr_list) # 引用傳參
time.sleep(0.01)
if __name__ == "__main__":
main()
輸出:
可以思考下:
- 為什么Server也要設置為非阻塞?
- PS:一個線程里面只能有一個死循環,現在程序需要兩個死循環,so ==> 放一起咯
- 斷開連接怎么判斷?
- PS:沒有消息是觸發異常,空消息是斷開連接
- client_socket為什么不用dict存放?
- PS:dict在循環的過程中,del會引發異常
1.Select¶
select和上面的有點類似,就是輪詢的過程交給了操作系統:
kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程
來個和上面等同的案例:
import select
import socket
def main():
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(('', 8080))
tcp_server.listen()
socket_info_dict = dict()
socket_list = [tcp_server] # 監測列表
while True:
# 劣勢:select列表數量有限制
read_list, write_list, error_list = select.select(
socket_list, [], [])
for item in read_list:
# 服務端迎接新的連接
if item == tcp_server:
client_socket, client_address = item.accept()
socket_list.append(client_socket)
socket_info_dict[client_socket] = client_address
print(f"[{client_address}已連接,當前連接數:{len(socket_list)-1}]")
# 客戶端發來
else:
data = item.recv(2048)
if data:
print(data.decode("utf-8"))
item.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
)
else:
item.close()
socket_list.remove(item)
info = socket_info_dict[item]
print(f"[{info}已斷開,當前連接數:{len(socket_list)-1}]")
if __name__ == "__main__":
main()
輸出和上面一樣
擴展說明:
select 函數監視的文件描述符分3類,分別是
writefds
、readfds
、和exceptfds
。調用后select函數會阻塞,直到有描述符就緒函數返回(有數據可讀、可寫、或者有except)或者超時(timeout指定等待時間,如果立即返回設為null即可)select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024(64位=>2048)
然后Poll就出現了,就是把上限給去掉了,本質並沒變,還是使用的輪詢
2.EPoll¶
epoll在內核2.6中提出(Linux獨有),使用一個文件描述符管理多個描述符,將用戶關心的文件描述符的事件存放到內核的一個事件表中,采用監聽回調的機制,這樣在用戶空間和內核空間的copy只需一次,避免再次遍歷就緒的文件描述符列表
先來看個案例吧:(輸出和上面一樣)
import socket
import select
def main():
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(('', 8080))
tcp_server.listen()
# epoll是linux獨有的
epoll = select.epoll()
# tcp_server注冊到epoll中
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
# key-value
fd_socket_dict = dict()
# 回調需要自己處理
while True:
# 返回可讀寫的socket fd 集合
poll_list = epoll.poll()
for fd, event in poll_list:
# 服務器的socket
if fd == tcp_server.fileno():
client_socket, client_addr = tcp_server.accept()
fd = client_socket.fileno()
fd_socket_dict[fd] = (client_socket, client_addr)
# 把客戶端注冊進epoll中
epoll.register(fd, select.EPOLLIN | select.EPOLLET)
else: # 客戶端
client_socket, client_addr = fd_socket_dict[fd]
data = client_socket.recv(2048)
print(
f"[來自{client_addr}的消息,當前連接數:{len(fd_socket_dict)}]\n")
if data:
print(data.decode("utf-8"))
client_socket.send(
b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
)
else:
del fd_socket_dict[fd]
print(
f"[{client_addr}已離線,當前連接數:{len(fd_socket_dict)}]\n"
)
# 從epoll中注銷
epoll.unregister(fd)
client_socket.close()
if __name__ == "__main__":
main()
擴展:epoll的兩種工作模式
LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序可以不立即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。LT模式是默認的工作模式。 LT模式同時支持阻塞和非阻塞socket。
ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序必須立即處理該事件。如果不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。 ET是高速工作方式,只支持非阻塞socket(ET模式減少了epoll事件被重復觸發的次數,因此效率要比LT模式高)
Code提煉一下:
- 實例化對象:
epoll = select.epoll()
- 注冊對象:
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
- 注銷對象:
epoll.unregister(fd)
PS:epoll
不一定比Select
性能高,一般都是分場景的:
- 高並發下,連接活躍度不高時:epoll比Select性能高(eg:web請求,頁面隨時關閉)
- 並發不高,連接活躍度比較高:Select更合適(eg:小游戲)
- Select是win和linux通用的,而epoll只有linux有
其實IO多路復用還有一個kqueue
,和epoll
類似,下面的通用寫法中有包含
3.通用寫法(Selector
)¶
一般來說:Linux下使用epoll,Win下使用select(IO多路復用會這個通用的即可)
先看看Python源代碼:
# 選擇級別:epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
實戰案例:(可讀和可寫可以不分開)
import socket
import selectors
# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()
class Task(object):
def __init__(self):
# 存放客戶端fd和socket鍵值對
self.fd_socket_dict = dict()
def run(self):
self.server = socket.socket()
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(('', 8080))
self.server.listen()
# 把Server注冊到epoll
Selector.register(self.server.fileno(), selectors.EVENT_READ,
self.connected)
def connected(self, key):
"""客戶端連接時處理"""
client_socket, client_address = self.server.accept()
fd = client_socket.fileno()
self.fd_socket_dict[fd] = (client_socket, client_address)
# 注冊一個客戶端讀的事件(服務端去讀消息)
Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
print(f"{client_address}已連接,當前連接數:{len(self.fd_socket_dict)}")
def call_back_reads(self, key):
"""客戶端可讀時處理"""
# 一個fd只能注冊一次,監測可寫的時候需要把可讀給注銷
Selector.unregister(key.fd)
client_socket, client_address = self.fd_socket_dict[key.fd]
print(f"[來自{client_address}的消息:]\n")
data = client_socket.recv(2048)
if data:
print(data.decode("utf-8"))
# 注冊一個客戶端寫的事件(服務端去發消息)
Selector.register(key.fd, selectors.EVENT_WRITE,
self.call_back_writes)
else:
client_socket.close()
del self.fd_socket_dict[key.fd]
print(f"{client_address}已斷開,當前連接數:{len(self.fd_socket_dict)}")
def call_back_writes(self, key):
"""客戶端可寫時處理"""
Selector.unregister(key.fd)
client_socket, client_address = self.fd_socket_dict[key.fd]
client_socket.send(b"ok")
Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)
def main():
t = Task()
t.run()
while True:
ready = Selector.select()
for key, obj in ready:
# 需要自己回調
call_back = key.data
call_back(key)
if __name__ == "__main__":
main()
Code提煉一下:
- 實例化對象:
Selector = selectors.DefaultSelector()
- 注冊對象:
Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
- 注銷對象:
Selector.unregister(key.fd)
- 注意一下:一個fd只能注冊一次,監測可寫的時候需要把可讀給注銷(反之一樣)
業余拓展:
select, iocp, epoll,kqueue及各種I/O復用機制
https://blog.csdn.net/shallwake/article/details/5265287
kqueue用法簡介
http://www.cnblogs.com/luminocean/p/5631336.html
3.4.協程引入¶
1.yield from¶
我們經常有這樣的需求:讀取兩個分表的數據列表,然后合並之后進行一些處理
平時可以借用itertools.chain
來遍歷:
# https://docs.python.org/3/library/itertools.html#itertools.chain
import itertools
def main():
# 模擬分表后的兩個查詢結果
user1 = ["小張", "小明"]
user2 = ["小潘", "小周"]
# dict只能遍歷key(這種情況需要自己封裝合並方法並處理下)
user3 = {"name": "test1", "name1": "test2"}
# 需求:合並並遍歷
for item in itertools.chain(user1, user2, user3):
print(item)
if __name__ == '__main__':
main()
輸出:
小張
小明
小潘
小周
name
name1
它的內部實現其實是這樣的:(相當於兩層遍歷,用yield返回
)
def my_chain(*args, **kwargs):
for items in args:
for item in items:
yield item
def main():
# 模擬分表后的兩個查詢結果
user1 = ["小張", "小明"]
user2 = ["小潘", "小周"]
# dict只能遍歷key(這種情況需要自己封裝合並方法並處理下)
user3 = {"name": "test1", "name1": "test2"}
# 需求:合並並遍歷
for item in my_chain(user1, user2, user3):
print(item)
if __name__ == '__main__':
main()
然后Python3.3
之后語法再一步簡化(yield from iterable對象
)
def my_chain(*args, **kwargs):
for items in args:
yield from items
def main():
# 模擬分表后的兩個查詢結果
user1 = ["小張", "小明"]
user2 = ["小潘", "小周"]
# 需求:合並並遍歷
for item in my_chain(user1, user2):
print(item)
if __name__ == '__main__':
main()
輸出:
小張
小明
小潘
小周
test1
test2
擴展(可忽略)¶
其實知道了內部實現,很容易就寫上一段應對的處理:
def my_chain(*args, **kwargs):
for my_iterable in args:
# 如果是字典類型就返回value
if isinstance(my_iterable, dict):
my_iterable = my_iterable.values()
for item in my_iterable:
yield item
def main():
# 模擬分表后的兩個查詢結果
user1 = ["小張", "小明"]
user2 = ["小潘", "小周"]
# dict只能遍歷key(這種情況需要自己封裝合並方法並處理下)
user3 = {"name": "test1", "name1": "test2"}
# 需求:合並並遍歷
for item in my_chain(user1, user2, user3):
print(item)
if __name__ == '__main__':
main()
輸出:
小張
小明
小潘
小周
test1
test2
擴展的正確處理¶
PS:一般不會這么干的,一般都是[{},{}]
遍歷並處理:
import itertools
def main():
# 模擬分表后的兩個查詢結果
user1 = [{"name": "小張"}, {"name": "小明"}]
user2 = [{"name": "小潘"}, {"name": "小周"}]
user3 = [{"name": "test1"}, {"name": "test2"}]
# 需求:合並並遍歷
for item in itertools.chain(user1, user2, user3):
# 一般都是直接在這里進行處理
for key, value in item.items():
print(value)
if __name__ == '__main__':
main()
1.yield版協程¶
協程的目的其實很簡單:像寫同步代碼那樣實現異步編程
先看個需求:生成繪圖的數據(max,min,avg
)
比如說原來數據是這樣的:
products = [{
"id": 2344,
"title": "御泥坊補水面膜",
"price": [89, 76, 120, 99]
}, {
"id": 2345,
"title": "御泥坊火山泥面膜",
"price": [30, 56, 70, 89]
}]
處理之后:
new_products = [{
"id": 2344,
"title": "御泥坊補水面膜",
"price": [89, 76, 120, 99],
"max": 120,
"min": 76,
"avg": 96.0
},
{
"id": 2345,
"title": "御泥坊火山泥面膜",
"price": [30, 56, 70, 89],
"max": 89,
"min": 30,
"avg": 61.25
}]
處理過的數據一般用來畫圖,實際效果類似於:
如果不借助協程,我們一般這么處理:(數據庫獲取過程省略)
# 生成新的dict數據
def get_new_item(item):
prices = item["price"]
item["avg"] = sum(prices) / len(prices)
item["max"] = max(prices)
item["min"] = min(prices)
return item
def get_new_data(data):
newdata = []
for item in data:
new_item = get_new_item(item)
# print(new_item) # 處理后的新dict
newdata.append(new_item)
return newdata
def main():
# 需求:生成繪圖的數據(max,min,avg)
products = [{
"id": 2344,
"title": "御泥坊補水面膜",
"price": [89, 76, 120, 99]
}, {
"id": 2345,
"title": "御泥坊火山泥面膜",
"price": [30, 56, 70, 89]
}]
new_products = get_new_data(products)
print(new_products)
if __name__ == "__main__":
main()
改成yield版的協程也很方便,基本上代碼沒有變,也不用像IO多路復用那樣來回的回調
# 生成新的dict數據
def get_new_item(item):
prices = item["price"]
item["avg"] = sum(prices) / len(prices)
item["max"] = max(prices)
item["min"] = min(prices)
yield item
def get_new_data(data):
for item in data:
yield from get_new_item(item)
def main():
# 需求:生成繪圖的數據(max,min,avg)
products = [{
"id": 2344,
"title": "御泥坊補水面膜",
"price": [89, 76, 120, 99]
}, {
"id": 2345,
"title": "御泥坊火山泥面膜",
"price": [30, 56, 70, 89]
}]
new_products = list()
# 如果需要返回值就捕獲StopIteration異常
for item in get_new_data(products):
new_products.append(item)
print(new_products)
if __name__ == "__main__":
main()
簡單解析一下:(用yield from
的目的就是為了引出等會說的async/await
)
yield from
(委托生成器get_new_data
)的好處就是讓調用方(main
)和yield
子生成器(get_new_item
)直接建立一個雙向通道
你也可以把yield from
當作一個中介(如果不理解就把yield from
想象成await
就容易理解了),本質就是下面代碼:
# 生成新的數據
def get_new_data(data):
for item in data:
prices = item["price"]
item["avg"] = sum(prices) / len(prices)
item["max"] = max(prices)
item["min"] = min(prices)
yield item
def main():
# 需求:生成繪圖的數據(max,min,avg)
products = [{
"id": 2344,
"title": "御泥坊補水面膜",
"price": [89, 76, 120, 99]
}, {
"id": 2345,
"title": "御泥坊火山泥面膜",
"price": [30, 56, 70, 89]
}]
new_products = list()
for item in get_new_data(products):
new_products.append(item)
print(new_products)
if __name__ == "__main__":
main()
PEP 380(含分析)¶
yield from
內部其實在yield
基礎上做了很多事情(比如一些異常的處理),具體可以看看 PEP 380
先提煉一個簡版
的:
# 正常調用
RESULT = yield from EXPR
# _i:子生成器(也是個迭代器)
# _y:子生成器生產的值
# _r:yield from 表達式最終結果
# _s:調用方通過send發送的值
# _e:異常對象
# 內部原理
_i = iter(EXPR) # EXPR是一個可迭代對象,_i是子生成器
try:
# 第一次不能send值,只能next() or send(None),並把產生的值放到_y中
_y = next(_i)
except StopIteration as _e:
# 如果子生成器直接就return了,那就會拋出異常,通過value可以拿到子生成器的返回值
_r = _e.value
else:
# 嘗試進行循環(調用方和子生成器交互過程),yield from這個生成器會阻塞(委托生成器)
while 1:
# 這時候子生成器已經和調用方建立了雙向通道,在等待調用方send(value),把這個值保存在_s中
_s = yield _y # 這邊還會進行一系列異常處理,我先刪掉,等會看
try:
# 如果send(None),那么繼續next遍歷
if _s is None:
_y = next(_i) # 把子生成器結果放到 _y 中
else:
_y = _i.send(_s) # 如果調用方send一個值,就轉發到子生成器
except StopIteration as _e:
_r = _e.value # 如果子生成器遍歷完了,就把返回值給_r
break
RESULT = _r # 最終的返回值(yield from 最終的返回值)
現在再來看完整版
壓力就沒有那么大了:
# 正常調用
RESULT = yield from EXPR
# _i:子生成器(也是個迭代器)
# _y:子生成器生產的值
# _r:yield from 表達式最終結果
# _s:調用方通過send發送的值
# _e:異常對象
# 內部原理
_i = iter(EXPR) # EXPR是一個可迭代對象,_i是子生成器
try:
# 第一次不能send值,只能next() or send(None),並把產生的值放到_y中
_y = next(_i)
except StopIteration as _e:
# 如果子生成器直接就return了,那就會拋出異常,通過value可以拿到子生成器的返回值
_r = _e.value
else:
# 嘗試進行循環(調用方和子生成器交互過程),yield from這個生成器會阻塞(委托生成器)
while 1:
try:
# 這時候子生成器已經和調用方建立了雙向通道,在等待調用方send(value),把這個值保存在_s中
_s = yield _y
# 【現在補全】有這么幾種情況需要處理
# 1.子生成器可能只是一個迭代器,並不能作為協程的生成器(不支持throw和close)
# 2.子生成器雖然支持了throw和close,但在子生成器內部兩種方法都會拋出異常
# 3.調用法調用了gen.throw(),想讓子生成器自己拋異常
# 這時候就要處理 gen.close() 和 gen.throw()的情況
# 生成器close()異常的處理
except GeneratorExit as _e:
try:
_m = _i.close
except AttributeError:
pass # 屏蔽close的異常
else:
_m()
raise _e # 上拋異常
# 生成器throw()異常的處理
except BaseException as _e:
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else:
try:
_y = _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else:
try:
# 如果send(None),那么繼續next遍歷
if _s is None:
_y = next(_i) # 把子生成器結果放到 _y 中
else:
_y = _i.send(_s) # 如果調用方send一個值,就轉發到子生成器
except StopIteration as _e:
_r = _e.value # 如果子生成器遍歷完了,就把返回值給_r
break
RESULT = _r # 最終的返回值(yield from 最終的返回值)
2.async/await¶
把上面的原生代碼用async和await
改裝一下:(協程的目的就是像寫同步代碼一樣寫異步,這個才算是真做到了)
import asyncio
# 生成新的dict數據
async def get_new_item(item):
prices = item["price"]
item["avg"] = sum(prices) / len(prices)
item["max"] = max(prices)
item["min"] = min(prices)
return item
async def get_new_data(data):
newdata = []
for item in data:
new_item = await get_new_item(item)
# print(new_item) # 處理后的新dict
newdata.append(new_item)
return newdata
def main():
# 需求:生成繪圖的數據(max,min,avg)
products = [{
"id": 2344,
"title": "御泥坊補水面膜",
"price": [89, 76, 120, 99]
}, {
"id": 2345,
"title": "御泥坊火山泥面膜",
"price": [30, 56, 70, 89]
}]
# python 3.7
new_products = asyncio.run(get_new_data(products))
print(new_products)
if __name__ == "__main__":
main()
輸出:(是不是很原生代碼沒啥區別?)
[{'id': 2344, 'title': '御泥坊補水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76},
{'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]
下級預估:asyncio
3.5.asyncio¶
官方文檔:https://docs.python.org/3/library/asyncio.html
開發中常見錯誤:https://docs.python.org/3/library/asyncio-dev.html
代碼示例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine
PS:asyncio
是Python
用於解決異步IO
編程的一整套
解決方案
3.5.1.上節回顧¶
上次說了下協程演變過程,這次繼續,先接着上次的說:
像JS
是可以生成器和async
和await
混用的,那Python
呢?(NetCore不可以混用)
import types
# 和生成器完全分開了,不過可以理解為yield from
@types.coroutine
def get_value(value):
yield value
async def get_name(name):
# 一系列邏輯處理
return await get_value(name)
if __name__ == '__main__':
gen = get_name("小明")
print(gen.send(None))
# 直接混用會報錯:TypeError: object generator can't be used in 'await' expression
我們的async
和await
雖然和yield from
不是一個概念,但是可以理解為yield from
上面這段代碼你可以理解為:
import types
def get_value(value):
yield value
# 這個async和await替換成yield from
def get_name(name):
# 一系列邏輯處理
yield from get_value(name)
if __name__ == '__main__':
gen = get_name("小明")
print(gen.send(None))
PS:Python默認和NetCore一樣,不能直接混用,如果你一定要混用,那么得處理下(使用@asyncio.coroutine
也行)
3.5.2.asyncio引入¶
在今天之前,協程我們是這么實現的:事件循環(loop)
+回調(驅動生成器)
+IO多路復用(epoll)
現在可以通過官方提供的asyncio
(可以理解為協程池)來實現了(第三方還有一個uvloop
【基於C寫的libuv
庫(nodejs
也是基於這個庫)】)
PS:uvloop
的使用非常簡單,只要在獲取事件循環前將asyncio
的事件循環策略設置為uvloop
的:asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
1.簡單案例¶
先看個簡單的協程案例:
import types
import asyncio
# 模擬一個耗時操作
async def test():
print("start...")
# 不能再使用以前阻塞的暫停了
await asyncio.sleep(2)
print("end...")
return "ok"
if __name__ == '__main__':
import time
start_time = time.time()
# # >=python3.4
# # 返回asyncio的事件循環
# loop = asyncio.get_event_loop()
# # 運行事件循環,直到指定的future運行完畢,返回結果
# result = loop.run_until_complete(test())
# print(result)
# python3.7
result = asyncio.run(test())
print(result)
print(time.time() - start_time)
輸出:
start...
end...
ok
2.001772403717041
簡單說下,asyncio.run
是python3.7才簡化出來的語法(類比NetCore的Task.Run
)看看源碼就知道了:
# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
# 以前是直接使用"asyncio.get_event_loop()"(開發人員一般都習慣這個了)
# 3.7開始推薦使用"asyncio.get_running_loop()"來獲取正在運行的loop(獲取不到就拋異常)
if events._get_running_loop() is not None:
raise RuntimeError("無法從正在運行的事件循環中調用asyncio.run()")
if not coroutines.iscoroutine(main):
raise ValueError("{!r}應該是一個協程".format(main))
loop = events.new_event_loop() # 創建一個新的事件循環
try:
events.set_event_loop(loop) # 設置事件循環
loop.set_debug(debug) # 是否調試運行(默認否)
return loop.run_until_complete(main) # 等待運行
finally:
try:
_cancel_all_tasks(loop) # 取消其他任務
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
新版本其實就是使用了一個新的loop
去啟動run_until_complete
PS:uvloop
也可以這樣去使用:獲取looploop = uvloop.new_event_loop()
再替換原生的loopasyncio.set_event_loop(loop)
3.5.3.批量任務¶
1.舊版本實現¶
import asyncio
# 模擬一個耗時操作
async def test(i):
print("start...")
# 不能再使用以前阻塞的暫停了
await asyncio.sleep(2)
print("end...")
return i
if __name__ == '__main__':
import time
start_time = time.time()
# # >=python3.4
loop = asyncio.get_event_loop()
# tasks = [asyncio.ensure_future(test(i)) for i in range(10)]
# 注意:是loop的方法,而不是asyncio的,不然就會引發RuntimeError:no running event loop
tasks = [loop.create_task(test(i)) for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print(task.result())
print(time.time() - start_time)
輸出:(tasks替換成這個也一樣:tasks = [asyncio.ensure_future(test(i)) for i in range(10)]
)
start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
end...
end...
end...
end...
end...
end...
end...
0
1
2
3
4
5
6
7
8
9
2.028331995010376
然后我們再看看這個asyncio.wait
是個啥:(回顧:https://www.cnblogs.com/dotnetcrazy/p/9528315.html#wait()說明)
# return_when 這個參數和之前一樣
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
# 官方准備在未來版本廢棄它的loop參數
# 和concurrent.futures里面的wait不一樣,這邊是個協程
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
平時使用可以用高級APIasyncio.gather(*tasks)
來替換asyncio.wait(tasks)
1.舊版另用¶
PS:官方推薦使用create_task的方式
來創建一個任務
import asyncio
# 模擬一個耗時操作
async def test(i):
print("start...")
# 不能再使用以前阻塞的暫停了
await asyncio.sleep(2)
print("end...")
return i
async def main():
tasks = [test(i) for i in range(10)]
# await task 可以得到返回值(得到結果或者異常)
# for task in asyncio.as_completed(tasks):
# try:
# print(await task)
# except Exception as ex:
# print(ex)
return [await task for task in asyncio.as_completed(tasks)]
if __name__ == '__main__':
import time
start_time = time.time()
# old推薦使用
loop = asyncio.get_event_loop()
result_list = loop.run_until_complete(main())
print(result_list)
print(time.time() - start_time)
輸出:(PS:用asyncio.gather(*tasks)
直接替換asyncio.wait(tasks)
也行)
start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
end...
end...
end...
end...
end...
end...
end...
[1, 6, 4, 5, 0, 7, 8, 3, 2, 9]
2.0242035388946533
其實理解起來很簡單,而且和NetCore
以及NodeJS
它們統一了,只要是await xxx
就返回一個(結果
|異常
),不await
就是一個task對象
2.新版本實現¶
import asyncio
# 模擬一個耗時操作
async def test(i):
print("start...")
await asyncio.sleep(2)
print("end...")
return i
async def main():
tasks = [test(i) for i in range(10)]
# 給`協程/futures`返回一個future聚合結果
return await asyncio.gather(*tasks) # 記得加*來解包
if __name__ == '__main__':
import time
start_time = time.time()
# python3.7
result_list = asyncio.run(main())
print(result_list)
# 2.0259485244750977
print(time.time() - start_time)
輸出:(語法簡化太多了,用起來特別簡單)
start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
end...
end...
end...
end...
end...
end...
end...
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2.00840163230896
關於參數需要加*
解包的說明 ==> 看看函數定義就秒懂了:
# 給 協程/futures 返回一個future聚合結果
def gather(*coros_or_futures, loop=None, return_exceptions=False):
pass
# 把協程或者awaitable對象包裹成task
def ensure_future(coro_or_future, *, loop=None):
pass
# 傳入一個協程對象,返回一個task對象
class BaseEventLoop(events.AbstractEventLoop):
def create_task(self, coro):
pass
關於高級和低級API的說明¶
asyncio的高級(high-level
)API一般用於這幾個方面:(開發基本夠用了)
- 並行運行Python協同程序並完全控制它們的執行
- 網絡通信(
IO
)和進程間通信(IPC
) - 子進程(
subprocesses
)相關 - 通過隊列(
Queue
)分配任務(Tasks
) - 同步(
synchronize
)並發代碼
低級(low-level
)API一般這么用:(事件循環和回調會用下,其他基本不用)
- 創建和管理事件循環,為網絡、子進程、信號處理(
Signal
)等提供異步(asynchronous
)API - 為傳輸使用高效協議
- 使用
async/await
語法橋接基於回調的庫和代碼
3.5.4.回調函數¶
回調一般不利於代碼維護,現在基本上是盡量不用了(異步代碼用起來都和同步沒多大差別了,回調也就沒那么大用處了)
1.回調函數獲取返回值¶
上面說的獲取返回值,其實也可以通過回調函數來獲取:
# 低級API示例
import asyncio
async def get_html(url):
print(f"get {url} ing")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def call_back(task):
print(type(task))
print(task.result())
if __name__ == "__main__":
import time
start_time = time.time()
urls = [
"https://www.baidu.com", "https://www.sogou.com",
"https://www.python.org", "https://www.asp.net"
]
tasks = set() # 任務集合
loop = asyncio.get_event_loop()
for url in urls:
# task = asyncio.ensure_future(get_html(url))
task = loop.create_task(get_html(url))
# 設置回調函數
task.add_done_callback(call_back)
# 添加到任務集合中
tasks.add(task)
# 批量執行
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time() - start_time)
輸出:(task.add_done_callback(回調函數)
)
get https://www.baidu.com ing
get https://www.sogou.com ing
get https://www.python.org ing
get https://www.asp.net ing
<class '_asyncio.Task'>
<h1>This is a test for https://www.baidu.com</h1>
<class '_asyncio.Task'>
<h1>This is a test for https://www.python.org</h1>
<class '_asyncio.Task'>
<h1>This is a test for https://www.sogou.com</h1>
<class '_asyncio.Task'>
<h1>This is a test for https://www.asp.net</h1>
2.0168468952178955
2.回調函數傳參擴展¶
實例:
import asyncio
import functools
async def get_html(url):
await asyncio.sleep(2)
return "This is a test for"
# 注意一個東西:通過偏函數傳過來的參數在最前面
def call_back(url, task):
# do something
print(type(task))
print(task.result(), url)
if __name__ == "__main__":
import time
start_time = time.time()
urls = [
"https://www.baidu.com", "https://www.sogou.com",
"https://www.python.org", "https://www.asp.net"
]
tasks = set() # 任務集合
loop = asyncio.get_event_loop()
for url in urls:
# task = asyncio.ensure_future(get_html(url))
task = loop.create_task(get_html(url))
# 設置回調函數 (不支持傳參數,我們就利用偏函數來傳遞)
task.add_done_callback(functools.partial(call_back, url))
# 添加到任務集合中
tasks.add(task)
# 批量執行
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time() - start_time)
輸出:(PS:通過偏函數傳過來的參數在最前面)
<class '_asyncio.Task'>
This is a test for https://www.baidu.com
<class '_asyncio.Task'>
This is a test for https://www.python.org
<class '_asyncio.Task'>
This is a test for https://www.sogou.com
<class '_asyncio.Task'>
This is a test for https://www.asp.net
2.0167236328125
3.5.5.異常相關¶
之前說的await task
可能得到結果也可能得到異常有些人可能還不明白 ==> 其實你把他看出同步代碼(PS:協程的目的就是像寫同步代碼一樣進行異步編程)就好理解了,函數執行要么得到結果要么得到返回值
看個異常的案例:
import asyncio
async def get_html(url):
print(f"get {url} ing")
if url == "https://www.asp.net":
raise Exception("Exception is over")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
tasks = [get_html(url) for url in urls]
return await asyncio.gather(*tasks)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
輸出:(和同步代碼沒差別,可能出異常的部分加個異常捕獲即可)
get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
Exception is over
0.008000373840332031
再一眼舊版怎么用:(PS:基本差不多,下次全部用新用法了)
import asyncio
async def get_html(url):
print(f"get {url} ing")
if url == "https://www.asp.net":
raise Exception("Exception is over")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
tasks = set() # 任務集合
tasks = [get_html(url) for url in urls]
return await asyncio.gather(*tasks)
if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
try:
# 批量執行
loop.run_until_complete(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
常見異常¶
Python3調試過程中的常見異常:https://www.cnblogs.com/dotnetcrazy/p/9192089.html
asyncio中常見異常¶
官方文檔:https://docs.python.org/3/library/asyncio-exceptions.html
asyncio.TimeoutError(Exception.Error)
:- 任務超時引發的異常
asyncio.CancelledError(Exception.Error)
:- 任務取消引發的異常
asyncio.InvalidStateError(Exception.Error)
:Task/Future
內部狀態無效引發
asyncio.IncompleteReadError(Exception.Error)
:讀取未完成引發的錯誤:- 不完整: 在到達流結束之前讀取字節字符串(讀取了不完整的字符串就轉換了)
- 不清楚讀多少: 預期讀取的字節總數未知
asyncio.LimitOverrunError(Exception)
:- 超出緩沖區引發的異常
asyncio.SendfileNotAvailableError(Exception.ReferenceError.RuntimeError)
:- 系統調用不適用於給定的套接字或文件類型(系統調用類型不匹配導致的)
Python常見異常¶
有些異常官方沒有寫進去,我補了一些常用的異常:https://docs.python.org/3/library/exceptions.html
BaseException
SystemExit
:sys.exit()
引發的異常(目的:讓Python解釋器退出)KeyboardInterrupt
:用戶Ctrl+C終止程序引發的異常GeneratorExit
:生成器或者協程關閉的時候產生的異常(特別注意)Exception
:所有內置異常(非系統退出)或者用戶定義異常的基類asyncio.Error
asyncio.CancelledError
asyncio.TimeoutError
:和Exception.OSError.TimeoutError
區分開asyncio.InvalidStateError
:Task/Future
內部狀態無效引發
asyncio.LimitOverrunError
:超出緩沖區引發的異常StopIteration
:next()、send()
引發的異常:https://www.cnblogs.com/dotnetcrazy/p/9278573.html#6.Python迭代器
StopAsyncIteration
:__anext__()
引發的異常- ArithmeticError
- FloatingPointError
- OverflowError
- ZeroDivisionError
AssertionError
:當斷言assert
語句失敗時引發AttributeError
:當屬性引用或賦值失敗時引發- BufferError
EOFError
asyncio.IncompleteReadError
:讀取操作未完成引發的錯誤
- ImportError
- ModuleNotFoundError
- LookupError
- IndexError
- KeyError
- MemoryError
- NameError
- UnboundLocalError
OSError
:當系統函數返回與系統相關的錯誤時引發- BlockingIOError
- ChildProcessError
- ConnectionError
- BrokenPipeError
- ConnectionAbortedError
- ConnectionRefusedError
- ConnectionResetError
- FileExistsError
- FileNotFoundError
- InterruptedError
- IsADirectoryError
- NotADirectoryError
- PermissionError
- ProcessLookupError
TimeoutError
:系統函數執行超時時觸發
ReferenceError
:引用錯誤(對象被資源回收或者刪除了)RuntimeError
:出錯了,但是檢測不到錯誤類別時觸發NotImplementedError
:為實現報錯(比如調用了某個不存在的子類方法)RecursionError
:遞歸程度太深引發的異常asyncio.SendfileNotAvailableError
:系統調用不適用於給定的套接字或文件類型
SyntaxError
:語法錯誤時引發(粘貼代碼經常遇到)IndentationError
:縮進有問題TabError
:當縮進包含不一致的制表符和空格使用時引發
- SystemError
TypeError
:類型錯誤- ValueError
- UnicodeError
- UnicodeDecodeError
- UnicodeEncodeError
- UnicodeTranslateError
- Warning
- DeprecationWarning
- PendingDeprecationWarning
- RuntimeWarning
- SyntaxWarning
- UserWarning
- FutureWarning
- ImportWarning
- UnicodeWarning
- BytesWarning
- ResourceWarning
新語法的說明¶
Net方向的同志記得對比當時寫的 Python3 與 C# 並發編程之~Net篇:https://www.cnblogs.com/dunitian/p/9419325.html
1.概念¶
先說說概念:
event_loop
事件循環:- 程序開啟一個無限的循環,程序員會把一些函數(協程)注冊到事件循環上
- 當滿足事件發生的時候,調用相應的協程函數
coroutine
協程:- 協程對象,指一個使用
async
關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象 - 協程對象需要注冊到事件循環,由事件循環調用
- 協程對象,指一個使用
future
對象:- 代表將來執行或沒有執行的任務的結果(它和task上沒有本質的區別)
task
任務:- 一個協程對象就是一個原生可以掛起的函數,Task則是對協程進一步封裝,其中包含任務的各種狀態
Task
對象是Future
的子類,它將coroutine
和Future
聯系在一起,將coroutine
封裝成一個Future
對象
async/await
關鍵字:- 定義協程的關鍵字,
async
定義一個協程,await
用於掛起阻塞的異步調用接口 - 類似於
yield from
(都是在調用方與子協程之間直接建立一個雙向通道)
- 定義協程的關鍵字,
2.語法¶
為了避免讀者混亂於新舊代碼的使用,從下面開始就直接使用最新的語法的
- 運行asyncio:
asyncio.run(main())
- 只運行一次(
if __name__ == "__main__"
)
- 只運行一次(
- 創建一個任務:
asyncio.create_task(func())
- Python3.8會多一個name的別名參數
- 批量執行任務:
asyncio.gather(*tasks)
- return_exceptions=True可以屏蔽這批任務的異常,並把異常結果返回
- 如果有類似於(第一個任務完成|第一個異常產生后)進行相應的操作,則推薦
asyncio.wait
- 獲取loop:
asyncio.get_event_loop()
- 優先考慮:
asyncio.get_running_loop()
(獲取不到會拋異常)
- 優先考慮:
# 如果和舊版本混用,就應該這么寫了(麻煩)
try:
loop = asyncio.get_running_loop()
except RuntimeError as ex:
print(ex) # no running event loop
loop = asyncio.get_event_loop()
...
loop.run_until_complete(xxx)
新語法:
async def main():
loop = asyncio.get_running_loop()
...
asyncio.run(main())
3.狀態¶
Task基本上就是這幾個狀態(生成器、Future也是):
Pending
:創建Task,還未執行Running
:事件循環正在調用執行任務Done
:Task執行完畢Cancelled
:Task被取消后的狀態
4.時序圖¶
Python3.7之前官方貼了張時序圖,我們拿來理解上面的話:https://docs.python.org/3.6/library/asyncio-task.html
import asyncio
async def compute(x, y):
print(f"計算 {x}+{y}...")
await asyncio.sleep(1.0)
return x + y
async def main(x, y):
result = await compute(x, y)
print(f"{x}+{y}={result}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main(1, 2))
loop.close()
3.5.4.回調函數(新用法)¶
和舊版本比起來其實就是創建一個task
,然后為task
添加一個回調函數add_done_callback
import asyncio
async def get_html(url):
print(f"get {url} ing")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def callback_func(task):
print(type(task))
if task.done():
print(f"done") # print(task.result())
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
# asyncio.create_task來創建一個Task
tasks = [asyncio.create_task(get_html(url)) for url in urls]
# 給每個任務都加一個回調函數
for task in tasks:
task.add_done_callback(callback_func)
# 批量執行任務
result = await asyncio.gather(*tasks)
print(result) # 返回 result list
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.0189685821533203
注意:`add_signal_handler`是loop獨有的方法,Task中沒有,eg:loop.add_signal_handler(signal.SIGINT, callback_handle, *args)
3.5.5.異常相關擴展¶
關於批量任務的異常處理:
- 默認:同一批次有一個task產生了異常,這一批次任務就全部結束了
return_exceptions=True
:不影響其他任務,異常消息也放在結果列表中- 當
gather
被取消的時候,不管True or False,這批次任務全部取消
import asyncio
async def get_html(url):
print(f"get {url} ing")
if url == "https://www.asp.net":
raise Exception("Exception is over")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def callback_func(task):
if task.done():
print(f"done") # print(task.result())
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
# asyncio.create_task來創建一個Task
tasks = [asyncio.create_task(get_html(url)) for url in urls]
# 給每個任務都加一個回調函數
for task in tasks:
task.add_done_callback(callback_func)
# 批量執行任務
result = await asyncio.gather(*tasks, return_exceptions=True)
print(result) # 返回 result list
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
done
done
done
done
['<h1>This is a test for https://www.baidu.com</h1>', Exception('Exception is over'), '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.013272523880005
3.5.6.任務分組、取消¶
1.分組¶
看個簡單的任務分組案例:
import asyncio
async def get_html(url):
print(f"get url for{url}")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls1 = ["https://www.baidu.com", "https://www.asp.net"]
urls2 = ["https://www.python.org", "https://www.sogou.com"]
tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]
# 等待兩組都完成,然后返回聚合結果
result = await asyncio.gather(*tasks1, *tasks2)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
輸出:(兩個分組結果被一起放到了list中)
get url forhttps://www.baidu.com
get url forhttps://www.asp.net
get url forhttps://www.python.org
get url forhttps://www.sogou.com
['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.0099380016326904
2.取消¶
如果想要對Group1
和Group2
進行更多的自定化,可以再包裹一層gather
方法:
import asyncio
async def get_html(url):
print(f"get url for{url}")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls1 = ["https://www.baidu.com", "https://www.asp.net"]
urls2 = ["https://www.python.org", "https://www.sogou.com"]
tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]
group1 = asyncio.gather(*tasks1)
group2 = asyncio.gather(*tasks2)
# 分組2因為某原因被取消任務了(模擬)
group2.cancel()
# 等待兩組都完成,然后返回聚合結果
result = await asyncio.gather(group1, group2, return_exceptions=True)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
輸出:
get url forhttps://www.baidu.com
get url forhttps://www.asp.net
[['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>'], CancelledError()]
2.0090348720550537
再看個單個任務的案例:
import asyncio
async def test():
print("start...")
await asyncio.sleep(10)
print("end...")
async def main():
task = asyncio.create_task(test())
await asyncio.sleep(1)
# 取消task任務
task.cancel()
try:
await task
except asyncio.CancelledError:
print(f"任務已經被取消:{task.cancelled()}")
print(f"任務是因為異常而完成:{task.done()}")
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...
任務已經被取消:True
任務是因為異常而完成:True
1.0133979320526123
簡單說明下:
task.done()
:任務是否完成- 任務完成:
task.done() ==> true
:- 任務正常完成
- 觸發異常而被標記為任務完成
- 任務完成:
task.cancelled()
:用來判斷是否成功取消
為什么這么說?看看源碼:
# 完成包含了正常+異常
if outer.done():
# 把因為異常完成的任務打個標記
if not fut.cancelled():
fut.exception() # 標記檢索的異常
PS:官方推薦asyncio.all_tasks
(loop中尚未完成的Task集合):
- 原來是通過:
asyncio.Task.all_tasks
來獲取(返回loop的所有Task集合)
wait_for and wait¶
1.一個任務限時等待(wait_for)¶
超時等待:asyncio.wait_for(task, timeout)
import asyncio
async def test(time):
print("start...")
await asyncio.sleep(time)
print("end...")
return time
async def main():
task = asyncio.create_task(test(3))
try:
result = await asyncio.wait_for(task, timeout=2)
print(result)
except asyncio.CancelledError:
print("Cancel")
except asyncio.TimeoutError:
print("超時取消")
except Exception as ex:
print(ex)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...
超時取消
2.007002115249634
2.多個任務限時等待(wait)¶
wait
是比gather
更底層的api,比如現在這個多任務限時等待gather
並不能滿足:
import asyncio
async def test(time):
print("start...")
await asyncio.sleep(time)
print("end...")
return time
async def main():
tasks = [asyncio.create_task(test(i)) for i in range(10)]
# 已完成的任務(包含異常),未完成的任務
done, pending = await asyncio.wait(tasks, timeout=2)
# 任務總數(我用了3種表示)PS:`all_tasks()`的時候記得去除main的那個
print(
f"任務總數:{len(tasks)}=={len(done)+len(pending)}=={len(asyncio.Task.all_tasks())-1}"
)
# 所有未完成的task:asyncio.all_tasks(),記得去掉run(main())
print(f"未完成Task:{len(pending)}=={len(asyncio.all_tasks()) - 1}")
print(await asyncio.gather(*done))
# for task in done:
# print(await task)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
任務總數:10==10==10
未完成Task:7==7
[0, 1, 2]
2.0071778297424316
wait的擴展¶
用法其實和Future一樣(https://www.cnblogs.com/dotnetcrazy/p/9528315.html#Future對象),這邊就當再普及下新語法了
第一個任務執行完成則結束此批次任務¶
項目里經常有這么一個場景:同時調用多個同效果的API,有一個返回后取消其他請求
,看個引入案例
import asyncio
async def test(i):
print(f"start...task{i}")
await asyncio.sleep(i)
print(f"end...task{i}")
return "ok"
# 第一個任務執行完成則結束此批次任務
async def main():
tasks = [asyncio.create_task(test(i)) for i in range(10)]
# 項目里經常有這么一個場景:同時調用多個同效果的API,有一個返回后取消其他請求
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
# print(await asyncio.gather(*done))
for task in done:
print(await task)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...task0
start...task1
start...task2
start...task3
start...task4
start...task5
start...task6
start...task7
start...task8
start...task9
end...task0
ok
0.017002105712890625
課后拓展:(asyncio.shield
保護等待對象不被取消) https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation
下級預估:舊代碼兼容、同步語、Socket新用
代碼答疑¶
之前有人問我,這個asyncio.get_running_loop()
到底是用還是不用?為什么一會asyncio.get_event_loop()
一會又是asyncio.get_running_loop()
,一會是loop.run_until_complete()
一會又是asyncio.run()
的,有點混亂了。
之前逆天簡單的提了一下,可能說的還是不太詳細,這邊再舉幾個例子說說:
首先:如果你用的是Python3.7
之前的版本,那么你用不到loop = asyncio.get_running_loop()
和asyncio.run()
的
如果是老版本你就使用asyncio.get_event_loop()
來獲取loop
,用loop.run_until_complete()
來運行:
import asyncio
async def test():
print("start ...")
await asyncio.sleep(2)
print("end ...")
# 如果你用`get_running_loop`就不要和`loop.run_until_complete`混用
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
輸出:(混用需要捕獲Runtime的異常)
start ...
end ...
上節課說使用asyncio.get_running_loop()
麻煩的情景是這個:(這種情況倒不如直接asyncio.get_event_loop()
獲取loop了)
# 如果和舊版本混用,就應該這么寫了(麻煩)
try:
loop = asyncio.get_running_loop()
except RuntimeError as ex:
loop = asyncio.get_event_loop()
...
asyncio.run(test())
官方推薦的新語法是這樣的:(>=Python3.7
)
async def main():
loop = asyncio.get_running_loop()
...
asyncio.run(main())
PS:記住一句就行:asyncio.get_running_loop()
和asyncio.run()
成對出現
可以這么理解:asyncio.run
里會創建對應的loop
,所以你才能獲取正在運行的loop
:
# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
if events._get_running_loop() is not None:
raise RuntimeError("無法從正在運行的事件循環中調用asyncio.run()")
if not coroutines.iscoroutine(main):
raise ValueError("{!r}應該是一個協程".format(main))
# 創建一個新的事件循環
loop = events.new_event_loop()
try:
events.set_event_loop(loop) # 設置事件循環
loop.set_debug(debug) # 是否調試運行(默認否)
return loop.run_until_complete(main) # 等待運行
finally:
try:
_cancel_all_tasks(loop) # 取消其他任務
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
就是怕大家混亂,上節課開始就直接使用的最新語法,舊語法文章里盡量不使用了,本節也是
3.5.7.兼容舊代碼 or 運行阻塞代碼¶
部分可以參考官方文檔:https://docs.python.org/3/library/asyncio-eventloop.html
學了協程GIL
的問題其實也不是多大的事情了,多進程+協程就可以了,asyncio
現在也提供了線程安全的run
方法:asyncio.run_coroutine_threadsafe(coro)
(也算是對GIL給出的官方解決方法了)
1.協程 and 線程池¶
前面我們說過了並發編程(線程+進程)的通用解決方案:並發編程:concurrent.futures專欄
asyncio
框架雖然幾乎包含了所有常用功能,但畢竟是新事物,舊代碼怎么辦?協程只是單線程工作,理論上不能使用阻塞代碼,那庫或者api只能提供阻塞的調用方式怎么辦? ~ 不用慌,可以使用官方提供的兼容方法,先看個案例:
1.回顧下一起的通用方案:¶
import asyncio
import concurrent.futures
# 模擬一個耗時操作
def test(n):
return sum(i * i for i in range(10**n))
# old main
def main():
with concurrent.futures.ThreadPoolExecutor() as pool:
# 注意:future和asyncio.future是不一樣的
future = pool.submit(test, 7)
result = future.result()
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
main() # old
print(time.time() - start_time)
輸出:(注意:future
和asyncio.future
不是一個東西,只是類似而已)
333333283333335000000
15.230607032775879
2.兼容版新用法:¶
import asyncio
import concurrent.futures
# 模擬一個耗時操作
def test(n):
return sum(i * i for i in range(10**n))
async def main():
# 獲取loop
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
# 新版兼任代碼
result = await loop.run_in_executor(pool, test, 7)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main()) # new
print(time.time() - start_time)
輸出:(不談其他的,至少運行速度快了)
333333283333335000000
15.283994913101196
源碼分析¶
我們來看看run_in_executor
的內部邏輯是啥:
class BaseEventLoop(events.AbstractEventLoop):
def run_in_executor(self, executor, func, *args):
# 檢查loop是否關閉,如果關閉就拋`RuntimeError`異常
self._check_closed()
if self._debug:
self._check_callback(func, 'run_in_executor')
# 如果不傳一個executor,就會使用默認的executor
# 換句話說:你可以不傳`線程池`
if executor is None:
executor = self._default_executor
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor
# 把`concurrent.futures.Future`對象封裝成`asyncio.futures.Future`對象
return futures.wrap_future(executor.submit(func, *args), loop=self)
看完源碼就發現,代碼還可以進一步簡化:
import asyncio
# 模擬一個耗時操作
def test(n):
return sum(i * i for i in range(10**n))
async def main():
# 獲取loop
loop = asyncio.get_running_loop()
# 新版兼任代碼
result = await loop.run_in_executor(None, test, 7)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
333333283333335000000
15.367998838424683
PS:協程里面不應該出現傳統的阻塞代碼,如果只能用那些代碼,那么這個就是一個兼任的措施了
2.回調擴展¶
這個沒有之前講的那些常用,就當了解下,框架里面碰到不至於懵逼:
- Task執行完后執行:
add_done_callback(回調函數)
task.add_done_callback()
orloop.add_done_callback()
- 想要傳參數可以使用:
functools.partial(call_back, url)
- PS:通過偏函數傳過來的參數在最前面:
call_back(url,task)
- 盡快執行:
call_soon(callback,*args)
loop.call_soon()
、線程安全:loop.call_soon_threadsafe()
- 可以看成是
loop.call_later(0,callback,*args)
- 一般臨時插入一個任務的時候會用到
- 指定時間后執行:
loop.call_later(delay,callback,*args)
- 延遲可以是int或float,以秒為單位(相對於當前時間)
- 返回的對象可以使用cancel()方法來取消任務
- 指定協程時間后執行:
loop.call_at(絕對時間,callback,*args)
- 和call_later差不多,時間使用絕對時間(這個絕對時間是loop的time()方法)
注意點:首先要保證任務執行前loop不斷開,比如你call_later(2,xxx)
,這時候loop退出了,那么任務肯定完成不了
這個比較簡單,看個案例:
import asyncio
def test(name):
print(f"start {name}...")
print(f"end {name}...")
async def main():
# 正在執行某個任務
loop = asyncio.get_running_loop()
# 插入一個更要緊的任務
# loop.call_later(0, callback, *args)
task1 = loop.call_soon(test, "task1")
# 多少秒后執行
task2 = loop.call_later(2, test, "task2")
# 內部時鍾時間
task3 = loop.call_at(loop.time() + 3, test, "task3")
print(type(task1))
print(type(task2))
print(type(task3))
# 保證loop在執行完畢后才關閉
await asyncio.sleep(5)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(回調函數一般都是普通函數
)
<class 'asyncio.events.Handle'>
<class 'asyncio.events.TimerHandle'>
<class 'asyncio.events.TimerHandle'>
start task1...
end task1...
start task2...
end task2...
start task3...
end task3...
4.9966819286346436
PS:關於返回值的說明可以看官方文檔:https://docs.python.org/3/library/asyncio-eventloop.html#callback-handles
然后說下call_later
(這個執行過程會按照時間排個先后順序,然后再批次運行)
import asyncio
# 回調函數一般都是普通函數
def test(name):
print(name)
if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
# 新版本限制了時間不能超過24h(防止有些人當定時任務來亂用)
# 這個執行過程會安裝時間排個先后順序,然后再批次運行
task4 = loop.call_later(4, test, "task2-4")
task2 = loop.call_later(2, test, "task2-2")
task3 = loop.call_later(3, test, "task2-3")
task1 = loop.call_later(1, test, "task2-1")
# 取消測試
task4.cancel()
# close是直接丟棄任務然后關閉loop
loop.call_later(4, loop.stop) # 等任務執行完成結束任務 loop.stop()
# run內部運行的是run_until_complete,而run_until_complete內部運行的是run_forever
loop.run_forever()
print(time.time() - start_time)
輸出:(asyncio.get_running_loop()
不要和舊代碼混用)
task2-1
task2-2
task2-3
4.009201526641846
PS:run
內部運行的是run_until_complete
,而run_until_complete
內部運行的是run_forever
Task答疑¶
從開始說新語法之后,我們創建任務都直接用asyncio.create_task
來包裹一層,有人問我這個Task
除了是Future
的子類外,有啥用?為啥不直接使用Future
呢?貌似也沒語法啊?
看一個案例:
import asyncio
# 不是協程就加個裝飾器
@asyncio.coroutine
def test():
print("this is a test")
async def test_async():
print("this is a async test")
await asyncio.sleep(1)
async def main():
# 傳入一個協程對象,返回一個task
task1 = asyncio.create_task(test())
task2 = asyncio.create_task(test_async())
await asyncio.gather(task1, task2)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
this is a test
this is a async test
1.0070011615753174
我們來看看asyncio.create_task
的源碼:(關鍵在Task
類)
# 傳入一個協程對象,返回一個Task對象
def create_task(self, coro):
self._check_closed()
if self._task_factory is None:
# look:核心點
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
return task
看看核心類Task
:
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None):
super().__init__(loop=loop)
...
# 安排了一個盡快執行的回調方法:self.__step
self._loop.call_soon(self.__step, context=self._context)
def __step(self, exc=None):
try:
if exc is None:
# 協程初始化(生成器或者協程初始化 next(xxx))
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# 在停止之前取消任務
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
# 拿到了協程/生成器的結果
super().set_result(exc.value)
except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
super().set_exception(exc)
except BaseException as exc:
super().set_exception(exc)
raise
...
PS:那么很明顯了,Task
的作用就類似於future
和協程
的中間人了(屏蔽某些差異)
3.5.8.Socket新用法¶
官方文檔:https://docs.python.org/3/library/asyncio-stream.html
asyncio
實現了TCP、UDP、SSL
等協議,aiohttp
則是基於asyncio
實現的HTTP框架,我們簡單演示一下(PS:網絡通信基本上都是使用aiohttp
)
1.簡單案例¶
服務端:
import asyncio
async def handler(client_reader, client_writer):
# 沒有數據就阻塞等(主線程做其他事情去了)
data = await client_reader.read(2048)
print(data.decode("utf-8"))
client_writer.write("驪山語罷清宵半,淚雨霖鈴終不怨\n何如薄幸錦衣郎,比翼連枝當日願".encode("utf-8"))
await client_writer.drain() # 等待緩沖區(緩沖區沒占滿就直接返回)
client_writer.close() # 關閉連接
async def main():
server = await asyncio.start_server(handler, "127.0.0.1", 8080)
print("Server已經啟動,端口:8080")
# 實現了協程方法`__aenter__`和`__aexit__`的可以使用`async with`
async with server:
# async def serve_forever(self):pass ==> use await
await server.serve_forever() # 異步方法
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
客戶端:
import asyncio
async def main():
reader, writer = await asyncio.open_connection("127.0.0.1", 8080)
writer.write("人生若只如初見,何事秋風悲畫扇\n等閑變卻故人心,卻道故人心易變".encode("utf-8"))
data = await reader.read(2048)
if data:
print(data.decode("utf-8"))
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出圖示:
2.HTTP案例¶
再舉個HTTP的案例:
import asyncio
async def get_html(host):
print("get_html %s..." % host)
reader, writer = await asyncio.open_connection(host, 80)
writer.write(f"GET / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode('utf-8'))
await writer.drain() # 等待緩沖區
html_list = []
async for line in reader:
html_list.append(line.decode("utf-8"))
writer.close() # 關閉連接
return "\n".join(html_list)
async def main():
tasks = [
asyncio.create_task(get_html(url))
for url in ['dotnetcrazy.cnblogs.com', 'dunitian.cnblogs.com']
]
html_list = await asyncio.gather(*tasks)
print(html_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
get_html dotnetcrazy.cnblogs.com...
get_html dunitian.cnblogs.com...
[html內容省略,html內容省略]
5.092018604278564
GIF過程圖:
PS:(后面會繼續說的)
- 實現了協程方法
__anext__
的可以使用async for
- 實現了協程方法
__aenter__
和__aexit__
的可以使用async with
3.源碼分析¶
還記得之前IO多路復用的時候自己寫的非阻塞Server
不,簡單梳理下流程,然后咱們再一起看看asyncio
對應的源碼:
- 設置
Socket
為非阻塞(socket.setblocking(False)
) - 利用輪詢用來監視文件描述符
fd
(register
) - 對可讀寫的
socket
進行相應操作 - 取消輪詢的監聽(
unregister
)
看看await asyncio.open_connection(ip,port)
的源碼:
# asyncio.streams.py
async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds):
if loop is None:
loop = events.get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
# 核心點
transport, _ = await loop.create_connection(lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
發現,其實內部核心在loop.create_connection
中
# asyncio.base_events.py
# 連接TCP服務器
class BaseEventLoop(events.AbstractEventLoop):
async def create_connection(self,
protocol_factory,
host=None,
port=None,
*,
ssl=None,
family=0,
proto=0,
flags=0,
sock=None,
local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None):
...
# 主要邏輯
if host is not None or port is not None:
exceptions = []
# 主要邏輯
for family, type, proto, cname, address in infos:
try:
sock = socket.socket(family=family, type=type, proto=proto)
sock.setblocking(False) # 1.設置非阻塞 <<<< look
if local_addr is not None:
for _, _, _, _, laddr in laddr_infos:
try:
sock.bind(laddr) # 端口綁定
break
except OSError as exc:
msg = (f'error while attempting to bind on '
f'address {laddr!r}: '
f'{exc.strerror.lower()}')
exc = OSError(exc.errno, msg)
exceptions.append(exc)
else:
sock.close()
sock = None
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
# 在selector_events中
await self.sock_connect(sock, address) # <<< look
except OSError as exc:
if sock is not None:
sock.close()
exceptions.append(exc)
except:
if sock is not None:
sock.close()
raise
else:
break
發現源碼中設置了socket為非阻塞,調用了sock_connect
async def sock_connect(self, sock, address):
"""連接遠程socket地址(協程方法)"""
# 非阻塞檢查
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
...
fut = self.create_future()
self._sock_connect(fut, sock, address)
return await fut
def _sock_connect(self, fut, sock, address):
fd = sock.fileno() # 獲取socket的文件描述符 <<< look
try:
sock.connect(address)
except (BlockingIOError, InterruptedError):
# 設置future的回調函數_sock_connect_done(用來注銷的)<<< look
fut.add_done_callback(functools.partial(self._sock_connect_done, fd))
# 注冊selector.register
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
先看下sock_connect
中調用的add_writer
(注冊)
def add_writer(self, fd, callback, *args):
"""添加一個寫的回調"""
self._ensure_fd_no_transport(fd)
return self._add_writer(fd, callback, *args)
def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self, None)
try:
key = self._selector.get_key(fd)
except KeyError:
self._selector.register(fd, selectors.EVENT_WRITE,
(None, handle)) # selector.register
else:
mask, (reader, writer) = key.events, key.data
self._selector.modify(fd, mask | selectors.EVENT_WRITE,
(reader, handle))
if writer is not None:
writer.cancel()
再看下sock_connect
中設置的回調函數_sock_connect_done
(注銷)
def _sock_connect_done(self, fd, fut):
# 取消注冊selector.unregister
self.remove_writer(fd)
def remove_writer(self, fd):
"""移除寫的回調"""
self._ensure_fd_no_transport(fd)
return self._remove_writer(fd)
def _remove_writer(self, fd):
if self.is_closed():
return False
try:
key = self._selector.get_key(fd)
except KeyError:
return False
else:
mask, (reader, writer) = key.events, key.data
mask &= ~selectors.EVENT_WRITE
if not mask:
self._selector.unregister(fd) # 注銷 <<< look
else:
self._selector.modify(fd, mask, (reader, None))
if writer is not None:
writer.cancel()
return True
else:
return False
PS:嵌套的非常深,而且底層代碼一致在變(Python3.6到Python3.7這個新小更新就變化很大)
關於源碼的說明¶
之前並發編程的基礎知識已經講的很清楚了,也分析了很多源碼,你可以自己去拓展一下(Python3
的asyncio
模塊的源碼一直在優化改進的路上)我這邊就不一一分析了(源碼很亂,估計幾個版本后會清晰,現在是多層混套用),你可以參考部分源碼解析:https://github.com/lotapp/cpython3/tree/master/Lib/asyncio
課后拓展:
https://docs.python.org/3/library/asyncio-protocol.html#examples
https://docs.python.org/3/library/asyncio-eventloop.html#creating-network-servers
下節預估:同步與通信、aiohttp版爬蟲
3.5.9.同步與通信¶
官方文檔:
https://docs.python.org/3/library/asyncio-sync.html
https://docs.python.org/3/library/asyncio-queue.html
寫在前面:
- 下面的方式不是線程安全的(協程就一個線程)
- 這些同步原語的方法不接受超時參數; 使用asyncio.wait_for(協程方法,超時時間)函數執行超時操作
asyncio
具有以下基本同步原語:Lock、Event、Condition、Semaphore、BoundedSemaphore
1.引導示例¶
1.1.old code¶
先看個原來的引導案例:估計的結果是0,而不借助lock得出的結果往往出乎意料
import concurrent.futures
num = 0
def test(i):
global num
for _ in range(10000000):
num += i
def main():
with concurrent.futures.ThreadPoolExecutor() as executor:
print("start submit...")
future1 = executor.submit(test, 1)
future2 = executor.submit(test, -1)
concurrent.futures.wait([future1, future2]) # wait some time
print("end submit...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
main()
print(f"time:{time.time()-start_time}")
輸出:(但是代碼並不是線程安全的,所以結果往往不是我們想要的)
start submit...
end submit...
82705
time:5.032064199447632
1.2.new code¶
再看看協程的案例:
import asyncio
num = 0
async def test(i):
global num
for _ in range(10000000):
num += i
async def main():
print("start tasks...")
task1 = asyncio.create_task(test(1))
task2 = asyncio.create_task(test(-1))
await asyncio.gather(task1, task2)
print("end tasks...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(f"time:{time.time()-start_time}")
輸出:(就一個線程,當然安全)
start tasks...
end tasks...
0
time:4.860997438430786
1.3.注意點¶
PS:你使用協程的兼容代碼,並不能解決線程不安全的問題
import asyncio
import concurrent.futures
num = 0
def test(i):
global num
for _ in range(10000000):
num += i
async def main():
# 獲取當前loop
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
print("start submit...")
future1 = loop.run_in_executor(executor, test, 1)
future2 = loop.run_in_executor(executor, test, -1)
# await asyncio.wait([future1,future2])
await asyncio.gather(future1, future2)
print("end submit...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(f"time:{time.time()-start_time}")
輸出:
start submit...
end submit...
-1411610
time:5.0279998779296875
2.為什么需要同步機制?¶
咋一看,單線程不用管線程安全啥的啊,要啥同步機制?其實在業務場景里面還是會出現諸如重復請求的情況,這個時候就需要一個同步機制了:
import asyncio
# 用來存放頁面緩存
cache_dict = {}
# 模擬一個獲取html的過程
async def fetch(url):
# 每次網絡訪問,時間其實不確定的
import random
time = random.randint(2, 5)
print(time)
await asyncio.sleep(time)
return f"<h2>{url}</h2>"
async def get_html(url):
# 如果緩存存在,則返回緩存的頁面
for url in cache_dict:
return cache_dict[url]
# 否則獲取頁面源碼並緩存
html = await fetch(url)
cache_dict[url] = html
return html
async def parse_js(url):
html = await get_html(url)
# do somthing
return len(html)
async def parse_html(url):
html = await get_html(url)
# do somthing
return html
async def main():
# 提交兩個Task任務
task1 = asyncio.create_task(parse_js("www.baidu.com"))
task2 = asyncio.create_task(parse_html("www.baidu.com"))
# 等待任務結束
result_list = await asyncio.gather(task1, task2)
print(result_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(fetch
方法訪問了兩次 ==> 兩次網絡請求)
2
3
[22, '<h2>www.baidu.com</h2>']
3.0100157260894775
簡單說明:baidu.com
一開始沒緩存,那當解析js和解析html的任務提交時,就會進行兩次網絡請求(網絡IO比較耗時),這樣更容易觸發反爬蟲機制
3.Lock(互斥鎖)¶
線程相關的Lock復習:https://www.cnblogs.com/dotnetcrazy/p/9528315.html#2.2.1.線程同步~互斥鎖Lock
協程是線程安全的,那么這個Lock
肯定是和多線程/進程
里面的Lock
是不一樣的,我們先看一下提煉版的源碼:
class Lock(_ContextManagerMixin):
def __init__(self, *, loop=None):
self._waiters = collections.deque()
self._locked = False
if loop is not None:
self._loop = loop
else:
self._loop = events.get_event_loop()
async def acquire(self):
if not self._locked:
self._locked = True # 改變標識
...
return self._locked
def release(self):
if self._locked:
self._locked = False
...
PS:源碼看完秒懂了,asyncio里面的lock其實就是一個標識而已
修改一下上面的例子:
import asyncio
# 用來存放頁面緩存
cache_dict = {}
lock = None # 你可以試試在這邊直接寫`asyncio.Lock()`
# 模擬一個獲取html的過程
async def fetch(url):
# 每次網絡訪問,時間其實不確定的
import random
time = random.randint(2, 5)
print(time)
await asyncio.sleep(time)
return f"<h2>{url}</h2>"
async def get_html(url):
async with lock:
# 如果緩存存在,則返回緩存的頁面
for url in cache_dict:
return cache_dict[url]
# 否則獲取頁面源碼並緩存
html = await fetch(url)
cache_dict[url] = html
return html
async def parse_js(url):
html = await get_html(url)
# do somthing
return len(html)
async def parse_html(url):
html = await get_html(url)
# do somthing
return html
async def main():
global lock
lock = asyncio.Lock() # 如果在開頭就定義,那么lock的loop和方法的loop就會不一致了
# 提交兩個Task任務
task1 = asyncio.create_task(parse_js("www.baidu.com"))
task2 = asyncio.create_task(parse_html("www.baidu.com"))
# 等待任務結束
result_list = await asyncio.gather(task1, task2)
print(result_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(fetch
方法訪問了1次 ==> 1次網絡請求)
3
[22, '<h2>www.baidu.com</h2>']
3.0020127296447754
4.Semaphore(信號量)¶
線程篇Semaphore
:https://www.cnblogs.com/dotnetcrazy/p/9528315.html#2.2.5.線程同步~信號量Semaphore(互斥鎖的高級版)
這個用的比較多,簡單回顧下之前講的概念案例:
通俗講就是:在互斥鎖的基礎上封裝了下,實現一定程度的並行
舉個例子,以前使用互斥鎖的時候:(廁所就一個坑位,必須等里面的人出來才能讓另一個人上廁所)
使用信號量Semaphore
之后:廁所坑位增加到5個(自己指定),這樣可以5個人一起上廁所了==> 實現了一定程度的並發控制
先看下縮略的源碼:(可以這么想:內部維護了一個引用計數,每次來個任務就-1,一個任務結束計數就+1)
class Semaphore(_ContextManagerMixin):
def __init__(self, value=1, *, loop=None):
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
if loop is not None:
self._loop = loop
else:
self._loop = events.get_event_loop()
async def acquire(self):
while self._value <= 0:
fut = self._loop.create_future()
self._waiters.append(fut) # 把當前任務放入Queue中
try:
await fut # 等待一個任務的完成再繼續
except:
fut.cancel() # 任務取消
if self._value > 0 and not fut.cancelled():
self._wake_up_next() # 喚醒下一個任務
raise
self._value -= 1 # 用掉一個並發量
return True
def release(self):
self._value += 1 # 恢復一個並發量
self._wake_up_next() # 喚醒下一個任務
現在舉個常見的場景:比如調用某個免費的api,該api限制並發數為5
import asyncio
sem = None
# 模擬api請求
async def api_test(i):
async with sem:
await asyncio.sleep(1)
print(f"The Task {i} is done")
async def main():
global sem
sem = asyncio.Semaphore(5) # 設置並發數為5
tasks = [asyncio.create_task(api_test(i)) for i in range(20)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
動態輸出:
PS:BoundedSemaphore
是Semaphore
的一個版本,在調用release()
時檢查計數器的值是否超過了計數器的初始值,如果超過了就拋出一個異常
5.Event(事件)¶
線程篇Event
:https://www.cnblogs.com/dotnetcrazy/p/9528315.html#2.2.8.線程同步~Event
之前講的很詳細了,舉個爬蟲批量更新
的例子就一筆帶過:
import asyncio
event = None
html_dict = {}
async def updates():
# event.wait()是協程方法,需要await
await event.wait()
# 入庫操作省略 html_dict >> DB
return "html_dict >> DB done"
async def get_html(url):
# 摸擬網絡請求
await asyncio.sleep(2)
html_dict[url] = f"<h1>{url}</h1>" # 可以暫時寫入臨時文件中
event.set() # 標記完成,普通方法
return f"{url} done"
async def main():
global event
event = asyncio.Event() # 初始化 event 對象
# 創建批量任務
tasks = [
asyncio.create_task(get_html(f"www.mmd.com/a/{i}"))
for i in range(1, 10)
]
# 批量更新操作
tasks.append(asyncio.create_task(updates()))
result = await asyncio.gather(*tasks)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
['www.mmd.com/a/1 done', 'www.mmd.com/a/2 done', 'www.mmd.com/a/3 done', 'www.mmd.com/a/4 done', 'www.mmd.com/a/5 done', 'www.mmd.com/a/6 done', 'www.mmd.com/a/7 done', 'www.mmd.com/a/8 done', 'www.mmd.com/a/9 done', 'html_dict >> DB done']
2.0012683868408203
跟之前基本上一樣,就一個地方不太一樣:async def wait(self)
,wait
方法現在是協程方法了,使用的時候需要await
coroutine wait()
- 等待事件內部標志被設置為
True
- 如果事件的內部內部標志已設置,則立即返回
True
。否則,一直阻塞,直到另外的任務調用set()
- 等待事件內部標志被設置為
set()
- 設置事件內部標志為
True
- 所有等待事件的任務將會立即被觸發
- 設置事件內部標志為
clear()
- 清除事件內部標志(即重置為
False
) - 等待事件的任務將會阻塞,直到
set()
方法被再次調用
- 清除事件內部標志(即重置為
is_set()
- 如果事件內部標志被設置為
True
,則返回True
- 如果事件內部標志被設置為
6.Condition(條件變量)¶
線程篇Condition
:https://www.cnblogs.com/dotnetcrazy/p/9528315.html#2.2.4.線程同步~條件變量Condition
先簡單看看方法列表:
coroutine acquire()
:- 獲取底層鎖。該方法一直等待,直到底層鎖處於未鎖定狀態,然后設置其為鎖定狀態,並且返回True
notify(n=1)
:- 喚醒至多n個等待條件的任務。如果沒有正在等待的任務,則該方法無操作。
- 在調用該方法之前,必須先調用
acquire()
獲取鎖,並在調用該方法之后釋放鎖。 - 如果在鎖為鎖定的情況下調用此方法,會引發
RuntimeError
異常。
locked()
:- 如果底層鎖已獲取,則返回True。
notify_all()
:- 喚醒所有正在等待該條件的任務。該方法與notify()類似,區別只在它會喚醒所有正在等待的任務。
release()
:- 釋放底層鎖。在未鎖定的鎖上調用時,會引發RuntimeError異常。
coroutine wait()
:- 等待通知。如果調用此方法的任務沒有獲取到鎖,則引發RuntimeError異常。
- 此方法釋放底層鎖,然后保持阻塞,直至被notify()或notify_all()喚醒。被喚醒之后,條件對象重新申請鎖,該方法返回True。
coroutine wait_for(predicate)
- 等待
predicate
變為True。predicate
必須可調用,它的執行結果會被解釋為布爾值,並作為最終結果返回。
- 等待
PS:Condition
結合了Event
和Lock
的功能(也可以使多個Condition對象共享一個Lock,允許不同任務之間協調對共享資源的獨占訪問)
看個生產消費者的案例:
import asyncio
cond = None
p_list = []
# 生產者
async def producer(n):
for i in range(5):
async with cond:
p_list.append(f"{n}-{i}")
print(f"[生產者{n}]生產商品{n}-{i}")
# 通知任意一個消費者
cond.notify() # 通知全部消費者:cond.notify_all()
# 摸擬一個耗時操作
await asyncio.sleep(0.01)
# 消費者
async def consumer(i):
while True:
async with cond:
if p_list:
print(f"列表商品:{p_list}")
name = p_list.pop() # 消費商品
print(f"[消費者{i}]消費商品{name}")
print(f"列表剩余:{p_list}")
# 摸擬一個耗時操作
await asyncio.sleep(0.01)
else:
await cond.wait()
async def main():
global cond
cond = asyncio.Condition() # 初始化condition
p_tasks = [asyncio.create_task(producer(i)) for i in range(2)] # 兩個生產者
c_tasks = [asyncio.create_task(consumer(i)) for i in range(5)] # 五個消費者
await asyncio.gather(*p_tasks, *c_tasks)
if __name__ == "__main__":
asyncio.run(main())
輸出:
[生產者0]生產商品0-0
[生產者1]生產商品1-0
列表商品:['0-0', '1-0']
[消費者0]消費商品1-0
列表剩余:['0-0']
列表商品:['0-0']
[消費者1]消費商品0-0
列表剩余:[]
[生產者0]生產商品0-1
[生產者1]生產商品1-1
列表商品:['0-1', '1-1']
[消費者0]消費商品1-1
列表剩余:['0-1']
列表商品:['0-1']
[消費者1]消費商品0-1
列表剩余:[]
[生產者0]生產商品0-2
[生產者1]生產商品1-2
列表商品:['0-2', '1-2']
[消費者0]消費商品1-2
列表剩余:['0-2']
列表商品:['0-2']
[消費者1]消費商品0-2
列表剩余:[]
[生產者0]生產商品0-3
[生產者1]生產商品1-3
列表商品:['0-3', '1-3']
[消費者0]消費商品1-3
列表剩余:['0-3']
列表商品:['0-3']
[消費者1]消費商品0-3
列表剩余:[]
[生產者0]生產商品0-4
[生產者1]生產商品1-4
列表商品:['0-4', '1-4']
[消費者0]消費商品1-4
列表剩余:['0-4']
列表商品:['0-4']
[消費者1]消費商品0-4
列表剩余:[]
PS:第七條的簡單說明:(來看看wait_for
方法的源碼)
# 一直等到函數返回true(從返回結果來說:要么一直阻塞,要么返回true)
async def wait_for(self, predicate):
result = predicate()
# 如果不是返回true就繼續等待
while not result:
await self.wait()
result = predicate()
return result
課后拓展:async_timeout
(兼容async的超時的上下文管理器) https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/ZCoroutine/async_timeout_timeout.py
7.Queue(隊列)¶
官方文檔:https://docs.python.org/3/library/asyncio-queue.html
線程篇Queue
:https://www.cnblogs.com/dotnetcrazy/p/9528315.html#2.2.6.線程同步~Queue-引入
其實你不考慮限流的情況下,協程里面的queue和list基本上差不多(ps:asyncio.Queue(num)
可以指定數量)
舉個經典的生產消費者案例:
import random
import asyncio
async def producer(q, i):
for i in range(5):
num = random.random()
await q.put(num)
print(f"[生產者{i}]商品{num}出廠了")
await asyncio.sleep(num)
async def consumer(q, i):
while True:
data = await q.get()
print(f"[消費者{i}]商品{data}搶光了")
async def main():
queue = asyncio.Queue(10) # 為了演示,我這邊限制一下
p_tasks = [asyncio.create_task(producer(queue, i)) for i in range(2)] # 兩個生產者
c_tasks = [asyncio.create_task(consumer(queue, i)) for i in range(5)] # 五個消費者
await asyncio.gather(*p_tasks, *c_tasks)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(注意一下get
和put
方法都是協程方法即可)
[生產者0]商品0.20252203397767787出廠了
[生產者0]商品0.9641503458079388出廠了
[消費者0]商品0.20252203397767787搶光了
[消費者0]商品0.9641503458079388搶光了
[生產者1]商品0.8049655468032324出廠了
[消費者0]商品0.8049655468032324搶光了
[生產者1]商品0.6032743557097342出廠了
[消費者1]商品0.6032743557097342搶光了
[生產者2]商品0.08818326334746773出廠了
[消費者2]商品0.08818326334746773搶光了
[生產者3]商品0.3747289313977561出廠了
[消費者3]商品0.3747289313977561搶光了
[生產者4]商品0.3948823110071299出廠了
[消費者4]商品0.3948823110071299搶光了
[生產者2]商品0.5775767044660681出廠了
[消費者0]商品0.5775767044660681搶光了
[生產者3]商品0.500537752889471出廠了
[消費者1]商品0.500537752889471搶光了
[生產者4]商品0.9921528527523727出廠了
[消費者2]商品0.9921528527523727搶光了
PS:協程也提供了Priority Queue
優先級隊列 and LifoQueue
后進先出隊列,這邊就不再啰嗦了(前面我們畫圖演示並手動實現過)
課后拓展:https://docs.python.org/3/library/asyncio-queue.html#examples
擴展:Subprocesses¶
官方文檔:https://docs.python.org/3/library/asyncio-subprocess.html
這個之前進程篇的時候說過,不是我們今天的重點,我貼一個官方demo:
import asyncio
async def run(cmd):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
print(f'[stdout]\n{stdout.decode()}')
if stderr:
print(f'[stderr]\n{stderr.decode()}')
asyncio.run(run('ls /zzz'))
輸出:
['ls /zzz' exited with 1]
[stderr]
ls: /zzz: No such file or directory
下節預告:asyncio
+aiohttp
版爬蟲
4.aiohttp¶
代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine/z_spider
asyncio
庫只有TCP
和UDP
服務,並不支持HTTP
,aiohttp
就可以理解為是基於asyncio
的http
服務
4.1.入門案例¶
先來個獲取頁面html的demo:
import asyncio
import aiohttp
error_urls = set()
# 獲取頁面html
async def fetch(session, url):
async with session.get(url) as response:
if response.status == 200:
return await response.text()
else:
error_urls.add(url) # 添加到待處理集合中
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, "http://www.biquge.cm/12/12097/")
if html: # 獲取到html
print(len(html))
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
24287
0.5429983139038086
4.2.html解析¶
推薦一款輕量級網頁解析庫:pyquery
(一個類似jquery的python庫)
4.2.1.列表頁¶
在上面基礎上簡單提取:(pq.items("dd a")
==> 類比JQ選擇器)
import asyncio
import aiohttp
from pyquery import PyQuery
error_urls = set()
# 獲取頁面html
async def fetch(session, url):
async with session.get(url) as response:
if response.status == 200:
return await response.text()
else:
error_urls.add(url) # 待處理的url集合
# 阻塞方法
def saves(results):
with open("www.biquge.cm.txt", "a+", encoding="utf-8") as fs:
fs.writelines(results)
print("ok")
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, "http://www.biquge.cm/12/12097/")
pq = PyQuery(html)
results = [
item.text() + ":" + item.attr("href") + "\n"
for item in pq.items("dd a")
]
# print(pq("dd a").text())
# 兼容阻塞舊代碼
await asyncio.get_running_loop().run_in_executor(None, saves, results)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:www.biquge.cm.txt
新書的一些話:/12/12097/7563947.html
第一章論壇里的鬼故事。:/12/12097/7563949.html
第二章臨時講課:/12/12097/7563950.html
第三章鬼域。:/12/12097/7563951.html
第四章恐怖敲門鬼:/12/12097/7565568.html
第五章迷路:/12/12097/7565569.html
第六章廁所中的手:/12/12097/7565570.html
第七章身后的腳步:/12/12097/7565571.html
第八章奇怪的樹:/12/12097/7565572.html
第九章鬼嬰:/12/12097/7565573.html
第十章惡鬼之力:/12/12097/7565574.html
...
第三百二十七章三口箱子:/12/12097/7950281.html
第三百二十八章鬼櫥里的照片:/12/12097/7952145.html
第三百二十九章中山市事件:/12/12097/7955244.html
第三百三十章兩條信息:/12/12097/7956401.html
第三百三十一章進入中山市:/12/12097/7959077.html
第三百三十二章出乎意料:/12/12097/7962119.html
第三百三十四章酒店的二樓:/12/12097/7964192.html
第三百三十五章黑色的燭火:/12/12097/7969058.html
第三百三十六章微笑的屍體:/12/12097/7973826.html
4.2.2.詳情頁¶
獲取一個詳情頁看看:
import asyncio
import aiohttp
from pyquery import PyQuery
error_urls = set()
# 獲取頁面html
async def fetch(session, url):
async with session.get(url) as response:
if response.status == 200:
return await response.text()
else:
error_urls.add(url) # 待處理的url集合
# 詳情頁獲取測試
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session,
"http://www.biquge.cm//12/12097/7563949.html")
pq = PyQuery(html)
print(pq("#content").text())
# results = [item.text() for item in pq.items("#content")]
# print(results)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
老夫掐指一算,你現在正在床上看小說,而且還是側身,搞不好手機還在充電。
正在讀高三的楊間此刻正躺在被窩里無聊的翻看着手機,他隨手點開了一個帖子,下面有不少網友在回帖。
“卧槽,樓主真乃神人也,這都被樓主猜中了。”
“呵,你會告訴你們我現在正在廁所蹲坑么?不用問了,腳麻了。”
......
0.6684205532073975
PS:Win下Py包安裝出錯就去這個網站下對應包 https://www.lfd.uci.edu/~gohlke/pythonlibs/
4.3.爬蟲小案例¶
4.3.1.小說網站實戰¶
限流以及反爬蟲和如何應對反爬蟲機制,后面我們會繼續說,這邊簡單舉個小說離線的例子:
import asyncio
import aiohttp
from pyquery import PyQuery
sem = None
error_urls = set()
# 獲取html
async def fetch(session, url):
async with sem:
async with session.get(url) as response:
if response.status == 200:
# aiohttp遇到非法字符的處理
return await response.text("gbk", "ignore") # 忽略非法字符
else:
error_urls.add(url) # 待處理的url集合
# 獲取文章正文
async def get_text(session, url):
# 把相對路徑改成域名+路徑
if not url.startswith("http://www.biquge.cm"):
url = "http://www.biquge.cm" + url
html = await fetch(session, url)
pq = PyQuery(html)
return pq("#content").text()
# 普通阻塞方法
def save(title, text):
with open("恐怖復蘇.md", "a+", encoding="gbk") as fs:
fs.write(f"## {title}\n\n{text}\n\n")
print(f"{title} done...")
async def main():
global sem
sem = asyncio.Semaphore(3) # 控制並發數反而更快
loop = asyncio.get_running_loop()
async with aiohttp.ClientSession() as session:
html = await fetch(session, "http://www.biquge.cm/12/12097/")
pq = PyQuery(html)
for item in pq.items("dd a"):
title = item.text()
text = await get_text(session, item.attr("href"))
# 兼容阻塞舊代碼
await loop.run_in_executor(None, save, title, text)
print("task over")
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(爬取整站就不用我說了吧:提取a標簽中的src,url去重后爬取內容
)
新書的一些話 done...
第一章論壇里的鬼故事。 done...
第二章臨時講課 done...
第三章鬼域。 done...
第四章恐怖敲門鬼 done...
第五章迷路 done...
第六章廁所中的手 done...
第七章身后的腳步 done...
第八章奇怪的樹 done...
第九章鬼嬰 done...
第十章惡鬼之力 done...
第十一章逐漸復蘇 done...
第十二章宛如智障 done...
第十三章羊皮紙 done...
第十四章詭異的紙 done...
......
第三百二十八章鬼櫥里的照片 done...
第三百二十九章中山市事件 done...
第三百三十章兩條信息 done...
第三百三十一章進入中山市 done...
第三百三十二章出乎意料 done...
第三百三十四章酒店的二樓 done...
第三百三十五章黑色的燭火 done...
第三百三十六章微笑的屍體 done...
task over
動態展示:
閑言碎語¶
【推薦】Python高性能異步框架:https://github.com/LessChina/sanic
逆天點評:(只看主線,只說我的見識)
- 原來大家都是使用大一統的
Django
(方便) - 后來因為性能不佳,FaceBook開發了
Tornado
(IO多路復用)來代替 - 再后來時代主流是敏捷開發,於是就有了
Flask
(簡單) - 后來Node和Go火了,NetCore也出山了,Python的Flask等同步框架總是被吊打
- 於是被逼出了
Japronto
,瞬間驚艷和吊打的所有開發語言,但是只是冒了泡就不怎么維護了 - 后來就是AI爆發時期,Python直接打上了AI的標簽了,而Web也逐漸被打上了初創公司的標配
- 之后官方看不下去了,自己搞了一套異步框架
asyncio
andaiohttp
(Node兄弟這么優秀,憑啥我們不行) - 民間看不下去了來了個
asyncio
替代品uvloop
(C實現的程度比官方多(誰多誰高效),PS:官方用法太丑陋了3.7才給足了語法糖) - 解決方案雖然各種出,但是web框架不行啊,於是又冒了個主流
sanic
(語法和Flask
很像,性能不亞於Japronto
) - 現在又剛冒出
vibora
(都是C實現)有超過sanic
的趨勢(PS:等過幾個版本再試水,不過現在很多開發者都是Go + Python
了)
最后BB一句:
gevent
用猴子補丁的確很方便,但很多內部異常就被屏蔽了,而且性能現在不是最高tornado
為了兼容py2
和py3
,內部還是通過生成器來實現異步的,效率相對低點asyncio
是未來的主流方向,sanic
是目前最火的異步框架(vibora
還在觀察中)
PS:Django
、Flask
是阻塞式IO,web框架一般不會直接部署(它自帶的解決方案只是方便調試),一般使用uwsgi
or gunicorn
+ nginx
來部署(tornado可以直接部署)
參考鏈接:
python異步編程之asyncio
https://www.cnblogs.com/shenh/p/9090586.html
uWSGI, Gunicorn, 啥玩意兒?
https://www.cnblogs.com/gdkl/p/6807667.html
asyncio異步IO中文翻譯:
http://www.cnblogs.com/mamingqian/p/10008279.html
https://www.cnblogs.com/mamingqian/p/10075444.html
https://www.cnblogs.com/mamingqian/p/10044730.html
PyQuery基礎:
https://www.cnblogs.com/zhaof/p/6935473.html
https://www.cnblogs.com/lei0213/p/7676254.html