先惡補一下知識點,上節回顧
上下文切換:當CPU從執行一個線程切換到執行另外一個線程的時候,它需要先存儲當前線程的本地的數據,程序指針等,然后載入另一個線程的本地數據,程序指針等,最后才開始執行。這種切換稱為“上下文切換”(“context switch”)
CPU會在一個上下文中執行一個線程,然后切換到另外一個上下文中執行另外一個線程,上下文切換並不廉價。如果沒有必要,應該減少上下文切換的發生
進程: 一個程序需要運行所需的資源的集合
每個進程數據是獨立的
每個進程里至少有一個線程
每個進程里有可以多有個線程
線程數據是共享的
進程間共享數據的代價是高昂的,所以要盡量避免進程間的數據共享
線程間的數據本來就是共享的
線程要修改同一份數據,必須加鎖,互斥鎖mutex
生產者消費者:1.解耦2.提高程序的運行效率,把中間等待的時間省去
多線程場景: IO密集型,因為IO操作基本不占用CPU,所以多用在web,爬蟲,socket交互
多進程場景:CPU密集型,大數據分析,金融分析,這樣用的IO就很少,因為這個進程會進行大量的運算, 但是如果切換了進程,就會變慢
協程
協程:微線程, 協程是一種用戶態的輕量級線程,CPU不知道它的存在,
協程擁有自己的寄存器上下文和棧.協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧,
因此協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合,),每次過程重入時,就相當於上一次調用的狀態, 也就是進入上一次離開時所處邏輯流的位置
協程的好處:(是程序級別切換,CPU是不知道的.)
1.無需線程上下文切換,
2.無需原子操作鎖定及同步開銷 , 什么是原子操作? :是不需要同步的!!,是指不會被線程調度打斷的操作;這種操作一旦開始,就運行到結束,中間不會有任何 context switch(切換到另一個線程,)
原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心
3.方便切換控制流,簡化編程模型
4.高並發 + 高擴展 + 低成本 : 一個CPU支持上萬的協程都不是問題,所以很適合用於高並發處理
壞處-----:
1.無法利用多核資源,協程的本質是個單線程,它不能同時將單個CPU的多個核用上, 協程需要配合進程才能在多CPU上, 適用於CPU密集型應用
2.進程阻塞 (Blocking) 操作 如IO操作時,會阻塞掉整個程序
----什么條件符合才能稱之為協程?
A.必須在只有一個單線程里實現並發
B.修改共享數據不需要加鎖
C.用戶程序里自己保持多個控制流的上下文棧
D.一個協程遇到IO操作自動切換到其他協程!!!!!!
重點來了。。。。。 大量的模塊知識點---我希望我以后還能記起來----汗顏!
Greenlet模塊
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator(生成器)
from greenlet import greenlet def test1(): print('test1:我是1') gr2.switch() #切換到test2 print('test1:我是1.1') gr2.switch() def test2(): print('test2:我是2') gr1.switch() #切換到test1 print('test2:我是2.2') gr1=greenlet(test1) gr2=greenlet(test2) gr1.switch() #先切換到test1 >> test1:我是1 test2:我是2 test1:我是1.1 test2:我是2.2
swich() 就是切換, 按執行順序-- 但是遇到IO操作 好像並沒有自動切換
Gevent模塊
Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
這里使用gevent.sleep 來獲取控制權
import gevent def func1(): print('\033[31;1m我是func1\033[0m') gevent.sleep(3) print('\033[31;1m我是func1.1--我上面有3秒\033[0m') def func2(): print('\033[32;1m我是func2.\033[0m') gevent.sleep(2) print('\033[32;1m我是func2.1 我上面有2秒\033[0m') def func3(): print('\033[32;1m我是func3.\033[0m') gevent.sleep(2) print('\033[32;1m我是func3.1我上面有2秒\033[0m') gevent.joinall([gevent.spawn(func1), gevent.spawn(func2), gevent.spawn(func3),])
這里會按照sleep 設置來執行 一定會先打印出func2-->func3-->func1
同步和異步的性能區別
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) def asynchronous(): #threads = [gevent.spawn(task, i) for i in range(10)] threads=[] for i in range(10): threads.append(gevent.spawn(task,i)) gevent.joinall(threads) #執行流程只會在 所有greenlet執行完后才會繼續向下走 print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
Synchrounous:定義了同步的函數:定義一個for循環。依次把內容傳輸給task函數,然后打印執行結果-----
Aynchrounous:定義了異步的函數: 這里用到了一個gevent.spawn方法,就是產生的意思. gevent.joinall 也就是等待所以操作都執行完畢
gevent.spawn 可以調用函數
可是我們一般也不會這么用。去故意的設置一個gevent.sleep來切換 ,下面就來在實際場景中應用
遇到IO阻塞,自動切換任務
這里就用到了簡單的網頁爬蟲環境中, 操作IO的時候。自動切換。這里就用到了猴子補丁(monkey.patch_all(), 知道這是運行時,動態修改已有的代碼,而不需要修改原始代碼)
from gevent import monkey import gevent import time from urllib.request import urlopen monkey.patch_all() #對比得出 協程 運行出的更快 #IO阻塞 自動切換任務。。 def say(url): print('get url',url) resp = urlopen(url) data = resp.read() print(len(data),url) t1_start = time.time() say('http://www.xiaohuar.com/') say('http://www.oldboyedu.com/') print("普通--time cost",time.time() - t1_start) t2_stat = time.time() gevent.joinall( [gevent.spawn(say,'http://www.xiaohuar.com/'), gevent.spawn(say,'http://www.oldboyedu.com/'), gevent.spawn(say,'http://weibo.com/MMbdzx?from=myfollow_all&is_all=1#_rnd1482040021384')] ) print("gevent---time cost",time.time() - t2_stat)
由於切換時再IO操作就自動完成,所以需要gevent修改py自帶的標准庫,這一過程在啟動時通過monkey patch完成 --
對比2次運行完畢的時間,很明顯的看到gevent在處理上,更加有優勢,
到了這里簡單的就算完了。。。來進入總結概念的部分--------http://www.cnblogs.com/zcqdream/p/6196948.html
通過gevent來實現單線程下的多socket並發
server 端,采用gevent協程
1 import sys 2 import socket 3 import time 4 import gevent 5 6 from gevent import socket,monkey 7 monkey.patch_all() 8 9 10 def server(port): 11 s = socket.socket() 12 s.bind(('0.0.0.0', port)) 13 s.listen(500) 14 while True: 15 cli, addr = s.accept() 16 gevent.spawn(handle_request, cli) #gevent.spwan調用handle參數並傳參 17 18 19 20 def handle_request(conn): 21 try: 22 while True: 23 data = conn.recv(1024) 24 print("recv:", data) 25 conn.send(data) 26 if not data: 27 conn.shutdown(socket.SHUT_WR) 28 29 except Exception as ex: 30 print(ex) 31 finally: 32 conn.close() 33 if __name__ == '__main__': 34 server(8001)
client端
單線程的客戶端
1 import socket 2 3 HOST = 'localhost' # The remote host 4 PORT = 8001 # The same port as used by the server 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 s.connect((HOST, PORT)) 7 while True: 8 msg = bytes(input(">>:"),encoding="utf8") 9 s.sendall(msg) 10 data = s.recv(1024) 11 #print(data) 12 13 print('Received', repr(data)) 14 s.close()
多線程客戶端去請求
import socket import threading def sock_conn(): client = socket.socket() client.connect(("localhost",8001)) count = 0 while True: #msg = input(">>:").strip() #if len(msg) == 0:continue client.send( ("hello %s" %count).encode("utf-8")) data = client.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) #結果 count +=1 client.close() for i in range(100): t = threading.Thread(target=sock_conn) t.start()
