來源:Redislabs
作者:Loris Cro
翻譯:Kevin (公眾號:中間件小哥)
近年來,許多編程語言都在努力改進它們的並發原語。Go 語言有 goroutines,Ruby 有 fibers,當然,還有 Node.js 幫助普及的 async/await,這是當今使用最為廣泛的並發操作類型。在本文中,我將以 python 為例討論 async/await 的基礎知識。我選擇python語言,是因為這個功能在python 3中比較新,很多用戶可能對它還不是很熟悉。使用 async/await 的主要原因是通過減少 I/O 執行時的空閑時間來提高程序的吞吐量。使用這個操作符的程序通過隱式地使用一個稱為事件循環的抽象來同時處理多個執行路徑。在某些方面,這些事件循環類似於多線程編程,但是事件循環通常存在於單個線程中,因此,它不能同時執行多個計算。正因為如此,單獨的事件循環不能提高計算密集型應用程序的性能。但是,對於進行大量網絡通信的程序,比如連接到Redis數據庫的應用程序,它可以極大地提高性能。每次程序向 Redis 發送一個命令時,它都會等待 Redis 的響應,如果 Redis 部署在另一台機器上,就會出現網絡延遲。而一個不使用事件循環的單線程應用程序在等待響應時處於空閑狀態,會占用大量的CPU周期。需要注意的是,網絡延遲是以毫秒為單位的,而 CPU 指令需要納秒來執行,這兩者相差六個數量級。這里舉個例子,下面的代碼樣例是用來跟蹤一個游戲的獲勝排行榜。每個流條目都包含獲勝者的名字,我們的程序會更新一個 Redis 的有序集合(Sorted Set),這個有序集合用來作為排行榜。這里我們主要關注的是阻塞代碼和非阻塞代碼的性能。
1 import redis 2 3 # The operation to perform for each event 4 def add_new_win(conn, winner): 5 conn.zincrby('wins_counter', 1, winner) 6 conn.incr('total_games_played') 7 8 def main(): 9 # Connect to Redis 10 conn = redis.Redis() 11 # Tail the event stream 12 last_id = '$' 13 while True: 14 events = conn.xread({'wins_stream': last_id}, block=0, count=10) 15 # Process each event by calling `add_new_win` 16 for _, e in events: 17 winner = e['winner'] 18 add_new_win(conn, winner) 19 last_id = e['id'] 20 21 if __name__ == '__main__': 22 main()
我們使用aio-libs/aioredis實現與上面代碼有相同效果的異步版本。aio-libs 社區正在重寫許多 Python 網絡庫,以包括對 asyncio 的支持,asyncio 是 Python 事件循環的標准庫實現。下面是上面代碼的非阻塞版本:
1 import asyncio 2 import aioredis 3 4 async def add_new_win(pool, winner): 5 await pool.zincrby('wins_counter', 1, winner) 6 await pool.incr('total_games_played') 7 8 async def main(): 9 # Connect to Redis 10 pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8') 11 # Tail the event stream 12 last_id = '$' 13 while True: 14 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 15 # Process each event by calling `add_new_win` 16 for _, e_id, e in events: 17 winner = e['winner'] 18 await add_new_win(pool, winner) 19 last_id = e_id 20 21 if __name__ == '__main__': 22 loop = asyncio.get_event_loop() 23 loop.run_until_complete(main())
這段代碼與上面那段代碼相比,除了多了一些 await 關鍵字之外,其他的幾乎是相同的。最大的不同之處在最后兩行。在 Node.js 中,環境會默認加載事件循環,而在 Python 中,必須顯示地開啟。
重寫之后,我們可能會認為這么做就可以提高性能了。不幸的是,我們代碼的非阻塞版本還沒有提高性能。這里的問題在於我們編寫代碼的細節,而不僅僅是使用 async / await 的一般思想。
Await 使用的限制
我們重寫代碼后的主要問題是我們過度使用了 await。當我們在異步調用前面加上 await 時,我們做了以下兩件事:
1. 為執行做相應的調度
2. 等待完成
有時候,這樣做是對的。例如,在完成對第 15 行流的讀取之前,我們不能對每個事件進行迭代。在這種情況下,await 關鍵字是有意義的,但是看看 add_new_win 方法:
1 async def add_new_win(pool, winner): 2 await pool.zincrby('wins_counter', 1, winner) 3 await pool.incr('total_games_played')
在這個函數中,第二個操作並不依賴於第一個操作。我們可以將第二個命令與第一個命令一起發送,但是當我們發送第一個命令時,await 將阻塞執行流。我們其實更想要一種能立即執行這兩個操作的方法。為此,我們需要一個不同的同步原語。
1 async def add_new_win(pool, winner): 2 task1 = pool.zincrby('wins_counter', 1, winner) 3 task2 = pool.incr('total_games_played') 4 await asyncio.gather(task1, task2)
首先,調用一個異步函數不會執行其中的任何代碼,而是會先實例化一個“任務”。根據選擇的語言,這可能被稱為 coroutine, promise 或 future 等等。對我們來說,任務是一個對象,它表示一個值,該值只有在使用了 await 或其他同步原語(如 asyncio.gather)之后才可用。 在 Python 的官方文檔中,你可以找到更多關於 asyncio.gather 的信息。簡而言之,它允許我們在同一時間執行多個任務。我們需要等待它的結果,因為一旦所有的輸入任務完成,它就會創建一個新的任務。Python 的 asyncio.gather 相當於 JavaScript 的 Promise.all,C# 的 Task.WhenAll, Kotlin 的 awaitAll 等等。
改進我們的主循環代碼
我們對 add_new_win 所做的事情也可以用於主流事件處理循環。這是我所指的代碼:
1 last_id = '$' 2 while True: 3 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 4 for _, e_id, e in events: 5 winner = e['winner'] 6 await add_new_win(pool, winner) 7 last_id = e_id
到目前為止,你會注意到我們是順序地處理每個事件。因為在第 6 行中,使用 await 既可以執行又可以等待 add_new_win 的完成。有時這正是你希望發生的情況,因為如果你不按順序執行,程序邏輯就會中斷。在我們的例子中,我們並不真正關心排序,因為我們只是更新計數器。
1 last_id = '$' 2 while True: 3 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 4 tasks = [] 5 for _, e_id, e in events: 6 winner = e['winner'] 7 tasks.append(add_new_win(pool, winner)) 8 last_id = e_id 9 await asyncio.gather(*tasks)
我們現在也在並發地處理每一批事件,並且對代碼的改動是最小的。最后要記住,有時即使不使用 asyncio.gather,程序也可以是高性能的。特別是,當你為 web 服務器編寫代碼並使用像 Sanic 這樣的異步框架時,該框架將以並發的方式調用你的請求處理程序,即使你在等待每個異步函數調用,也能確保巨大的吞吐量。
總結
下面是我們進行上面兩個更改之后的完整代碼示例:
1 import asyncio 2 import aioredis 3 4 async def add_new_win(pool, winner): 5 # Creating tasks doesn't schedule them 6 # so you can create multiple and then 7 # schedule them all in one go using `gather` 8 task1 = pool.zincrby('wins_counter', 1, winner) 9 task2 = pool.incr('total_games_played') 10 await asyncio.gather(task1, task2) 11 12 async def main(): 13 # Connect to Redis 14 pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8') 15 # Tail the event stream 16 last_id = '$' 17 while True: 18 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 19 tasks = [] 20 for _, e_id, e in events: 21 winner = e['winner'] 22 # Again we don't actually schedule any task, 23 # and instead just prepare them 24 tasks.append(add_new_win(pool, winner)) 25 last_id = e_id 26 # Notice the spread operator (`*tasks`), it 27 # allows using a single list as multiple arguments 28 # to a function call. 29 await asyncio.gather(*tasks) 30 31 if __name__ == '__main__': 32 loop = asyncio.get_event_loop() 33 loop.run_until_complete(main())
為了利用非阻塞 I/O,你需要重新考慮如何處理網絡操作。值得高興的是這並不是很困難,你只需要知道順序性什么時候重要,什么時候不重要。嘗試使用 aioredis 或等效的異步 redis 客戶端,看看可以在多大程度上提高應用程序的吞吐量。
更多優質中間件技術資訊/原創/翻譯文章/資料/干貨,請關注“中間件小哥”公眾號!
