python3之協程


1、協程的概念

協程,又稱微線程,纖程。英文名Coroutine。

線程是系統級別的它們由操作系統調度,而協程則是程序級別的由程序根據需要自己調度。在一個線程中會有很多函數,我們把這些函數稱為子程序,在子程序執行過程中可以中斷去執行別的子程序,而別的子程序也可以中斷回來繼續執行之前的子程序,這個過程就稱為協程。也就是說在同一線程內一段代碼在執行過程中會中斷然后跳轉執行別的代碼,接着在之前中斷的地方繼續開始執行,類似與yield操作。

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的優點:

  (1)無需線程上下文切換的開銷,協程避免了無意義的調度,由此可以提高性能(但也因此,程序員必須自己承擔調度的責任,同時,協程也失去了標准線程使用多CPU的能力)

  (2)無需原子操作鎖定及同步的開銷

  (3)方便切換控制流,簡化編程模型

  (4)高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

協程的缺點:

  (1)無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

  (2)進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

2、python3實現線程

(1)yield實現協程效果

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/24 10:46
# @Author  : Py.qi
# @File    : yield_xiecheng.py
# @Software: PyCharm

def consumer(name):
    print('開始吃包子...')
    while True:
        print('\033[31;1m[consumer]%s需要包子\033[0m'%name)
        bone = yield   #接收send發送的數據
        print('\033[31;1m[%s]吃了%s個包子\033[0m'%(name,bone))
def producer(obj1):
    obj1.send(None)   #必須先發送None
    for i in range(3):
        print('\033[32;1m[producer]\033[0m正在做%s個包子'%i)
        obj1.send(i)


if __name__ == '__main__':
    con1 = consumer('消費者A')  #創建消費者對象
    producer(con1)

#output:
開始吃包子...
[consumer]消費者A需要包子
[producer]正在做0個包子
[消費者A]吃了0個包子
[consumer]消費者A需要包子
[producer]正在做1個包子
[消費者A]吃了1個包子
[consumer]消費者A需要包子
[producer]正在做2個包子
[消費者A]吃了2個包子
[consumer]消費者A需要包子

(2)greenlet模塊實現程序間切換執行

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/24 15:25
# @Author  : Py.qi
# @File    : greenlet_now.py
# @Software: PyCharm
import greenlet

def A():
    print('a.....')
    g2.switch()  #切換至B
    print('a....2')
    g2.switch()
def B():
    print('b.....')
    g1.switch()  #切換至A
    print('b....2')

g1 = greenlet.greenlet(A) #啟動一個線程
g2 = greenlet.greenlet(B)
g1.switch()

(3)gevent實現協程

Gevent 是一個第三方庫,可以輕松通過gevent實現協程程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

gevent會主動識別程序內部的IO操作,當子程序遇到IO后,切換到別的子程序。如果所有的子程序都進入IO,則阻塞。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/24 15:59
# @Author  : Py.qi
# @File    : gevent_noe.py
# @Software: PyCharm
import gevent

def foo():
    print('running in foo')
    gevent.sleep(2)
    print('com back from bar in to foo')
def bar():
    print('running in bar')
    gevent.sleep(2)
    print('com back from foo in to bar')

gevent.joinall([      #創建線程並行執行程序,碰到IO就切換
    gevent.spawn(foo),
    gevent.spawn(bar),
])

線程函數同步與異步比較:

import gevent
def task(pid):
    gevent.sleep(1)
    print('task %s done'%pid)

def synchronous():  #同步一個線程執行函數
    for i in range(1,10):
        task(i)
def asynchronous(): #異步一個線程執行函數
    threads = [gevent.spawn(task,i) for i in range(10)]
    gevent.joinall(threads)

print('synchronous:')
synchronous()   #同步執行時要等待執行完后再執行
print('asynchronous:')
asynchronous()  #異步時遇到等待則會切換執行

爬蟲異步IO阻塞切換:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/24 17:00
# @Author  : Py.qi
# @File    : gevent_urllib.py
# @Software: PyCharm

from urllib import request
import gevent,time
from gevent import monkey

monkey.patch_all()   #將程序中所有IO操作做上標記使程序非阻塞狀態
def url_request(url):
    print('get:%s'%url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%s bytes received from %s'%(len(data),url))

async_time_start = time.time() #開始時間
gevent.joinall([
    gevent.spawn(url_request,'https://www.python.org/'),
    gevent.spawn(url_request,'https://www.nginx.org/'),
    gevent.spawn(url_request,'https://www.ibm.com'),
])
print('haoshi:',time.time()-async_time_start) #總用時

 協程實現多並發鏈接socket通信:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/24 17:22
# @Author  : Py.qi
# @File    : gevent_sock.py
# @Software: PyCharm
import socket,gevent
from gevent import monkey
monkey.patch_all()

def server_sock(port):
    s = socket.socket()
    s.bind(('',port))
    s.listen(10)
    while True:
        conn,addr = s.accept()
        gevent.spawn(handle_request,conn)
def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            if not data: conn.shutdown(socket.SHUT_WR)
            print('recv:',data.decode())
            conn.send(data)
    except Exception as ex:
        print(ex)
    finally:
        conn.close()

if __name__ == '__main__':
    server_sock(8888)




#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/24 17:35
# @Author  : Py.qi
# @File    : gevent_sockclient.py
# @Software: PyCharm

import socket

HOST = 'localhost'  # The remote host
PORT = 8888  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    #msg = bytes(input(">>:"), encoding="utf8")
    for i in range(50):
        s.send('dddd'.encode())
        data = s.recv(1024)
    # print(data)

        print('Received', repr(data))
    s.close()

 3、事件驅動

事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理,另外兩種常用的編程范式是(單線程)同步以及多線程編程。

服務器處理模型的程序時,有以下幾種模型:

(1)每收到一個請求,創建一個新的進程,來處理該請求;
(2)每收到一個請求,創建一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求
第(1)中方法,由於創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。
第(2)種方式,由於要涉及到線程的同步,有可能會面臨 死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數 網絡服務器采用的方式
讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。

在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。

當程序中有許多任務,且任務之間高度獨立(它們不需要互相通信,或等待彼此)而且在等待事件到來時,某些任務會阻塞時事件驅動模型時個很好的選擇;當應用程序需要在任務間共享可變的數據時,事件驅動模式可以更好的在單線程下處理。

網絡應用程序通常都是上述特點,這使得它們能夠很好的契合事件驅動編程模型。

此處要提出一個問題,就是,上面的事件驅動模型中,只要一遇到IO就注冊一個事件,然后主程序就可以繼續干其它的事情了,只到io處理完畢后,繼續恢復之前中斷的任務,這本質上是怎么實現的呢?這就涉及到select\poll\epoll異步IO

4、IO多路復用

同步IO和異步IO,阻塞IO和非阻塞IO分別是什么,到底有什么區別?

不同的人在不同的上下文下給出的答案是不同的。所以先限定一下本文的上下文。

本文討論的背景是Linux環境下的network IO。

在進行解釋之前,首先要說明幾個概念:

  進程切換

  進程的阻塞

  文件描述符

  緩存 I/O

 進程切換

  為了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復以前掛起的某個進程的執行。這種行為被稱為進程切換。

  因此可以說,任何進程都是在操作系統內核的支持下運行的,是與內核緊密相關的。

 從一個進程的運行轉到另一個進程上運行,這個過程中經過下面這些變化:

  1. 保存處理器上下文,包括程序計數器和其他寄存器

  2. 更新PCB信息

  3. 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列

  4. 選擇另一個進程執行,並更新其PCB

  5. 更新內存管理的數據結構

  6. 恢復處理器上下文

進程控制塊PCB(Processing Control Block),是操作系統核心中一種數據結構,主要表示進程狀態。

 PCB的作用是使一個在多道程序環境下不能獨立運行的程序(含數據),成為一個能獨立運行的基本單位或與其它進程並發執行的進程。或者說,OS是根據PCB來對並發執行的進程進行控制和管理的。  PCB通常是系統內存占用區中的一個連續存區,它存放着操作系統用於描述進程情況及控制進程運行所需的全部信息 

進程的阻塞

  正在執行的進程,由於期待的某些事件未發生,如請求系統資源失敗、等待某種操作的完成、新數據尚未到達或無新工作做等,則由系統自動執行阻塞原語(Block),使自己由運行狀態變為阻塞狀態。可見,進程的阻塞是進程自身的一種主動行為,也因此只有處於運行態的進程(獲得CPU),才可能將其轉為阻塞狀態。當進程進入阻塞狀態,是不占用CPU資源的。

文件描述符fd

  文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。

  文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。

  當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。

  在程序設計中,一些涉及底層的程序編寫往往會圍繞着文件描述符展開。但是文件描述符這一概念往往只適用於UNIX、Linux這樣的操作系統。

緩存 I/O

  緩存 I/O 又被稱作標准 I/O,大多數文件系統的默認 I/O 操作都是緩存 I/O。

  在 Linux 的緩存 I/O 機制中,操作系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中。

 數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。

緩存 I/O 的缺點:

  數據在傳輸過程中需要在應用程序地址空間和內核進行多次數據拷貝操作,這些數據拷貝操作所帶來的 CPU 以及內存開銷是非常大的。

 對於一次IO訪問(以read舉例),數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。

  一個IO(如read)操作會經歷以下兩個階段:

  1. 等待數據准備 (Waiting for the data to be ready)

  2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

  因為有了這兩個階段,linux系統產生了下面五種網絡模式的方案。

  1.阻塞 I/O(blocking IO)

  2.非阻塞 I/O(nonblocking IO)

  3.I/O 多路復用( IO multiplexing)

  4.信號驅動 I/O( signal driven IO)

  5.異步 I/O(asynchronous IO)

阻塞 I/O(blocking IO)

  在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據(對於網絡IO來說,很多時候數據在一開始還沒有到達。比如,還沒有收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程需要等待,也就是說數據被拷貝到操作系統內核的緩沖區中是需要一個過程的。而在用戶進程這邊,整個進程會被阻塞(當然,是進程自己選擇的阻塞)。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。

所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

非阻塞 I/O(nonblocking IO)

  linux下,可通過設置socket使其變為非阻塞IO。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。

  從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。

所以,nonblocking IO的特點是用戶進程需要不斷的主動詢問kernel數據好了沒有。

 I/O 多路復用( IO multiplexing)

  IO multiplexing就是我們說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。

  select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。

  它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket

  當某個socket有數據到達了,就通知用戶進程。

 

當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。

  所以,I/O 多路復用的特點是通過一種機制使一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就可以返回。

  IO多路復用和阻塞IO其實並沒有太大的不同,事實上,還更差一些。因為這里需要使用兩個system call (select 和 recvfrom),而阻塞IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個連接。

  如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用多線程+阻塞IO的web server性能更好,可能延遲還更大。

  select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。

  在IO multiplexing Model中,實際中,對於每一個socket,一般都設置成為non-blocking

  但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

異步 I/O(asynchronous IO)

用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。

  然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

 blocking和non-blocking的區別

    調用blocking IO會一直block住對應的進程直到操作完成

    調用non-blocking IO在kernel還准備數據的情況下會立刻返回

  synchronous IO和asynchronous IO的區別

synchronous I/O操作會導致請求進程被阻塞,直到I/O操作完成;

asynchronous I/O操作不會導致請求進程被阻塞;

之前所說的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。

 有人會說,non-blocking IO並沒有被block啊。 這里需要格外注意,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的數據沒有准備好,這時候不會block進程。但是,當kernel中數據准備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。

而asynchronous IO則不一樣,當進程發起IO 操作之后,就直接返回再也不理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程中,進程完全沒有被block。

各個IO Model的比較如圖所示:

通過上面的圖片,可以發現non-blocking IO和asynchronous IO的區別還是很明顯的。

  在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,並且當數據准備完成以后,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。

  而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。

5、select、poll、epoll詳解

select,poll,epoll都是IO多路復用的機制。I/O多路復用就是通過一種機制使一個進程可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。

  select,poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒后自己負責進行讀寫,也就是說這個讀寫過程是阻塞的

  異步I/O則無需自己負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。

  sellect、poll、epoll三者的區別 :

  select:

  目前支持幾乎所有的平台

  默認單個進程能夠監視的文件描述符的數量存在最大限制,在linux上默認只支持1024個socket

    可以通過修改宏定義或重新編譯內核(修改系統最大支持的端口數)的方式提升這一限制

  內核准備好數據后通知用戶有數據了,但不告訴用戶是哪個連接有數據,用戶只能通過輪詢的方式來獲取數據

    假定select讓內核監視100個socket連接,當有1個連接有數據后,內核就通知用戶100個連接中有數據了

    但是不告訴用戶是哪個連接有數據了,此時用戶只能通過輪詢的方式一個個去檢查然后獲取數據

    這里是假定有100個socket連接,那么如果有上萬個,上十萬個呢?

    那你就得輪詢上萬次,上十萬次,而你所取的結果僅僅就那么1個。這樣就會浪費很多沒用的開銷

  只支持水平觸發;每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大

  同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也會很大

poll:

  與select沒有本質上的差別,僅僅是沒有了最大文件描述符數量的限制

  只支持水平觸發

  只是一個過渡版本,很少用

  epoll:

  Linux2.6才出現的epoll,具備了select和poll的一切優點,公認為性能最好的多路IO就緒通知方法

  沒有最大文件描述符數量的限制

  同時支持水平觸發和邊緣觸發

  不支持windows平台

  內核准備好數據以后會通知用戶哪個連接有數據了

  IO效率不隨fd數目增加而線性下降

  使用mmap加速內核與用戶空間的消息傳遞

水平觸發與邊緣觸發:

  水平觸發:將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用epoll時將再次報告這些文件描述符,這種方式稱為水平觸發

  邊緣觸發:只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發

  理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。

 select和epoll的特點

  select:

  select通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。

  由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。

  epoll:

  epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。

  另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。

select

select(rlist,wlist,xlist,timeout=None)

 select函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。

 調用后select函數會阻塞,直到有描述符就緒(有數據可讀、可寫、或者有except),或者超時(timeout指定等待時間,如果立即返回設為null即可),函數返回。當select函數返回后,可以通過遍歷fdset,來找到就緒的描述符。

poll

int poll(struct pollfd *fds,unsigned int nfds,int timeout);

不同於select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現。

struct pollfd {
    int fd; /*file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd結構包含了要監視的event和發生的event,不再使用select“參數-值”傳遞的方式。同時,pollfd並沒有最大數量限制(但是數量過大后性能也是會下降)。  和select函數一樣,poll返回后,需要輪詢pollfd來獲取就緒的描述符。

 從上面看,select和poll都需要在返回后,通過遍歷文件描述符來獲取已經就緒的socket。

 事實上,同時連接的大量客戶端在一時刻可能只有很少的處於就緒狀態,因此隨着監視的描述符數量的增長,其效率也會線性下降。

epool

 epoll是在2.6內核中提出的,是之前的select和poll的增強版本。相對於select和poll來說,epoll更加靈活,沒有描述符限制。

  epoll使用一個文件描述符管理多個描述符,將用戶關系的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。

epoll操作過程需要三個接口,分別如下:

int epoll_create(int size);//創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

(1)int epool_create(int size);

創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不同於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並不是限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。

  當創建好epoll句柄后,它就會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。

(2)int epool_ctl(int epfd,int op,int fd,struct epoll_event *event);

  函數是對指定描述符fd執行op操作。

  epfd:是epoll_create()的返回值。

  op:表示op操作,用三個宏來表示:

    添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。

    分別添加、刪除和修改對fd的監聽事件。

  fd:是需要監聽的fd(文件描述符)

  epoll_event:是告訴內核需要監聽什么事

(3)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待epfd上的io事件,最多返回maxevents個事件。

  參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。

一個簡單的select多並發socket服務端代碼:

#!/usr/bin/python
#Author:sean
 
import select
import socket
import queue
 
server = socket.socket()
HOST = 'localhost'
PORT = 8080
print("start up %s on port: %s",% (HOST,PORT))
server.bind((HOST,PORT))
server.listen()
 
server.setblocking(False)   #不阻塞
 
msg_dic_queue = {}    #這是一個隊列字典,存放要返回給客戶端的數據
 
inputs = [server]   #inputs里存放要讓內核監測的連接,這里的server是指監測server本身的連接狀態
#inputs = [server,conn]
outputs = []    #outputs里存放要返回給客戶端的數據連接對象
 
while True:
    print("waiting for next connect...")
    readable,writeable,exceptional = select.select(inputs,outputs,inputs)   #如果沒有任何fd就緒,程序就會一直阻塞在這里
    # print(readable,writeable,exceptional)
    for r in readable:  #處理活躍的連接,每個r就是一個socket連接對象
        if r is server: #代表來了一個新連接
            conn,client_addr = server.accept()
            print("arrived a new connect: ",client_addr)
            conn.setblocking(False)
            inputs.append(conn) #因為這個新建立的連接還沒發數據來,現在就接收的話,程序就報異常了
            #所以要想實現這個客戶端發數據來時server端能知道,就需要讓select再監測這個conn
            msg_dic_queue[conn] = queue.Queue()   #初始化一個隊列,后面存要返回給客戶端的數據
        else:   #r不是server的話就代表是一個與客戶端建立的文件描述符了
            #客戶端的數據過來了,在這里接收
            data = r.recv(1024)
            if data:
                print("received data from [%s]: "% r.getpeername()[0],data)
                msg_dic_queue[r].put(data)  #收到的數據先放到隊列字典里,之后再返回給客戶端
                if r not in outputs:
                    outputs.append(r)   #放入返回的連接隊列里。為了不影響處理與其它客戶端的連接,這里不立刻返回數據給客戶端
            else:   #如果收不到data就代表客戶端已經斷開了
                print("Client is disconnect",r)
                if r in outputs:
                    outputs.remove(r)   #清理已斷開的連接
                inputs.remove(r)
                del msg_dic_queue[r]
 
    for w in writeable: #處理要返回給客戶端的連接列表
        try:
            next_msg = msg_dic_queue[w].get_nowait()
        except queue.Empty:
            print("client [%s]"% w.getpeername()[0],"queue is empty...")
            outputs.remove(w)   #確保下次循環時writeable不返回已經處理完的連接
        else:
            print("sending message to [%s]"% w.getpeername()[0],next_msg)
            w.send(next_msg)    #返回給客戶端源數據
 
    for e in exceptional:   #處理異常連接
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_dic_queue[e]

select多並發socket客戶端代碼:

#!/usr/bin/python
#Author:sean
 
import socket
 
msgs = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8080
 
# Create a few TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(500) ]
 
# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % (SERVER_ADDRESS,SERVER_PORT))
for s in socks:
    s.connect((SERVER_ADDRESS,SERVER_PORT))
 
for message in msgs:
 
    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )

epoll多並發socket服務端代碼如下:

#!/usr/bin/python
#Author:sean
 
import socket, logging
import select, errno
 
logger = logging.getLogger("network-server")
 
def InitLog():
    logger.setLevel(logging.DEBUG)
 
    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
 
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)
 
    logger.addHandler(fh)
    logger.addHandler(ch)
 
 
if __name__ == "__main__":
    InitLog()
 
    try:
        # 創建 TCP socket 作為監聽 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")
 
    try:
        # 設置 SO_REUSEADDR 選項
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")
 
    try:
        # 進行 bind -- 此處未指定 ip 地址,即 bind 了全部網卡 ip 上
        listen_fd.bind(('', 8008))
    except socket.error as  msg:
        logger.error("bind failed")
 
    try:
        # 設置 listen 的 backlog 數
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)
 
    try:
        # 創建 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中注冊 監聽 socket 的 可讀 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)
 
    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待
        epoll_list = epoll_fd.poll()
 
        for fd, events in epoll_list:
            # 若為監聽 fd 被激活
            if fd == listen_fd.fileno():
                # 進行 accept -- 獲得連接上來 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 將連接 socket 設置為 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中注冊 連接 socket 的 可讀 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 將 conn 和 addr 信息分別保存起來
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可讀 事件激活
                datas = ''
                while True:
                    try:
                        # 從激活 fd 上 recv 10 字節數據
                        data = connections[fd].recv(10)
                        # 若當前沒有接收到數據,並且之前的累計數據也沒有
                        if not data and not datas:
                            # 從 epoll 句柄中移除該 連接 fd
                            epoll_fd.unregister(fd)
                            # server 側主動關閉該 連接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 將接收到的數據拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上進行 recv 需要處理 讀穿 的情況
                        # 這里實際上是利用 讀穿 出 異常 的方式跳到這里進行后續處理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 將已接收數據保存起來
                            datalist[fd] = datas
                            # 更新 epoll 句柄中連接d 注冊事件為 可寫
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出錯處理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可寫 事件激活
                sendLen = 0
                # 通過 while 循環確保將 buf 中的數據全部發送出去
                while True:
                    # 將之前收到的數據發回 client -- 通過 sendLen 來控制發送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在全部發送完畢后退出 while 循環
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中連接 fd 注冊事件為 可讀
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其他 epoll 事件不進行處理
                continue

5、python之selectors模塊

 selectors模塊是在python3.4版本中引進的,它封裝了IO多路復用中的select和epoll,能夠更快,更方便的實現多並發效果。

  官方文檔見:https://docs.python.org/3/library/selectors.html

  以下是一個selectors模塊的代碼示范:

#!/usr/bin/python
#Author:sean
 
import selectors
import socket
#selectors模塊默認會用epoll,如果你的系統中沒有epoll(比如windows)則會自動使用select
sel = selectors.DefaultSelector()   #生成一個select對象
 
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False) #設定非阻塞
    sel.register(conn, selectors.EVENT_READ, read)  #新連接注冊read回調函數
 
def read(conn, mask):
    data = conn.recv(1024)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(('localhost', 8080))
sock.listen()
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)    #把剛生成的sock連接對象注冊到select連接列表中,並交給accept函數處理
 
while True:
    events = sel.select()   #默認是阻塞,有活動連接就返回活動的連接列表
    #這里看起來是select,其實有可能會使用epoll,如果你的系統支持epoll,那么默認就是epoll
    for key, mask in events:
        callback = key.data     #去調accept函數
        callback(key.fileobj, mask) #key.fileobj就是readable中的一個socket連接對象

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM