目錄:
同步|異步:
線程的三種狀態:
1.就緒
2.運行
3.阻塞
阻塞和非阻塞描述的是運行的狀態
阻塞 :遇到了IO操作,代碼卡住,無法執行下一行,CPU會切換到其他任務
非阻塞 :與阻塞相反,代碼正在執行(運行狀態) 或處於就緒狀態
同步和異步指的是提交任務的方式
同步 :提交任務必須等待任務完成,才能執行下一行
異步 :提交任務不需要等待任務完成,立即執行下一行
代碼:
1 def task(): 2 for i in range(1000000): 3 i += 1000 4 print("11111") 5 6 print("start") 7 task() # 同步提交方式,等函數運行完菜執行下一行 8 print("end") 9 10 from threading import Thread 11 12 print("start1") 13 Thread(target=task).start() # 異步提交,開啟線程,然后去執行之后的代碼,線程內代碼自行執行 14 print("end1")
異步回調:任務執行結束后自動調用某個函數
異步回調:
在發起異步任務后,子進程或子線程完成任務后需要通知任務發起方.通過調用一個函數,all_done_callback(函數名)
為什么需要回調? 子進程幫助主進程完成任務,處理任務的結果應該交還給主進程 其他方式也可以將數據交還給主進程 1.shutdown 主進程會等到所有任務完成 2.result函數 會阻塞直到任務完成 都會阻塞,導致效率降低,所以使用回調 注意: 回調函數什么時候被執行? 子進程任務完成時 誰在執行回調函數? 主進程 線程的異步回調 使用方式都相同,唯一的不同是執行回調函數,是子線程在執行(線程間數據共享)
三種方式:
1 # 方式1 自己來保存數據 並執行shutdown 僅在多線程 2 3 res = [] 4 def task(): 5 print("%s is 正在打水" % os.getpid()) 6 time.sleep(0.2) 7 w = "%s 打的水" % os.getpid() 8 res.append(w) 9 return w 10 11 if __name__ == '__main__': 12 for i in range(20): 13 # 提交任務會返回一個對象 用於回去執行狀態和結果 14 f = pool.submit(task) 15 print(f.result()) # 方式2 執行result 它是阻塞的直到任務完成 又變成串行了 16 17 print("11111") 18 # pool.shutdown() # 首先不允許提交新任務 然后等目前所有任務完成后 19 # print(res) 20 print("over") 21 22 ==================================================================================== 23 24 pool = ThreadPoolExecutor() 25 26 # 方式3 通過回調(什么是回調 任務執行結束后自動調用某個函數) 27 def task(): 28 print("%s is 正在打水" % os.getpid()) 29 # time.sleep(0.2) 30 w = "%s 打的水" % os.getpid() 31 return w 32 33 def task_finish(res): 34 print("打水完成! %s" % res) 35 36 if __name__ == '__main__': 37 for i in range(20): 38 # 提交任務會返回一個對象 用於回去執行狀態和結果 39 f = pool.submit(task) 40 f.add_done_callback(task_finish) #添加完成后的回調 41 print("11111") 42 print("over")
利用回調完成生產者消費者:
多進程:
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 2 from threading import current_thread 3 import os 4 # 進程池 5 pool = ProcessPoolExecutor() 6 # 爬蟲:從網絡某個地址獲取一個HTML文件 7 import requests # 該模塊用於網絡(HTTP)請求 8 9 # 生產數據,即生產者 10 def get_data_task(url): 11 print(os.getpid(),"正在生產數據!") 12 # print(current_thread(),"正在生產數據!") 13 14 response = requests.get(url) 15 text = response.content.decode("utf-8") 16 print(text) 17 return text 18 19 # 處理數據,即消費者 20 def parser_data(f): 21 print(os.getpid(),"處理數據") 22 # print(current_thread(), "處理數據") 23 print("正在解析: 長度%s" % len(f.result())) 24 25 urls = [ 26 "http://www.baidu.com", 27 "http://www.baidu.com", 28 "http://www.baidu.com", 29 "http://www.baidu.com" 30 ] 31 32 if __name__ == '__main__': 33 for url in urls: 34 f = pool.submit(get_data_task,url) 35 f.add_done_callback(parser_data) # 回調函數是主進程在執行 36 # 因為子進程是負責獲取數據的,然而數據怎么處理 ,子進程並不知道.應該把數據還給主進程 37 print("over")
多線程:
1 from concurrent.futures import ThreadPoolExecutor 2 from threading import current_thread 3 # 進程池 4 pool = ThreadPoolExecutor() 5 6 # 爬蟲:從網絡某個地址獲取一個HTML文件 7 import requests # 該模塊用於網絡(HTTP)請求 8 9 # 生產數據 10 def get_data_task(url): 11 # print(os.getpid(),"正在生產數據!") 12 print(current_thread(),"正在生產數據!") 13 14 response = requests.get(url) 15 text = response.content.decode("utf-8") 16 print(text) 17 return text 18 19 # 處理數據 20 def parser_data(f): 21 # print(os.getpid(),"處理數據") 22 print(current_thread(), "處理數據") 23 print("正在解析: 長度%s" % len(f.result())) 24 25 urls = [ 26 "http://www.baidu.com", 27 "http://www.baidu.com", 28 "http://www.baidu.com", 29 "http://www.baidu.com" 30 ] 31 32 if __name__ == '__main__': 33 for url in urls: 34 f = pool.submit(get_data_task,url) 35 f.add_done_callback(parser_data) # 回調函數是主進程在執行 36 # 因為子進程是負責獲取數據的 然而數據怎么處理 子進程並不知道 應該把數據還給主進程 37 print("over")
線程隊列:
普通隊列/堆棧隊列/優先級隊列:
import queue # 普通隊列 先進先出 q = queue.Queue() q.put("a") q.put("b") print(q.get()) print(q.get()) # 堆棧隊列 先進后出 后進先出 函數調用就是進棧 函數結束就出棧 遞歸造成棧溢出 q2 = queue.LifoQueue() q2.put("a") q2.put("b") print(q2.get()) # 優先級隊列 q3 = queue.PriorityQueue() # 數值越小優先級越高 優先級相同時 比較大小 小的先取 q3.put((-100, "c")) q3.put((1, "a")) q3.put((100, "b")) print(q3.get())
協程:在單線程下由應用程序級別實現並發
什么是協程? 協程指的是單線程下由應用程序級別實現的並發 即把本來由操作系統控制的切換+保存狀態,在應用 程序里實現了 協程的切換vs操作系統的切換 優點: 切換速度遠快於操作系統 缺點: 一個任務阻塞了,其余的任務都無法執行 ps:只有遇到io才切換到其他任務的協程才能提升 單線程的執行效率 為何用協程? 把單個線程的io降到最低,最大限度地提升單個線程的執行效率 如何實現協程? from gevent import spawn,monkey;monkey.patch_all()
協程的目的是在單線程下實現並發 為什么出現協程? 因為cpython中,由於GIL而導致同一時間只有一個線程在跑 意味着:如果你的程序時計算密集,多線程效率也不會提升 如果是io密集型 沒有必要在單線程下實現並發,我會開啟多線程來處理io,子線遇到io,cpu切走. 不能保證一定切到主線 如果可以,我在遇到io的時候轉而去做計算,這樣一來可以保證cpu一直在處理你的程序,當然處理時間太長也要切走 總結:單線程下實現並發,是將io阻塞時間用於執行計算,可以提高效率 原理:一直使用CPU直到超時 怎么實現單線程並發? 並發:指的是看起來像是同時運行,實際是在任務間來回切換,同時需要保存執行的狀態 任務一堆代碼 可以用函數裝起來 1.如何讓兩個函數切換執行 yield可以保存函數的執行狀態 通過生成器可以實現偽並發 並發不一定提升效率,當任務全是計算時,反而會降低效率 2.如何知道發生了io, 從而切換執行? 第三方模塊,gevent 第三方模塊 greenlet 可以實現並發 但是不能檢測io 第三方模塊 gevent 封裝greenlet 可以實現單線程並發,並且能夠檢測io操作,自動切換
協程的應用場景:
TCP 多客戶端實現方式
1.來一個客戶端就來一個進程 資源消耗較大
2.來一個客戶端就來一個線程 也不能無限開
3.用進程池 或 線程池 還是一個線程或進程只能維護一個連接
4.協程 一個線程就可以處理多個客戶端 遇到io就切到另一個
協成實現:單線程實現並發
1.yield 把函數做成生成器,生成器會自動保存狀態
1 # 這是一個進程,默認包含一個主線程 2 import time
#生成器函數 3 def task(): 4 while True: 5 print("task1") 6 time.sleep(1)#I/O,CPU切走 7 yield 1 8 9 def task2(): 10 g = task() 11 while True: 12 try: 13 print("task2") 14 next(g)#next()函數參數傳一個可迭代對象 15 except Exception: 16 print("任務完成") 17 break 18 task2() 19 打印結果: 20 task2 21 task1 22 task2 23 task1 24 task2 25 task1 26 ..........
2.greenlet模塊:幫我們封裝yield,可以實現任務切換,但是不能檢測I/O
# 1.實例化greenlet得到一個對象,傳入要執行的任務,至少需要兩個任務 # 2.先讓某個任務執行起來,使用對象調用switch # 3.在任務的執行過程中,手動調用switch來切換
1 import greenlet 2 import time 3 def task1(): 4 print("task1 1") 5 time.sleep(2) 6 g2.switch() 7 print("task1 2") 8 g2.switch() 9 10 def task2(): 11 print("task2 1") 12 g1.switch() 13 print("task2 2") 14 15 g1 = greenlet.greenlet(task1) 16 g2 = greenlet.greenlet(task2) 17 18 g1.switch()
3.gevent:在greenlet的基礎上封裝檢測io操作,自動切換
# 1.spawn函數傳入你的任務 # 2.調用join 去開啟任務 # 3.檢測io操作需要打monkey補丁,就是一個函數,在程序最開始的地方調用它
1 from gevent import monkey 2 monkey.patch_all() 3 4 import gevent 5 import time 6 def eat(): 7 print('eat food 1') 8 time.sleep(2) 9 print('eat food 2') 10 11 def play(): 12 print('play 1') 13 time.sleep(1) 14 print('play 2') 15 16 g1=gevent.spawn(eat) 17 g2=gevent.spawn(play) 18 19 gevent.joinall([g1,g2]) 20 print('主')
協程實現TCP:
服務端:
1 import gevent 2 from gevent import monkey 3 monkey.patch_all() 4 import socket 5 6 server = socket.socket() 7 # 重用端口 8 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 9 10 server.bind(("127.0.0.1",9999)) 11 12 server.listen(5) 13 def data_handler(conn): 14 print("一個新連接..") 15 while True: 16 data = conn.recv(1024) 17 conn.send(data.upper()) 18 19 while True: 20 conn,addr = server.accept() 21 # 切到處理數據的任務去執行 22 gevent.spawn(data_handler,conn)
客戶端:
1 import socket 2 3 c = socket.socket() 4 5 c.connect(("127.0.0.1", 9999)) 6 7 while True: 8 msg = input(">>>:") 9 if not msg: continue 10 c.send(msg.encode("utf-8")) 11 data = c.recv(1024) 12 print(data.decode("utf-8"))
