快速使用python搭建一個簡易服務器 --- socketServer


官方提供了socketserver包去方便我們快速的搭建一個服務器框架。

server類

socketserver包提供5個Server類,這些單獨使用這些Server類都只能完成同步的操作,他是一個單線程的,不能同時處理各個客戶端的請求,只能按照順序依次處理。

+------------+
| BaseServer |
+------------+
    |
    v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
    |
    v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+

兩個Mixin類

+--------------+        +----------------+
| ForkingMixIn |        | ThreadingMixIn |
+--------------+        +----------------+

各自實現了多進程和多線程的功能(ForkingMixIn在Windows不支持)

於是將這些同步類和Mixin類組合就實現了異步服務類的效果。

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

class ForkingUDPServer(ForkingMixIn, UDPServer): pass 
class ForkingTCPServer(ForkingMixIn, TCPServer): pass

基本使用

由於server需要同時處理來自多個客戶端的請求,需要提供異步的支持,所以通常使用上面的異步類創建服務器。在Windows系統中沒有提供os.fork()接口,Windows無法使用多進程的ForkingUDPServer和ForkingTCPServer,只能使用ThreadingTCPServer或者ThreadingUDPServer;而Linux和Unix多線程和多進程版本都可以使用。

服務器主要負責接受客戶端的連接請求,當一個新的客戶端請求到來后,將分配一個新的線程去處理這個請求(異步服務器ThreadingTCPServer),而與客戶端信息的交互則交給了專門的請求處理類(RequestHandlerClass)處理。

import socketserver

# 創建一個基於TCP的server對象,並使用BaseRequestHandler處理客戶端發送的消息
server = socketserver.ThreadingTCPServer(("127.0.0.1", 8000), BaseRequestHandler)  

server.serve_forever()  # 啟動服務器,

只需要上面兩行代碼就可以創建開啟一個服務,運行上面代碼后常看本機8000端口,發現有程序正在監聽。

C:\Users\user>netstat -anp tcp | findstr 8000
TCP    127.0.0.1:8000         0.0.0.0:0              LISTENING
ThreadingTCPServer可以對我們的請求進行接受,但是並不會進行處理請求,處理請求的類是上面指定BaseRequestHandler類,該類可以定義handle方法來處理接受的請求。

BaseRequestHandler的源碼
class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

server = socketserver.ThreadingTCPServer(("127.0.0.1", 8000), BaseRequestHandler)中,BaseRequestHandler將作為參數綁定到服務器的實例上,服務器啟動后,每當有一個新的客戶端接接入服務器,將會實例化一個請求處理對象,並傳入三個參數,request(連接客戶端的socket)、client_address(遠程客戶端的地址)、server(服務器對象),執行init方法,將這三個參數保存到對應屬性上。這個請求處理對象便可以與客戶端交互了。

簡單示例

import socketserver
import threading 

class MyRequestHandler(socketserver.BaseRequestHandler):
    """ BaseRequestHandler的實例化方法中,獲得了三個屬性
    self.request = request   # 該線程中與客戶端交互的 socket 對象。
    self.client_address      # 該線程處理的客戶端地址
    self.server = server     # 服務器對象
    """

    def handle(self):
        while True:
            msg = self.request.recv()   # 接受客戶端的數據
            if msg == b"quit" or msg == "":  # 退出
                break

            print(msg.decode())
            self.request.send(msg)  # 將消息發送回客戶端

    def finish(self):
        self.request.close()       # 關閉套接字

if __name__ == "__main__":
    # 創建一個基於TCP的server對象,並使用BaseRequestHandler處理客戶端發送的消息
    server = socketserver.ThreadingTCPServer(("127.0.0.1", 8000), MyRequestHandler)

    server.serve_forever()   # 啟動服務器

我們創建了一個ThreadingTCPServer服務器,然后在傳入的處理類MyRequestHandler,並在handle方法中提供與客戶端消息交互的業務邏輯,此處只是將客戶端的消息返回客戶端。最后我們在finish方法中關閉資源,finish方法使用了finally機制,保證了這些代碼一定會執行。

 

上一篇使用socket實現了一個群聊服務器,這個里使用socketServer將更加方便的實現

class MyRequestHandle(BaseRequestHandler):
    clients = {}  # 在類屬性中記錄所有與客戶端連接socket。
    lock = threading.Lock()  # 互斥鎖,各個線程共用

    def setup(self):  # 新的用戶連接時,預處理,將這個新的連接加入到clients中,考慮線程安全,需要加鎖
        with self.lock:
            self.clients[self.client_address] = self.request

    def handle(self):  # 處理客戶端的請求主邏輯
        while True:
            data = self.request.recv(1024).strip()   # 接受數據

            if data == b"quit" or data == b"":  # 客戶端退出
                with self.lock:
                    self.server.clients.pop(self.client_address)
                    self.request.close()
                    break

            print("{}-{}: {}".format(*self.client_address, data.decode()))

            with self.lock:
                for _, c in self.server.clients.items():  # 群發
                    c.send(data)

    def finish(self):
        with server.lock:
            for _, c in server.clients.items():
                c.close()
        server.server_close()def main():
    server = ThreadingTCPServer(("127.0.0.1", 8000), MyRequestHandle)
    # 將創建的所有線程設置為daemon線程,這樣控台主程序退出時,這個服務器的所有線程將會被結束
    server.daemon_threads = True 

if __name__ == "__main__":
    main()

上面requestHandlerclass中的handle方法和finish方式對應了上一篇中TCP服務器的recv方法和stop方法,他們處理請求的邏輯是相同的。只是上面使用了socketserver的代碼變少了,處理的邏輯也變少了,TCPserver幫我們完成了大量的工作,這利於軟件的快速開發。

內置的兩個RequestHandlerClass

StreamHandlerRequest

StreamHandlerRequest顧名思義是一種流式的求情處理類,對應TCP協議的面向字節流的傳輸形式。我們從源代碼分析。(去除了一些次要代碼)

class StreamRequestHandler(BaseRequestHandler):
    rbufsize = -1  # 讀緩存
    wbufsize = 0   # 寫緩存
    timeout = None # 超時時間
    # IP/TCP擁塞控制的Nagle算法算法。
    disable_nagle_algorithm = False

    def setup(self): # 實現了setup,
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        
        # 使用 makefile方法獲得了一個只讀文件對象 rfile
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        
        # 獲得一個只寫的文件對象 wfile
        if self.wbufsize == 0:
            self.wfile = _SocketWriter(self.connection)
        else:
            self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):  # 負責將這個 wfile 和 rfile方法關閉。
        if not self.wfile.closed: 
            try:
                self.wfile.flush()
            except socket.error:
                pass
        self.wfile.close()
        self.rfile.close()

使用StreamRequestHandler方法可以將這個socket包裝成一個類文件對象,方便我們使用一套文件對象的方法處理這個socket,它沒有實現handle方法,我仍然需要我們實現。我們可以這樣使用它

class MyHandle(StreamRequestHandler):
    # 如果需要使用setup和finish方法,需要調用父類方法,否則該方法將會被覆蓋。
    def setup(self):
        super().setup()
        # 添加自己的需求

    def handle(self):
        # 這里我們可以使用wfile和rfile來處理socket消息了,例如之前使用self.request.recv()方法等同於self.rfile.read()
        # 而 self.wfile.write 等同於 self.request.send(),在handle方法中完成業務邏輯即可

    def finish(self):
        super().finish()

server = ThreadingTCPServer("127.0.0.1", MyHandle)
server.serve_forever()

StreamRequestHandler主要定義了兩個新的 wfile對象和rfile對象,來分別對這個socket進行讀寫操作,當我們業務需要時,比如需要使用文件接口方法時,選擇繼承於StreamRequestHandler構建我們自己處理請求類來完成業務邏輯將會更加的方便。

DatagramRequestHandler

DatagramRequestHandler字面意思是數據報請求處理,也就是基於UDPServer的服務器才能使用該請求處理類

class DatagramRequestHandler(BaseRequestHandler):

    def setup(self):
        from io import BytesIO
        # udp的self.request包含兩部分(data,socket)它來自於
        # data, client_addr = self.socket.recvfrom(self.max_packet_size)
        #     return (data, self.socket), client_addr
        # (data, self.socket)就是這個self.request,在這里將其解構,data為recvfrom接收的數據
        self.packet, self.socket = self.request
        
        # 該數據包封裝為 BytesIO,同樣為一個類文件對象。
        self.rfile = BytesIO(self.packet)
        self.wfile = BytesIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)

從源碼可以看出,DatagramRequestHandler將數據包封裝為一個rfile,並實例化一個ByteIO對象用於寫入數據,寫入的數據可以通過self.socket這個套接字發送。這樣可以使用rfile和wfile這兩個類文件對象的read或者write接口來進行一些IO方面的操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM