sokect編程進階


IO模型

什么是IO?

IO:input和output的縮寫,即輸入/輸出端口。每個設備都會有一個專用的I/O地址,用來處理自己的輸入輸出信息

同步、異步、阻塞、非阻塞

  • 同步和異步的概念描述的是用戶線程與內核的交互方式:同步是指用戶線程發起IO請求后需要等待或者輪詢內核IO操作完成后才能繼續執行;而異步是指用戶線程發起IO請求后仍繼續執行,當內核IO操作完成后會通知用戶線程,或者調用用戶線程注冊的回調函數。
  • 阻塞和非阻塞的概念描述的是用戶線程調用內核IO操作的方式:阻塞是指IO操作需要徹底完成后才返回到用戶空間;而非阻塞是指IO操作被調用后立即返回給用戶一個狀態值,無需等到IO操作徹底完成

IO發生時涉及的對象和步驟

對於一個network IO (這里我們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read操作發生時,它會經歷兩個階段:
1 等待數據准備 (Waiting for the data to be ready)
2 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
記住這兩點很重要,因為這些IO Model的區別就是在兩個階段上各有不同的情況

常見的四種模型

  • 同步阻塞IO(Blocking IO):即傳統的IO模型


    當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據。對於network io來說,很多時候數據在一開始還沒有到達(比如沒有收到一個完整的TCP/UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除 block的狀態,重新運行起來。

所以阻塞:blocking IO的特點是I/O執行時的兩個操作(等待數據准備 (Waiting for the data to be ready)、將數據從內核拷貝到進程中(Copying the data from the kernel to the process))都是阻塞的。
python socket中:accept() recv() 是阻塞的
所以,所謂阻塞型接口是指系統調用(一般是IO接口)如果不返回結果就一直阻塞,就是socket經常說的,有發就有收,收發必相等.

  • 同步非阻塞IO(Non-blocking IO):默認創建的socket都是阻塞的,非阻塞IO要求socket被設置為NONBLOCK。注意這里所說的NIO並非Java的NIO(New IO)庫。 linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。
所以,用戶進程其實是需要不斷的主動詢問kernel數據好了沒有
* IO多路復用(IO Multiplexing):即經典的Reactor設計模式,有時也稱為異步阻塞IO,Java中的Selector和Linux中的epoll都是這種模型。

用戶首先將需要進行IO操作的socket添加到select中,然后阻塞等待select系統調用返回。當數據到達時,socket被激活,select函數返回。用戶線程正式發起read請求,讀取數據並繼續執行。
從流程上來看,使用select函數進行IO請求和同步阻塞模型沒有太大的區別,甚至還多了添加監視socket,以及調用select函數的額外操作,效率更差。但是,使用select以后最大的優勢是用戶可以在一個線程內同時處理多個socket的IO請求。用戶可以注冊多個socket,然后不斷地調用select讀取被激活的socket,即可達到在同一個線程內同時處理多個IO請求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達到這個目的
* 異步IO(Asynchronous IO):即經典的Proactor設計模式,也稱為異步非阻塞IO

用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

IO多路復用

概述

IO多路復用是通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。
IO多路復用適用如下場合:

  (1)當客戶處理多個描述字時(一般是交互式輸入和網絡套接口),必須使用I/O復用。

  (2)當一個客戶同時處理多個套接口時,而這種情況是可能的,但很少出現。

  (3)如果一個TCP服務器既要處理監聽套接口,又要處理已連接套接口,一般也要用到I/O復用。

  (4)如果一個服務器即要處理TCP,又要處理UDP,一般要使用I/O復用。

  (5)如果一個服務器要處理多個服務或多個協議,一般要使用I/O復用。
  與多進程和多線程技術相比,I/O多路復用技術的最大優勢是系統開銷小,系統不必創建進程/線程,也不必維護這些進程/線程,從而大大減小了系統的開銷。

IO多路復用的方法

  • select
  • poll
  • epoll

    python實現IO多路復用

  • 概述
    sokect基礎中,默認服務端只能同時處理一個客戶端的請求,當一個客戶端連接到服務器之后,其他客戶端只能處於等待狀態,而IO多路復用可以實現服務端同時處理多個客戶端請求,
    Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路復用
    根據系統不同:支持的方法也不同

    Windows Python:
        提供: select
    Mac Python:
        提供: select
    Linux Python:
        提供: select、poll、epoll

     

    注意:網絡操作、文件操作、終端操作等均屬於IO操作,對於windows只支持Socket操作,其他系統支持其他IO操作,但是無法檢測 普通文件操作 自動上次讀取是否已經變化。
    普通文件操作所有系統都是完成不了的,普通文件是屬於I/O操作!但是對於python來說文件變更python是監控不了的,所以我們能用的只有是“終端的輸入輸出,Socket的輸入輸出”

  • 實現原理
    先看一段同步阻塞型代碼:

 import socket

    ip_port = ('127.0.0.1',8080)
    s = socket.socket()
    s.bind(ip_port)
    s.listen(5)
    conn,addr = s.accept()
    res = conn.recv(1024)
    conn.send(res)
    conn.close()

 

下面需要理清楚幾個概念:

  • IO多路復用主要作用是監聽socket對象內部是否變化
  • socket對象什么時候變化? 連接或和客戶端交互收發消息的時候socket對象會發生變化
  • socket對象變化的實質 服務端創建的socket對象sk發生變化--->表示有新連接過來 服務端和客戶端建立的conn連接對象變化了--->表示客戶端發送消息過來了。要收發數據
  • 所以,IO多路復用是不斷監聽服務端sk對象和conn對象是否發生變化,變化之后,調用對應的線程或進程進行相應的操作。這里用到了select模塊中select()方法。該方法支持四個參數: select(rlist, wlist, xlist, timeout=None)
    • 第一個參數是select監聽的對象列表,在此列表中的對象一旦發生變化,select會返回變化的對象。如rlist為[sk1,sk2,sk3],如果sk1和sk3發生變化。則返回[sk1對象,sk3對象]
    • 第二個參數是select永遠認為變化的對象列表,即一旦對象出現在此列表中,每次循環檢查select會認為此對象一直發生變化,所以每次都會返回該對象,例如wlist為[sk1],那么,每次循環select都會返回[sk1對象]
    • 第三個參數是select檢測對象是否發生錯誤的列表,在該列表中的對象如果發生錯誤,則返回該對象
    • 第四個參數是超時時間

下面通過python代碼逐步實現IO多路復用

  • 1.實現動態檢測對象是否發生變化

    import socket
    import select
    
    sk = socket.socket()
    ip_port = ("127.0.0.1",8080)
    sk.bind(ip_port)
    sk.listen(5)
    
    inputs = [sk,]     #定義監聽的對象列表。默認只監聽服務端socket對象
    
    while True:    #循環檢測對象
        #監聽sk(服務端)對象,如果sk發生變化,表示有新連接來了,此時rlist的值為sk
        rlist,wlist,e = select.select(inputs,[],[],1)
        print("inputs:",len(inputs),"rlist:",len(rlist),inputs)  #打印inputs 便於查看效果
        for i in rlist:       #循環變化的列表,然后對每個對象進行操作
            if i == sk:     #如果是服務端socket對象發生變化,表示新連接建立了
                conn,ip = sk.accept()     #建立連接
                inputs.append(conn)      #將新建立的連接加入監聽的列表中
            else:         #否則是客戶端給服務端發消息了。所以需要收消息
                recv_data = i.recv(1024)
                i.send(recv_data)

     

運行服務端程序,在沒有新連接之前,輸出是這樣

inputs: 1 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>]

 

一個客戶端和服務器建立連接:
inputs: 1 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>]
inputs: 1 rlist: 1 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>]
inputs: 2 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52810)>]

發現,當一個客戶端和服務端建立連接之后,select檢測到sk發生變化,所以rlist等於1,然后將新建立的連接對象加入到監控的inputs列表中,所以第二行inputs長度變為2,而此次循環沒有檢測到變化的sk,所以rlist為0,最后打印inputs列表得知,列表里存儲的是對象

第二個客戶端和服務器建立連接:

inputs: 2 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52810)>]
inputs: 2 rlist: 1 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52810)>]
inputs: 3 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52810)>, <socket.socket fd=248, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52873)>]

 


發現情況和第一個客戶端一樣

客戶端發多條信息:

客戶端:
>> adasdasasd
adasdasasd
>> sdfsd
sdsdfsd

服務端:
inputs: 3 rlist: 0 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52957)>]
inputs: 3 rlist: 1 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52957)>]
inputs: 3 rlist: 0 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52957)>]
inputs: 3 rlist: 1 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52957)>]
inputs: 3 rlist: 0 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 52957)>]

 

  • 2.實現讀寫分離

利用第二個參數,如果客戶端發消息,則將客戶端放在outputs中,select每次循環都會將對象返回到wlist,最后循環wlist,處理消息,發送至客戶端,再在outpu中移除該對象

import socket
import select

sk = socket.socket()
ip_port = ("127.0.0.1",8080)
sk.bind(ip_port)
sk.listen(5)


inputs = [sk,]     #定義監聽的對象列表。默認只監聽服務端socket對象
outputs = []      #定義消息輸出列表
message = {}      #定義一個字典,用於記錄客戶端對象和收到的消息  key為客戶端對象,value為消息


while True:    #循環檢測對象
    #監聽sk(服務端)對象,如果sk發生變化,表示有新連接來了,此時rlist的值為sk
    rlist,wlist,e = select.select(inputs,outputs,[],1)
    print("inputs:",len(inputs),"rlist:",len(rlist),"outputs:",len(outputs),"wlist:",len(wlist))  #打印inputs 便於查看效果
    #讀數據
    for i in rlist:       #循環變化的列表,然后對每個對象進行操作
        if i == sk:     #如果是服務端socket對象發生變化,表示新連接建立了
            conn,ip = sk.accept()     #建立連接
            inputs.append(conn)      #將新建立的連接加入監聽的列表中
            message[conn] = []    #創建關於新連接的key
        else:         #否則是客戶端給服務端發消息了。所以需要收消息
            try:
                recv_data = i.recv(1024)    #接受數據
                if not recv_data:       #判斷數據是否為空
                    raise Exception("disconnect")       #為空的話,主動拋出異常
                else:
                    outputs.append(i)           #將發消息的對象存入outputs
                    message[i].append(recv_data)        #將發消息的對象和消息存入message
            except:
                inputs.remove(i)            #捕捉到異常,客戶端斷開連接,則在監控列表中刪除此對象
                del message[i]  #在消息字典中刪除此對象key/value
    #發數據
    for i in wlist:     #循環wlist列表。如果wlist列表中有值的話,說明收到客戶端消息
        msg = message[i].pop()      #取出該客戶端對象發的消息,並在列表中刪除
        resp = 'respond'.encode() + msg         #處理客戶端的消息
        i.send(resp)        #發送給客戶端
        outputs.remove(i)       #outputs列表中刪除該對象,如果不刪除,下次循環還會誤認為其發消息了

sk.close()

 

客戶端建立連接,並發送消息:

inputs: 1 rlist: 0 outputs: 0 wlist: 0
inputs: 1 rlist: 0 outputs: 0 wlist: 0
inputs: 1 rlist: 0 outputs: 0 wlist: 0
inputs: 1 rlist: 1 outputs: 0 wlist: 0        #第一個客戶端建立連接
inputs: 2 rlist: 0 outputs: 0 wlist: 0
inputs: 2 rlist: 1 outputs: 0 wlist: 0          #第二個客戶端建立連接
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 1 outputs: 0 wlist: 0        #第一個客戶端發送一條消息
inputs: 3 rlist: 0 outputs: 1 wlist: 1        #消息記錄至outpu
inputs: 3 rlist: 0 outputs: 0 wlist: 0        #返回客戶端消息,outputs清空
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 1 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 1 wlist: 1
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 1 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 1 wlist: 1
inputs: 3 rlist: 1 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 1 wlist: 1
inputs: 3 rlist: 1 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 1 wlist: 1
inputs: 3 rlist: 0 outputs: 0 wlist: 0
inputs: 3 rlist: 0 outputs: 0 wlist: 0

 

最后,這就根據select方法實現了IO多路復用,即一個簡單的服務端和客戶端偽並發的通信

socket源碼剖析

在socket基礎中,我們最后說的使用socketserver實現多線程並發處理客戶端請求,下面進行深入剖析實現原理

import socketserver
import subprocess

class MyServer(socketserver.BaseRequestHandler):  #繼承
    def handle(self):   #handle方法。注意此時send和recv時調用的self.request方法
        self.request.sendall(bytes('Welcome',encoding='utf-8'))
        while True:
            try:
                recv_data = self.request.recv(1024)
                if not recv_data: break
                p = subprocess.Popen(str(recv_data, encoding='utf-8'), shell=True, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
                res = p.stdout.read()
                if not res:
                    send_data = p.stderr.read()
                else:
                    send_data = res
                if not send_data:
                    send_data = 'no output'.encode()

                data_size = len(send_data)
                self.request.send(bytes(str(data_size), encoding='utf-8'))
                self.request.recv(1024)
                self.request.send(send_data)
            except Exception:
                break



if __name__ == '__main__':

    server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer)    #啟動server
    server.serve_forever()
實現socketserver運行的代碼

 

  • 下面開始從外到內進行剖析

    • 1.server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer) 當解析器之行到此步驟時,調用了socketserver的ThreadingTCPServer方法,ThreadingTCPServer是一個類,繼承了TreadingMinIn和TCPServer class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
    • 2.執行ThreadingTCPServer類的構造方法__init__,根據類的多繼承原則,子類不存在的話依次查找父類

      • 查找 父類TreadingMinIn是否有__init__方法,結果沒有,其沒有父類,本段查詢終止

        ```python
        class ThreadingMixIn:
            """Mix-in class to handle each request in a new thread."""
        
            # Decides how threads will act upon termination of the
            # main process
            daemon_threads = False
        
            def process_request_thread(self, request, client_address):
                """Same as in BaseServer but as a thread.
        
                In addition, exception handling is done here.
        
                """
                try:
                    self.finish_request(request, client_address)
                    self.shutdown_request(request)
                except:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
        
            def process_request(self, request, client_address):
                """Start a new thread to process the request."""
                t = threading.Thread(target = self.process_request_thread,
                                     args = (request, client_address))
                t.daemon = self.daemon_threads
                t.start()
        ThreadingMinIn類

         

      • 查找父類TCPServer是否有__init__方法,

        class TCPServer(BaseServer):
        
            address_family = socket.AF_INET
        
            socket_type = socket.SOCK_STREAM
        
            request_queue_size = 5
        
            allow_reuse_address = False
        
            def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
                """Constructor.  May be extended, do not override."""
                BaseServer.__init__(self, server_address, RequestHandlerClass)
                self.socket = socket.socket(self.address_family,
                                            self.socket_type)
                if bind_and_activate:
                    try:
                        self.server_bind()
                        self.server_activate()
                    except:
                        self.server_close()
                        raise
        TCPServer的構造方法

         

        TCPServer有構造方法,但 BaseServer.__init__(self, server_address, RequestHandlerClass)實現了首先執行父類BaseServer的構造方法

      • 查找TCPServer父類BaseServer的構造方法:
        class BaseServer:
        
        timeout = None
        
        def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False
        BaseServer的構造方法

      從此構造方法中,server_address為定義的ip和端口,RequestHandlerClass為自己定義的class類MyServer

      • 繼續執行TCPServer的構造方法

      執行sever_bind()方法,實現綁定ip和端口

      def server_bind(self):
                          """Called by constructor to bind the socket.
      
                          May be overridden.
      
                          """
                          if self.allow_reuse_address:
                              self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                          self.socket.bind(self.server_address)
                          self.server_address = self.socket.getsockname()
      server_bind方法

       

      執行server_activate()方法,實現監聽

      def server_activate(self):
                      """Called by constructor to activate the server.
      
                      May be overridden.
      
                      """
                      self.socket.listen(self.request_queue_size)
      server_activate方法

       

      此時socket已經創建完畢

    • 3.構造方法執行完之后,開始執行下面的server的serve_forever()方法

      • 重新開始從子類找server_forever方法
      • 在BaseServer中找到server_forever方法
      def serve_forever(self, poll_interval=0.5):
      
              self.__is_shut_down.clear()
              try:
                  with _ServerSelector() as selector:
                      selector.register(self, selectors.EVENT_READ)
      
                      while not self.__shutdown_request:
                          ready = selector.select(poll_interval)
                          if ready:
                              self._handle_request_noblock()
      
                          self.service_actions()
              finally:
                  self.__shutdown_request = False
                  self.__is_shut_down.set()
      serve_foreve方法
      • 從server_forever方法中看出,最終要執行self._handle_request_noblock()方法
      • 從子類重新開始找self._handle_request_noblock()方法,發現在BaseServer中找到
       def _handle_request_noblock(self):
              try:
                  request, client_address = self.get_request()
              except OSError:
                  return
              if self.verify_request(request, client_address):
                  try:
                      self.process_request(request, client_address)
                  except:
                      self.handle_error(request, client_address)
                      self.shutdown_request(request)
      _handle_request_noblock方法
      • 從self._handle_request_noblock()中看出,最終要執行self.process_request方法
      • 從子類中繼續查找process_request方法,發現在ThreadingMixIn類和BaseServer中同樣都存在,根據多繼承的原則,將優先執行ThredingMixIn的process_request()方法
       def process_request(self, request, client_address):
              t = threading.Thread(target = self.process_request_thread,
                                   args = (request, client_address))
              t.daemon = self.daemon_threads
              t.start()
      process_request方法
      • process_request方法調用threading.Thread方法,實現多線程處理並發請求,每個線程執行的方法為self.process_request_thread,下面開始繼續找該方法
      • 在ThredingMixIn找到該方法
      def process_request_thread(self, request, client_address):
              """Same as in BaseServer but as a thread.
      
              In addition, exception handling is done here.
      
              """
              try:
                  self.finish_request(request, client_address)
                  self.shutdown_request(request)
              except:
                  self.handle_error(request, client_address)
                  self.shutdown_request(request)
      process_request_thread方法
      • process_request_thread調用了self.finish_request方法,繼續查找self.finish_request方法
      • 在BaseServer中找到finish_request方法
      def finish_request(self, request, client_address):
              """Finish one request by instantiating RequestHandlerClass."""
              self.RequestHandlerClass(request, client_address, self)
      finish_reques方法
      • finish_request方法中調用了self.RequestHandlerClass方法,注意此時self.RequestHandlerClass在執行構造方法的時候已經定義為自己創建的Myclass類,所以self.RequestHandlerClass()=self.Myclass()
      • 執行self.Myclass(),首先會執行構造函數,其沒有,繼續查找父類socketserver.BaseRequestHandler
      • 父類socketserver.BaseRequestHandler中有構造方法
      class BaseRequestHandler:
      
          def __init__(self, request, client_address, server):
              self.request = request
              self.client_address = client_address
              self.server = server
              self.setup()
              try:
                  self.handle()
              finally:
                  self.finish()
      
          def setup(self):
              pass
      
          def handle(self):
              pass
      
          def finish(self):
              pass
      BaseRequestHandler類

       

      • socketserver.BaseRequestHandler的構造方法定義了setup() handler() finish()三個方法的執行順序,最后查找到handler()方法並執行,也就驗證了自定義類中必須使用handler()方法來處理客戶端請求
  • 總結

上述只是一個簡單的查找源碼實現多線程的過程,查找過程中一定要捋清楚每個父類的方法,根據多繼承原則進行查找和執行,上述過程簡單做成流程圖如下:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM