了解異步編程
樓主在工作中遇到了以下問題,開發接口爬取數據代碼完成之后要寫入redis緩存,但是在寫入緩存的過程花費2-3s,進行這樣就大大影響了接口的性能,於是想到了使用異步存儲。
傳統的同步編程是一種請求響應模型,調用一個方法,等待其響應返回.
異步編程就是要重新考慮是否需要響應的問題,也就是縮小需要響應的地方。因為越快獲得響應,就是越同步化,順序化,事務化,性能差化。
線程實現異步
思路:通過線程調用的方式,來達到異步非阻塞的效果,也就是說主程序無需等待線程執行完畢,仍然可以繼續向下執行。
1.threading模塊和thread模塊
Python通過兩個標准庫thread和threading提供對線程的支持。thread提供了低級別的、原始的線程以及一個簡單的鎖。
threading 模塊提供的其他方法:
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
除了使用方法外,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法:
- run(): 用以表示線程活動的方法。
- start():啟動線程活動。
- join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
同步阻塞:
1 import threading,time 2 3 def thead(num): 4 time.sleep(1) 5 print("阻塞程序%s開始執行"%num) 6 time.sleep(3) 7 print("阻塞程序%s執行完畢"%num) 8 9 def main(): 10 print("主方法開始執行") 11 12 for i in range(1,3): 13 thead(i) 14 15 print("主方法執行完畢") 16 return 17 18 if __name__ == '__main__': 19 print(time.ctime()) 20 num = main() 21 print("返回結果為%s"%num) 22 print(time.ctime())
Wed Nov 21 09:22:56 2018
主方法開始執行
阻塞程序1開始執行
阻塞程序1執行完畢
阻塞程序2開始執行
阻塞程序2執行完畢
主方法執行完畢
返回結果為None
Wed Nov 21 09:23:04 2018
異步,無需等待線程執行
import threading,time
def thead(num):
# time.sleep(1)
print("線程%s開始執行"%num)
time.sleep(3)
print("線程%s執行完畢"%num)
def main():
print("主方法開始執行")
#創建2個線程
poll = []#線程池
for i in range(1,3):
thead_one = threading.Thread(target=thead, args=(i,))
poll.append(thead_one) #線程池添加線程
for n in poll:
n.start() #准備就緒,等待cpu執行
print("主方法執行完畢")
return
if __name__ == '__main__':
print(time.ctime())
num = main()
print("返回結果為%s"%num)
print(time.ctime())
Wed Nov 21 09:48:00 2018
主方法開始執行
主方法執行完畢
返回結果為None
Wed Nov 21 09:48:00 2018
線程1開始執行
線程2開始執行
線程1執行完畢
線程2執行完畢
2.concurrent.futures模塊
concurrent.futures模塊實現了對threading(線程)
和multiprocessing(進程)
的更高級的抽象,對編寫線程池/進程池提供了直接的支持。
從Python3.2開始,標准庫為我們提供了concurrent.futures
模塊,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
兩個類,ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創建線程池和進程池的代碼。(暫時只介紹線程池的使用)
concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來創建線程池和進程池的代碼。我們可以將相應的tasks直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題,線程池/進程池會自動幫我們調度。
Future這個概念你可以把它理解為一個在未來完成的操作,這是異步編程的基礎,傳統編程模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。
-
Future Objects:Future類封裝了可調用的異步執行.Future 實例通過 Executor.submit()方法創建。
- submit(fn, *args, **kwargs):調度可調用的fn,作為fn(args kwargs)執行,並返回一個表示可調用的執行的Future對象。
-
ThreadPoolExecutor:ThreadPoolExecutor是一個Executor的子類,它使用線程池來異步執行調用。
-
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=''):Executor子類,使用max_workers規格的線程池來執行異步調用。
在Flask應用中使用異步redis:
from flask import Flask import time from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor() app = Flask(__name__) @app.route('/') def update_redis(): executor.submit(do_update) return 'ok' def do_update(): time.sleep(3) print('start update cache') time.sleep(1) print("end") if __name__ == '__main__': app.run(debug=True)
“ok“在更新緩存前已經返回。
本文到這里就結束了,着重介紹了線程實現異步的方法。當然還有其他的方法,比如yied實現,還有asyncio模塊,后續會繼續更新異步編程的文章。
溫馨提示
- 本文代碼是在python3.5版本測試運行。
- 如果您對本文有疑問,請在評論部分留言,我會在最短時間回復。
- 如果本文幫助了您,也請評論關注,作為對我的一份鼓勵。
- 如果您感覺我寫的有問題,也請批評指正,我會盡量修改。
- 本文為原創,轉載請注明出處。