Python面試題之解讀Socketserver & Tcpserver


 

在解析socketserver是如工作之前,我們先看看socektserver類的繼承關系圖:

  請求類繼承關系:

          

 

  server類繼承關系:

          

 

  有了上面的繼承關系圖后,我們解析socketserver就輕松多了,下面,我們從代碼開始,慢慢揭開socketserver面紗:

復制代碼
import socketserver
import struct, json, os

class FtpServer(socketserver.BaseRequestHandler):
    coding = 'utf-8'
    server_dir = 'file_upload'
    max_packet_size = 1024
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

    def handle(self):
        print(self.request)
        while True:
            data = self.request.recv(4)
            data_len = struct.unpack('i', data)[0]
            head_json = self.request.recv(data_len).decode(self.coding)
            head_dic = json.loads(head_json)
            cmd = head_dic['cmd']
            if hasattr(self, cmd):
                func = getattr(self, cmd)
                func(head_dic)

    def put(self):
        pass

    def get(self):
        pass

if __name__ == '__main__':
    HOST, PORT = "localhost", 9999
    with socketserver.ThreadingTCPServer((HOST, PORT), FtpServer) as server:
        server.serve_forever()
復制代碼

  我們通過socketserver.ThreadingTCPServer實例化對象server,那么此時應用調用類的__init__方法,前往ThreadingTCPServer類看看:

class ThreadingTCPServer(ThreadingMixIn, UDPServer): pass

  發現這個類啥都沒寫,我們知道,如果一個類什么方法都沒有定義,那么它的方法肯定都是從其父類繼承而來,接着,先到ThreadingMinIn里面看看,

class ThreadingMixIn:
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        passdef process_request(self, request, client_address):
       pass

  這個類也沒有__init__方法,因此,我們應該去右繼承的父類TCPserver中找:

復制代碼
class TCPServer(BaseServer):
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    request_queue_size = 5
    allow_reuse_address = False
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        BaseServer.__init__(self, server_address, RequestHandlerClass)#
        self.socket = socket.socket(self.address_family,self.socket_type)  # 創建套接字對象
        if bind_and_activate:
            try:
                self.server_bind()  #綁定端口和IP
                self.server_activate()  # 監聽端口
            except:
                self.server_close()
                raise
復制代碼

  看到Tcpserver的__init__方法,完成了以下幾件事:

    創建套接字,綁定端口和IP,並監聽

    將端口、IP和我們創建類傳遞到Baseserver類中;

  此時,對象的初始化工作並沒有完成,接着,我們要進入baseserver類,看看該類下的__init__完成了什么工作:

class BaseServer:
    timeout = None
    def __init__(self, server_address, RequestHandlerClass):
        self.server_address = server_address #將端口和IP暫存
        self.RequestHandlerClass = RequestHandlerClass  #暫存我們創建的類
        self.__is_shut_down = threading.Event() # 創建event對象

  到此,對象的初始化工作完成。然后是調用serve_forever()方法,開始不斷循環監聽。下面,我們來看看,這個server_forever實現

  注意:我們要清楚一點,我們在找這個方法在哪里的時候,一定要按照順序去找,也就是說,我們先得從子類開始找,如果子類不存在,就去其父類找。下面我們就遵循這個原則來找找看。

  先來看看左繼承的父類ThreadingMixIn中有沒有server_forever:

復制代碼
class ThreadingMixIn:
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
復制代碼

  再來看看父類Tcpserver:

復制代碼
class TCPServer(BaseServer):def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
      
    def server_bind(self):
        def server_activate(self):
       def server_close(self):
def fileno(self):
 def get_request(self):
 def shutdown_request(self, request):
   def close_request(self, request):
    
復制代碼

  我們發現,沒有server_forever方法,好,我去其繼承的父類BaseServer類看看:

復制代碼
class BaseServer:def __init__(self, server_address, RequestHandlerClass):
        def server_activate(self):
       
    def serve_forever(self, poll_interval=0.5): def shutdown(self):
        def service_actions(self):
        
    def handle_request(self):
      def _handle_request_noblock(self):
       def handle_timeout(self):
        
    def verify_request(self, request, client_address):
        def process_request(self, request, client_address):
     def server_close(self):
      
    def finish_request(self, request, client_address):
    def shutdown_request(self, request):
     def close_request(self, request):
 
    def handle_error(self, request, client_address):
def __enter__(self):
def __exit__(self, *args):

復制代碼

  我們發現server_forever()果然在這個類中,現在,我們的目標是:找到在什么地方調用我們自己寫的handle方法。

  在我們找到的server_forever()方法中,

復制代碼
 def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()
        try:
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)#原來底層是用epoll來實現不斷循環監聽                 while not self.__shutdown_request:
                    ready = selector.select(poll_interval) #有新的鏈接進來
                    if ready:
                        self._handle_request_noblock() # 這里應該是處理新的鏈接
                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
復制代碼

  好,我大致找到了鏈接的處理入口,我們跟進去,繼續尋找:

復制代碼
    def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()         except OSError:
            return
        if self.verify_request(request, client_address):             try:
                self.process_request(request, client_address)#注意這里的process_request()             except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)
復制代碼

  到源碼中,我們找到該函數,現在,只看我划線的部分。其他部分都是針對異常的處理,如果沒有異常,其他都是不會執行的,所以,其他的異常處理,我們先暫時不看。

  我們發現,如果有鏈接,最后會交給process_request()(我們會發現,在baseserver類和ThreadingMixIn都有這個方法,這里找類方法,一定要按照類的繼承順序來查找),所以,我們到ThreadingMiXin中去看看processs_request()做了哪些事情:

    def process_request(self, request, client_address):

        t = threading.Thread(target = self.process_request_thread,args = (request, client_address)) # 原來開了一個線程,支持並發
        t.daemon = self.daemon_threads # 開啟守護線程
        t.start()

  在線程中執行該類下的process_requsest_thread()方法,

復制代碼
    def process_request_thread(self, request, client_address):
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)
復制代碼

  到此為止,鏈接建立成功!

  下面,我們來看看,當有消息發送,是如何進行處理的。

  當有消息發送,selector監聽到了,

復制代碼
    def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()
        try:
        
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)# 監聽了活動鏈接

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    if ready: # 准備好了
                        self._handle_request_noblock() # 進入處理

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
復制代碼

  下面我們跟進_handle_request_noblock(),

復制代碼
    def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()         except OSError:
            return
        if self.verify_request(request, client_address):             try:
                self.process_request(request, client_address)             except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)
復制代碼

  我們到process_request()看看:

復制代碼
    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,  # start a threading to handle the request
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
復制代碼

  然后開啟線程執行,process_request_thread()方法,

復制代碼
    def process_request_thread(self, request, client_address):
      
        try:
            self.finish_request(request, client_address) # -----> to Baseserver find
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)
復制代碼

  然后調用finish_request()方法,現在我們跟進看看,

    def finish_request(self, request, client_address):
        self.RequestHandlerClass(request, client_address, self)

  執行了RequestHandlerClass(request, client_address, self),這個是啥??還記得最開始我們傳進來的類保存在哪呢?沒錯,就是RequestHandlerClass里面,現在這里才開始實例化這個類,也就是說,在這里開始調用我們自己的類了。既然是調用我們自己的類,那么必然要實例化,我們先回到自己創建的類,找找__init__方法。

class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        self.request.sendall(self.data.upper())

  自己類沒有寫__init__方法,那么我去它繼承的BaseRequestHandler()下面找找看:

復制代碼
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request # 接受傳進來的請求鏈接
        self.client_address = client_address  # 客戶端的ip/端口
        self.server = server  # 
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass
復制代碼

  我們來看看,它繼承類實例化完成了哪些操作:

    調用handle()方法,我們發現,在這個類中也有一個handle()方法,那么這里調用時調用自己寫的還是這個類中的呢?

  當然是調用我們自己寫!

  至此,我們完成了一次通信的完整過程!

  總結sockerserver整個流程:

    1.開啟了線程,支持並發操作

    2.I/O多路復用,監聽多個文件描述符!

 

參考

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM