目錄
-多線程使用場景
-多進程
--簡單的一個多進程例子
--進程間數據的交互實現方法
---通過Queues和Pipe可以實現進程間數據的傳遞,但是不能實現數據的共享
---Queues
---Pipe
---通過Manager可以不同進程間實現數據的共享
--進程同步,即進程鎖
--進程池
-協程
--先用yield實現簡單的協程
--Greenlet
--Gevent
--用協程gevent寫一個簡單並發爬網頁
-事件驅動
--IO多路復用
---用戶空間和內核空間
---文件描述符fd
---緩存IO
--IO模式
---阻塞I/O(blocking IO)
---非阻塞I/O
---I/O多路復用(IO multiplexing)
---異步I/O(asynchronous IO)
-關於select poll epoll
--select
--poll
--epoll
--以select方法為例子進行理解
多線程的使用場景
IO操作不占用CPU
計算占用cpu
python多線程不適合cpu密集型操作的任務,適合IO操作密集型的任務
多進程
簡單的一個多進程例子:(用於理解對多線程方法的使用)
和線程的方法類似,下面是一個簡單的多進程代碼
1 #AUTHOR:FAN 2 import time,multiprocessing 3 4 def run(name): 5 time.sleep(2) 6 print("hello",name) 7 8 if __name__ =="__main__": 9 for i in range(6): 10 p = multiprocessing.Process(target=run,args=("dean",)) 11 p.start()
和之前學習的多線程結合在一起使用,代碼如下:
1 #AUTHOR:FAN 2 3 import time,threading 4 import multiprocessing 5 6 def thread_run(): 7 print(threading.get_ident()) #這里表示獲取線程id 8 9 10 def run(name): 11 time.sleep(2) 12 print("hello",name) 13 t=threading.Thread(target=thread_run) 14 t.start() 15 16 if __name__ =="__main__": 17 for i in range(6): 18 p = multiprocessing.Process(target=run,args=("dean",)) 19 p.start()
運行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/進程與線程結合使用.py 2 hello dean 3 10008 4 hello dean 5 9276 6 hello dean 7 8096 8 hello dean 9 1308 10 hello dean 11 hello dean 12 10112 13 8032 14 15 Process finished with exit code 0
接着我們查看下面代碼:
1 #AUTHOR:FAN 2 3 from multiprocessing import Process 4 import os 5 6 7 def info(title): 8 print(title) 9 print('module name:', __name__) 10 print('parent process:', os.getppid()) 11 print('process id:', os.getpid()) 12 print("\n\n") 13 14 15 def f(name): 16 info('\033[31;1mcalled from child process function f\033[0m') 17 print('hello', name) 18 19 if __name__ == '__main__': 20 info('\033[32;1mmain process line\033[0m')
運行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/獲取進程id.py 2 main process line 3 module name: __main__ 4 parent process: 8368 5 process id: 7664 6 7 8 9 10 Process finished with exit code 0
我們這里可以看到父進程id:8368,並且會發現無論程序運行多少次都是這個,然后我們在windows任務管理器查看發現這個是pycharm的進程id,如下圖:
這里要記住:每一個子進程都是由父進程啟動的
我們將上面代碼中if __name__=”__main__”進行修改,如下:
1 if __name__ == '__main__': 2 info('\033[32;1mmain process line\033[0m') 3 p = Process(target=f, args=('bob',)) 4 p.start()
運行結果如下:
進程間數據的交互,實現方法
通過Queues和Pipe可以實現進程間數據的傳遞,但是不能實現數據的共享
不同進程間內存不是共享的,要想實現兩個進程間的數據交換,有一下方法:
Queues
使用方法和threading里的queue使用差不多
先回憶一下線程之間的數據共享,通過下面代碼理解:
1 #AUTHOR:FAN 2 import threading 3 import queue 4 5 def func(): 6 q.put([22,"dean",'hello']) 7 8 if __name__=="__main__": 9 q = queue.Queue() 10 t = threading.Thread(target=func) 11 t.start() 12 print(q.get(q))
運行結果:

1 D:\python35\python.exe D:/python培訓/s14/day10/線程之間數據的共享.py 2 [22, 'dean', 'hello'] 3 4 Process finished with exit code 0
從上述代碼可以看出線程之間的數據是共享的:父線程可以訪問子線程放入的數據
如果是多進程之間呢?
將代碼進行修改如下,讓子進程調用父進程數據:
1 from multiprocessing import Process 2 import queue 3 4 5 6 def f(): 7 q.put([11,None,"hello"]) 8 9 10 if __name__=="__main__": 11 q = queue.Queue() 12 p = Process(target=f) 13 p.start() 14 print(q.get())
運行結果如下:
從這里我們也可以看出子進程是訪問不到父進程的數據
我們再次將代碼進行修改,寫f方法的時候直接將q給線程傳入,也就是,只有啟動線程,就自動傳入線程q,代碼如下:
1 #AUTHOR:FAN 2 3 from multiprocessing import Process 4 import queue 5 6 7 def f(data): 8 data.put([11,None,"hello"]) 9 10 if __name__=="__main__": 11 q = queue.Queue() #切記這里是線程q 12 p = Process(target=f,args=(q,)) 13 p.start() 14 print(q.get())
運行結果如下:
這里我們需要知道:進程不能訪問線程q
所以我們需要改成進程,代碼如下:
1 #AUTHOR:FAN 2 3 from multiprocessing import Process,Queue 4 5 6 def f(data): 7 data.put([11,None,"hello"]) 8 9 if __name__=="__main__": 10 q = Queue() 這里的q是進程q 11 p = Process(target=f,args=(q,)) 12 p.start() 13 print(q.get())
運行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/子進程訪問父進程數據.py 2 [11, None, 'hello'] 3 4 Process finished with exit code 0
這次我們就發現在父進程里就可以調用到子進程放入的數據
這里我們需要明白:這里的q其實是被克隆了一個q,然后將子線程序列化的內容傳入的克隆q,然后再反序列化給q,從而實現了進程之間數據的傳遞
Pipe
實現代碼例子:
1 #AUTHOR:FAN 2 3 from multiprocessing import Process,Pipe 4 5 def f(conn): 6 conn.send([22,None,"hello from child"]) 7 conn.send([22,None,"hello from child2"]) 8 print(conn.recv()) 9 conn.close() 10 11 if __name__=="__main__": 12 left_conn,right_conn = Pipe() 13 p = Process(target=f,args=(right_conn,)) 14 p.start() 15 print(left_conn.recv()) 16 print(left_conn.recv()) 17 left_conn.send("我是left_conn")
運行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/通過pipes實現進程間數據傳遞.py 2 [22, None, 'hello from child'] 3 [22, None, 'hello from child2'] 4 我是left_conn 5 6 Process finished with exit code 0
對上面代碼分析:pip()會生成兩個值,上面的left_conn和right_conn,這就如同一條網線的兩頭,兩頭都可以發送和接收數據
通過Manager可以不同進程間實現數據的共享
通過下面代碼進行理解:
1 #AUTHOR:FAN 2 from multiprocessing import Manager,Process 3 import os 4 5 def f(d,l): 6 d[1]="1" 7 d["2"] = 2 8 d[0.25] = None 9 l.append(os.getpid()) 10 print(l) 11 12 if __name__ == "__main__": 13 with Manager() as manager: #這種方式和直接manager=Manager()一樣 14 d = manager.dict() #生成一個字典,可以在多個進程間共享 15 l = manager.list(range(5)) #生成一個列表,可以在多個進程間共享 16 p_list = [] 17 for i in range(10): 18 p = Process(target=f,args=(d,l)) 19 p.start() 20 p_list.append(p) 21 for res in p_list: 22 res.join() 23 24 print(d) 25 print(l)
運行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/Manager實現進程間數據的共享.py 2 [0, 1, 2, 3, 4, 9756] 3 [0, 1, 2, 3, 4, 9756, 3352] 4 [0, 1, 2, 3, 4, 9756, 3352, 9220] 5 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736] 6 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724] 7 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860] 8 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084] 9 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452] 10 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376] 11 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376, 9952] 12 {0.25: None, 1: '1', '2': 2} 13 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376, 9952] 14 15 Process finished with exit code 0
通過結果可以看出已經實現了不同進程間數據的共享
進程同步,即進程鎖
1 #AUTHOR:FAN 2 from multiprocessing import Process, Lock 3 4 5 def f(l, i): 6 l.acquire() 7 print('hello world', i) 8 l.release() 9 10 if __name__ == '__main__': 11 lock = Lock() 12 for num in range(10): 13 Process(target=f, args=(lock, num)).start()
打印結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/進程鎖.py 2 hello world 3 3 hello world 2 4 hello world 1 5 hello world 0 6 hello world 7 7 hello world 6 8 hello world 4 9 hello world 5 10 hello world 9 11 hello world 8 12 13 Process finished with exit code 0
可能會覺得這個加鎖沒有上面作用,其實是這樣的,當在屏幕上打印這些內容的時候,不同進程之間是共享這個屏幕的,鎖的作用在於當一個進程開始打印的時候,其他線程不能打印,從而防止打印亂內容
在windows上可能看不到效果,當不同進程打印的東西比較多的時候,就可以看到打印數據出現亂的情況
進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
apply
apply_async(這個就表示異步)
從下面代碼一點一點分析
1 #AUTHOR:FAN 2 3 from multiprocessing import Process, Pool 4 import time 5 import os 6 7 8 def Foo(i): 9 time.sleep(2) 10 print("in the process",os.getpid()) 11 return i + 100 12 13 14 def Bar(arg): 15 print('-->exec done:', arg) 16 17 if __name__ == "__main__": 18 pool = Pool(5) 19 20 for i in range(10): 21 pool.apply(func=Foo, args=(i,)) 22 print('end') 23 pool.close() 24 pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
這樣運行結果發現,程序變成了串行了。
將上述代碼中的:
pool.apply(func=Foo, args=(i,))
替換為:
pool.apply_async(func=Foo,args=(i,))
之后就解決了之前的的問題
這個時候我們再次將
pool.apply_async(func=Foo,args=(i,))
替換為,這里的callback叫做回調函數
pool.apply_async(func=Foo, args=(i,), callback=Bar)
運行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/進程池.py 2 end 3 in the process 10876 4 -->exec done: 100 5 in the process 5084 6 -->exec done: 101 7 in the process 9648 8 -->exec done: 102 9 in the process 11028 10 -->exec done: 103 11 in the process 8528 12 -->exec done: 104 13 in the process 10876 14 -->exec done: 105 15 in the process 5084 16 -->exec done: 106 17 in the process 9648 18 -->exec done: 107 19 in the process 11028 20 -->exec done: 108 21 in the process 8528 22 -->exec done: 109 23 24 Process finished with exit code 0
下面將代碼進行修改,確定回調函數是由子進程還是主進程調用
1 #AUTHOR:FAN 2 3 from multiprocessing import Process, Pool 4 import time 5 import os 6 7 8 def Foo(i): 9 time.sleep(2) 10 print("in the process",os.getpid()) 11 return i + 100 12 13 14 def Bar(arg): 15 print('-->exec done:', arg,os.getpid()) 16 17 if __name__ == "__main__": 18 pool = Pool(5) 19 print(os.getpid()) 20 for i in range(5): 21 pool.apply_async(func=Foo, args=(i,), callback=Bar) 22 #pool.apply(func=Foo, args=(i,)) 23 #pool.apply_async(func=Foo,args=(i,)) 24 25 print('end') 26 pool.close() 27 pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
運行結果如下,可以看出回調函數的pid和主進程是一樣的
協程
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
無需線程上下文切換的開銷
無需原子操作鎖定及同步的開銷
方便切換控制流,簡化編程模型
高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。
缺點:
無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
先用yield實現簡單的協程
1 #AUTHOR:FAN 2 3 import time 4 import queue 5 6 7 def consumer(name): 8 print("--->starting eating baozi...") 9 while True: 10 new_baozi = yield 11 print("[%s] is eating baozi %s" % (name, new_baozi)) 12 time.sleep(1) 13 def producer(): 14 r = con.__next__() 15 r = con2.__next__() 16 n = 0 17 while n < 5: 18 n += 1 19 con.send(n) 20 con2.send(n) 21 print("\033[32;1m[producer]\033[0m is making baozi %s" % n) 22 if __name__ == '__main__': 23 con = consumer("c1") 24 con2 = consumer("c2") 25 p = producer()
運行結果如下:
1 D:\python35\python.exe D:/python培訓/s14/day10/yield實現協程.py 2 --->starting eating baozi... 3 --->starting eating baozi... 4 [c1] is eating baozi 1 5 [c2] is eating baozi 1 6 [producer] is making baozi 1 7 [c1] is eating baozi 2 8 [c2] is eating baozi 2 9 [producer] is making baozi 2 10 [c1] is eating baozi 3 11 [c2] is eating baozi 3 12 [producer] is making baozi 3 13 [c1] is eating baozi 4 14 [c2] is eating baozi 4 15 [producer] is making baozi 4 16 [c1] is eating baozi 5 17 [c2] is eating baozi 5 18 [producer] is making baozi 5 19 20 Process finished with exit code 0
Greenlet
1 #AUTHOR:FAN 2 3 from greenlet import greenlet 4 5 def test1(): 6 print(10) 7 gr2.switch() 8 print(11) 9 gr2.switch() 10 11 12 def test2(): 13 print(12) 14 gr1.switch() 15 print(13) 16 17 18 gr1 = greenlet(test1) #啟動一個協程 19 gr2 = greenlet(test2) 20 gr1.switch()
這里的gr1.switch()是手動切換
Gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度.
通過下面代碼進行理解:
1 import gevent 2 3 4 def foo(): 5 print('Running in foo1') 6 gevent.sleep(2) 7 print('Running in foo2') 8 9 def bar(): 10 print('Running in bar1') 11 gevent.sleep(1) 12 print('Running in bar2') 13 14 def func3(): 15 print("running in func1") 16 gevent.sleep(0) 17 print("running in func2") 18 19 20 gevent.joinall([ 21 gevent.spawn(foo), 22 gevent.spawn(bar), 23 gevent.spawn(func3), 24 ])
執行結果如下:

1 D:\python35\python.exe D:/python培訓/s14/day10/自動IO切換.py 2 Running in foo1 3 Running in bar1 4 running in func1 5 running in func2 6 Running in bar2 7 Running in foo2 8 9 Process finished with exit code 0
從運行結果可以看出,通過gevent.sleep()模擬執行IO操作,從而實現自動切換,程序最終花費的時間還是2秒
用協程gevent寫一個簡單並發爬網頁
1 #AUTHOR:FAN 2 3 from urllib import request 4 import gevent,time 5 6 def f(url): 7 print("get:%s" %url) 8 resp = request.urlopen(url) 9 data = resp.read() 10 print("%d bytes received from %s" %(len(data),url)) 11 12 13 urls = ["http://sina.com.cn", 14 "http://www.cnblogs.com/", 15 "https://news.cnblogs.com/" 16 ] 17 18 time_start = time.time() 19 for url in urls: 20 f(url) 21 22 print("同步串行cost:",time.time()-time_start) 23 24 async_time = time.time() 25 gevent.joinall([ 26 gevent.spawn(f,"http://sina.com.cn"), 27 gevent.spawn(f,"http://www.cnblogs.com/"), 28 gevent.spawn(f,"https://news.cnblogs.com/") 29 ]) 30 print("異步cost:",time.time()-async_time)
這樣的運行結果:
這里可以看出異步的時候和串行執行的時間基本一樣,其實這里的異步並沒有起作用,因為這里的gevent並不能識別出urllib執行時的IO操作,想要是gevent實現異步的方法是導入模塊:from gevent import monkey
將代碼進行修改如下:
1 #AUTHOR:FAN 2 3 from urllib import request 4 import gevent,time 5 from gevent import monkey 6 7 monkey.patch_all() 8 def f(url): 9 print("get:%s" %url) 10 resp = request.urlopen(url) 11 data = resp.read() 12 print("%d bytes received from %s" %(len(data),url)) 13 14 15 urls = ["http://sina.com.cn", 16 "http://www.cnblogs.com/", 17 "https://news.cnblogs.com/" 18 ] 19 20 time_start = time.time() 21 for url in urls: 22 f(url) 23 24 print("同步串行cost:",time.time()-time_start) 25 26 async_time = time.time() 27 gevent.joinall([ 28 gevent.spawn(f,"http://sina.com.cn"), 29 gevent.spawn(f,"http://www.cnblogs.com/"), 30 gevent.spawn(f,"https://news.cnblogs.com/") 31 ]) 32 print("異步cost:",time.time()-async_time)
然后執行,結果如下:
事件驅動
通常,我們寫服務器處理模型的程序時,有以下幾種模型:
(1)每收到一個請求,創建一個新的進程,來處理該請求;
(2)每收到一個請求,創建一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,由於創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。
第(2)種方式,由於要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網絡服務器采用的方式
目前大部分的UI編程都是事件驅動模型,如很多UI平台都會提供onClick()事件,這個事件就代表鼠標按下事件。事件驅動模型大體思路如下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增加一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不同的事件,調用不同的函數,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的處理函數指針,這樣,每個消息都有獨立的處理函數
事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。
讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。
當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:
(1)程序中有許多任務
(2)任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)
(3)在等待事件到來時,某些任務會阻塞。
當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。
網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。
IO多路復用
用戶空間和內核空間
操作系統都是采用虛擬存儲器,對於32位操作系統,它的尋址空間(虛擬存儲空間)為4G。操作系統的核心是內核,獨立於普通的應用程序,可以訪問受保護內存空間,也有訪問底層硬件設備的所有權限,為了保證用戶進程不能直接操作內核,保證內核的安全,操作系統將虛擬空間分為兩部分:一部分為內核空間,一部分是用戶空間,針對linux系統而言,將最高的1G字節給內核使用,稱為內核空間,將3G字節的供各個進程使用,稱為用戶空間
文件描述符fd
文件描述符是一個用於表述指向文件的引用的抽象化概念
文件描述符在形式上是一個非負整數,實際上,它是一個索引值,指內核為每一個進程所維護的進程打開文件的記錄的記錄表,當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。
緩存IO
緩存IO,也被稱為標准IO,大多數文件系統默認IO操作都是緩存IO,在Linux的緩存IO機制中,操作系統會將IO的數據緩存在文件系統的頁緩存(page cache)中,也就是說,數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間
緩存IO的缺點:
數據在傳輸過程中需要在應用程序地址空間和內核進行多次數據拷貝操作,這些數據拷貝操作所帶來的CPU以及內存開銷是非常大的
IO模式
對於一次IO訪問(以read為例子),數據會先拷貝到操作系統內核的緩沖區中,然后會從操作系統內核的緩沖區拷貝到應用程序的地址空間,也就是說當一個read操作發生時,它會經歷兩個階段:
1. 等待數據准備
2. 經數據從內核拷貝到進程
正是因為這兩個階段,linux系統產生了五種網絡模式的方案
1. 阻塞I/O(blocking IO)
2. 非阻塞I/O(nonblocking IO)
3. I/O多路復用(IO multiplexing)
4. 信號驅動I/O(signal driven IO)
5. 異步I/O(asynchromous IO)
注意:信號驅動I/O(signal driven IO)在實際中不常用
阻塞I/O(blocking IO)
在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據(對於網絡IO來說,很多時候數據在一開始還沒有到達。比如,還沒有收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程需要等待,也就是說數據被拷貝到操作系統內核的緩沖區中是需要一個過程的。而在用戶進程這邊,整個進程會被阻塞(當然,是進程自己選擇的阻塞)。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
所以,blocking IO的特點就是在IO執行的兩個階段都被block了
非阻塞I/O
linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:
當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。
所以,nonblocking IO的特點是用戶進程需要不斷的主動詢問kernel數據好了沒有。
I/O多路復用(IO multiplexing)
IO multiplexing就是我們說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。
當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
所以,I/O 多路復用的特點是通過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就可以返回。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上,還更差一些。因為這里需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。
所以,如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。)
在IO multiplexing Model中,實際中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
異步I/O(asynchronous IO)
Linux下的asynchronous IO其實用得很少。先看一下它的流程:
用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。
關於select poll epoll
select
sekect是通過一個select()系統調用來監視多個文件描述符,當select()返回后,該數組中就緒的文件描述符便會被該內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作
select的優點就是支持跨平台
缺點在於單個進程能夠監視的文件描述符的數量存在最大限制
另外select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
poll
和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
epoll
epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。
另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知
以select方法為例子進行理解
Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,並且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器。
接下來通過echo server例子要以了解select 是如何通過單進程實現同時處理多個非阻塞的socket連接的
代碼如下:
1 #AUTHOR:FAN 2 3 import select 4 import socket 5 import queue 6 server = socket.socket() 7 server.bind(('127.0.0.1',9999)) 8 server.listen() 9 10 server.setblocking(False)#不阻塞 11 msg_dict = {} 12 inputs=[server,] 13 outputs=[] 14 15 while True: 16 readable, writeable, exceptional = select.select(inputs, outputs, inputs) 17 print(readable, writeable, exceptional) 18 for r in readable: 19 if r is server: #代表來了一個新連接 20 conn,addr = server.accept() 21 print("來了一個新連接:",addr) 22 inputs.append(conn) #是因為這個新建立的連接還沒發數據過來,現在就接收的話程序就報錯了 23 #所以要想要實現這個客戶端發數據來時server端能知道,就需要讓select再監測這個conn 24 msg_dict[conn] = queue.Queue() #初始化一個隊列,后面需要返回給這個客戶端的數據 25 else: 26 data = r.recv(1024) 27 print("收到數據:",data) 28 msg_dict[r].put(data) 29 outputs.append(r) #放入返回的連接隊列里 30 31 for w in writeable: #要返回給客戶端的連接列表 32 data_to_client = msg_dict[w].get() 33 w.send(data_to_client) #返回給客戶端源數據 34 outputs.remove(w) #確保下次循環的時候writeable,不能返回這個已經處理完的連接了 35 for e in exceptional: 36 if e in outputs: 37 outputs.remove(e) 38 inputs.remove(e) 39 del msg_dict[e]
其實上述的代碼相對來說是比較麻煩,python已經封裝了selectors模塊,並且這個模塊中包含了select和epoll,會根據系統自動識別(windows只支持select,linux是二者都支持),默認用epoll
如果將上述代碼用selectors模塊的方式寫,代碼如下:
1 #AUTHOR:FAN 2 3 4 import selectors 5 import socket 6 7 sel = selectors.DefaultSelector() 8 def accept(server,mask): 9 conn,addr = server.accept() 10 print("一個新的連接",addr) 11 print(conn) 12 conn.setblocking(False) 13 sel.register(conn,selectors.EVENT_READ,read) #新連接注冊read回調函數 14 print("done") 15 16 def read(conn,mask): 17 print("ccc") 18 print("mask:",mask) 19 data = conn.recv(1024) 20 if data: 21 print(data) 22 conn.send(data) 23 else: 24 print("客戶端斷開連接") 25 sel.unregister(conn) 26 conn.close() 27 28 server = socket.socket() 29 server.bind(('127.0.0.1',9999)) 30 server.listen() 31 server.setblocking(False) 32 sel.register(server,selectors.EVENT_READ,accept) 33 34 while True: 35 print("cccccccsssssss") 36 events = sel.select() #默認阻塞,有活動連接,有活動連接就返回活動的連接列表 37 print(events) 38 for key,mask in events: 39 print("key:%s mask:%s"%(key,mask)) 40 callback = key.data #這里就是回調函數及上述的accept 41 print("key.data:",key.data) 42 print("key.fileobj:",key.fileobj) 43 callback(key.fileobj,mask) #key.fileobj
我們用客戶端模擬同時並發一萬去連接服務端
客戶端代碼如下:
1 #AUTHOR:FAN 2 3 4 import socket 5 import sys 6 7 messages = [ b'This is the message. ', 8 b'It will be sent ', 9 b'in parts.', 10 ] 11 server_address = ('192.168.8.102', 10000) 12 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(10000) 13 ] 14 print('connecting to %s port %s' % server_address) 15 for s in socks: 16 s.connect(server_address) 17 18 for message in messages: 19 for s in socks: 20 print('%s: sending "%s"' % (s.getsockname(), message) ) 21 s.send(message) 22 for s in socks: 23 data = s.recv(1024) 24 print( '%s: received "%s"' % (s.getsockname(), data) ) 25 if not data: 26 print(sys.stderr, 'closing socket', s.getsockname() )
將服務端放到linux服務端,在本機執行客戶端,從而實現了上萬的並發