1、循環創建多個線程,並通過循環啟動執行
import threading from datetime import * from time import sleep # 單線程執行 def test(): print('hello world') t = threading.Thread(target=test) t.start() # 多線程執行 def test_01(): sleep(1) x = 0 while x == 0: # 設置一個死循環 print(datetime.now()) # 獲取當前系統時間 def looptest(): ''' 循環20次執行 test_o1()函數 :return: ''' for i in range(20): test_01() def thd(): ''' 創建並執行多個線程 需求:並發執行50次 test_o1()函數 說明:把50的並發拆成25個線程組,每個線程再循環20次執行 test_o1()函數,這樣在啟動下一個線程的時候, 上一個線程已經在循環了,以此類推,當啟動第25個線程的時候,可能已經執行了200次的t est_o1()函數, 這樣就可以大大減少並發的時間差異 :return: ''' Threads = [] for i in range(25): th = threading.Thread(target=looptest) Threads.append(th) ''' 守護線程:主線程執行完畢之后,會等待子線程全部執行完畢,才會關閉結束程序 必須加在start()之前,默認為 false ''' th.setDaemon(True) for th in Threads: th.start() for th in Threads: ''' 阻塞線程:等主線程執行完畢之后再關閉所有子線程 必須加在start()之后 可以通過join()的timeout參數來完美解決相互等待的問題,子線程告訴主線程讓其等待0.04秒, 0.04秒之內子線程完成,主線程就繼續往下執行,0.04秒之后如果子線程還未完成,主線程也會 繼續往下執行,執行完成之后關閉子線程 ''' th.join(0.04) if __name__=="__main__": print('start') thd() print('end')
2、並發測試框架
# 並發測試框架 THREAD_NUM = 1 ONE_WORKER_NUM = 1 def test(): pass # 測試代碼 def working(): global ONE_WORKER_NUM for i in range(0, ONE_WORKER_NUM): test() def t(): global THREAD_NUM Threads = [] for i in range(THREAD_NUM): t = threading.Thread(target=working,name='T'+str(i)) t.setDaemon(True) Threads.append(t) for t in Threads: t.start() for t in Threads: t.join() if __name__=="__main__": t()