python3之socket&socketserver網絡編程


1、套接字與套接模塊

套接字是為特定網絡協議(例如TCP/IP,ICMP/IP,UDP/IP等)套件對上的網絡應用程序提供者提供當前可移植標准的對象。它們允許程序接受並進行連接,如發送和接受數據。為了建立通信通道,網絡通信的每個端點擁有一個套接字對象極為重要。
套接字為BSD UNIX系統核心的一部分,而且他們也被許多其他類似UNIX的操作系統包括Linux所采納。許多非BSD UNIX系統(如ms-dos,windows,os/2,mac os及大部分主機環境)都以庫形式提供對套接字的支持。
三種最流行的套接字類型是:stream,datagram和raw。stream和datagram套接字可以直接與TCP協議進行接口,而raw套接字則接口到IP協議。但套接字並不限於TCP/IP。

套接字模塊是一個非常簡單的基於對象的接口,它提供對低層BSD套接字樣式網絡的訪問。使用該模塊可以實現客戶機和服務器套接字。要在python 中建立具有TCP和流套接字的簡單服務器,需要使用socket模塊。利用該模塊包含的函數和類定義,可生成通過網絡通信的程序。

python提供了兩個級別訪問的網絡服務:

  • 低級的網絡服務支持基本的socket,它提供了標准的BSD sockets API,可以訪問底層操作系統socket接口的全部方法
  • 高級別的網絡服務模塊socketServer,它提供了服務器中心類,可以簡化網絡服務器的開發。

2、socket模塊方法

socket.socketfamily = AF_INETtype = SOCK_STREAMproto = 0fileno = None   :使用給定的地址系列,套接字類型和協議號創建一個新套接字

socket.socketpairfamily [type [proto ):使用給定的地址系列,套接字類型和協議編號構建一對連接的套接字對象

socket.create_connectionaddress [timeout [source_address ):連接到偵聽Internet 地址(2元組 的TCP服務,然后返回套接字對象

socket.fromfdfdfamilytypeproto = 0 ):復制文件描述符fd(由文件對象的fileno()方法返回的整數 ),並從結果中構建一個套接字對象

socket.fromsharedata):從該socket.share() 方法獲得的數據實例化一個套接字假設套接字處於阻塞模式。

socket.SocketType:這是表示套接字對象類型的Python類型對象。這是一樣的type(socket(...))

socket.getaddrinfohostportfamily = 0type = 0proto = 0flags = 0 ):主機 / 端口參數轉換為5元組序列,其中包含創建連接到該服務的套接字的所有必要參數

socket.getfqdnname ):名稱返回完全限定的域名如果名稱被省略或為空,則被解釋為本地主機

socket.gethostbynamehostname):將主機名轉換為IPv4地址格式。IPv4地址以字符串形式返回

socket.gethostbyname_exhostname):將主機名轉換為IPv4地址格式,擴展接口。返回一個triple ,其中主機名是響應給定ip_address的主要主機名aliaslist是同一地址的備用主機名(可能為空)列表,ipaddrlist是同一主機上同一接口的IPv4地址列表經常但不總是一個地址)。不支持IPv6名稱解析,應該用於IPv4 / v6雙棧支持

socket.gethostname):返回包含Python解釋器當前正在執行的機器的主機名的字符串

socket.gethostbyaddrip_address ):返回一個triple ,其中hostname是響應給定ip_address的主要主機名aliaslist是同一地址的備用主機名(可能為空)列表, ipaddrlist是同一個接口的IPv4 / v6地址列表主機(最有可能只包含一個地址)。要找到完全限定的域名,請使用該功能支持IPv4和IPv6

socket.getnameinfosockaddrflags ):將套接字地址sockaddr翻譯成2元組根據標志的設置,結果可以在主機中包含完全限定的域名或數字地址表示同樣,端口可以包含字符串端口名稱或數字端口號。(host, port)

socket.getprotobynameprotocolname ):將Internet協議名稱(例如,'icmp')轉換為適合作為(可選)第三個參數傳遞給該socket() 函數的常量這通常只需要以“原始”模式(SOCK_RAW打開的套接字對於正常的套接字模式,如果協議被省略或為零,則自動選擇正確的協議

socket.getservbynameservicename [protocolname ):將Internet服務名稱和協議名稱轉換為該服務的端口號。可選的協議名稱,如果有,應該是'tcp'或 'udp',否則任何協議將匹配

socket.getservbyportport [protocolname ):將Internet端口號和協議名稱轉換為該服務的服務名稱。可選的協議名稱,如果有,應該是'tcp'或 'udp',否則任何協議將匹配

socket.ntohl):將32位正整數從網絡轉換為主機字節順序。在主機字節順序與網絡字節順序相同的機器上,這是無操作的; 否則,它會執行一個4字節的交換操作。

socket.ntohs):將16位正整數從網絡轉換為主機字節順序。在主機字節順序與網絡字節順序相同的機器上,這是無操作的; 否則,它執行一個2字節的交換操作

socket. htonl 將32位正整數從主機轉換為網絡字節順序。在主機字節順序與網絡字節順序相同的機器上,這是無操作的; 否則,它會執行一個4字節的交換操作。
socket. htons 將16位正整數從主機轉換為網絡字節順序。在主機字節順序與網絡字節順序相同的機器上,這是無操作的; 否則,它執行一個2字節的交換操作

socket.inet_atonip_string ):將IPv4地址從點分四字符串格式(例如“123.45.67.89”)轉換為32位打包二進制格式,作為長度為4個字符的字節對象。當與使用標准C庫的程序進行交談並且需要類型對象

socket.getdefaulttimeout():返回新套接字對象的默認超時值(秒)(float)。值None表示新的套接字對象沒有超時。首次導入套接字模塊時,默認為None。

socket.setdefaulttimeout(timeout):為新的套接字對象設置默認的超時值(秒)(float)。首次導入套接字模塊時,默認為None。請參閱 settimeout()可能的值和它們各自的含義。

socket.sethostname(name):將機器的主機名稱設為名稱。OSError如果你沒有足夠的權利,這將會提高 。

socket.if_nameindex():返回網絡接口信息列表(index int,name string)元組。 OSError如果系統調用失敗。

socket.if_nametoindex(if_name ):返回接口名稱對應的網絡接口索引號。 OSError如果沒有給定名稱的接口存在。

socket.if_indextoname(if_index ):返回接口索引號對應的網絡接口名稱。 OSError如果沒有給定索引的接口存在。

3、socket鏈接

一般socket建立鏈接需要六個步驟,其中包括:socket.socket()創建socket對象、s.bind綁定地址到socket對象、s.listen監聽地址端口、s.accept阻塞接受鏈接請求、s.send,s.recv方法處理通信數據、s.close關閉鏈接。

服務器創建套接字鏈接:

1)創建socket對象: socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)

socket.socket(socket.AF_INET,socket.SOCK_STREAM)使用給定的地址族,套接字類型和協議號來創建一個新套接字.

>>> import socket
#創建TCP socket:
>>> sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#創建UDP socket:
>>> sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

family為指定的地址族:

  socket.AF_UNIX :只能夠用於單一的Unix系統進程間通信

  socket.AF_INET :服務器之間的網絡通信(ipv4協議的TCP和UDP)ipv4,默認為這個

  socket.AF_INET6 :服務器之間的網絡通信ipv6

type為指定的套接字類型:

  socket.SOCK_STREAM :面向連接的TCP,默認為這個

  socket.SOCK_DGRAM :面向非連接的UDP

family和type參數是指定了一個協議,我們也可以使用proto第三個參數來直接指定使用的協議。我們也可以使用socket下的函數getprotobyname('tcp'),來代替IPPTOTO_XX變量.

>>> import socket
>>> socket.getprotobyname('tcp')
6
>>> socket.IPPROTO_TCP
6

proto為指定的協議號,一般為0:

  socket.IPPROTO_TCP  :TCP傳輸協議

  socket.IPPROTO_UDP   :UDP傳輸協議

  socket.IPPROTO_ICMP  :ICMP鏈接

  socket.IPPROTO_IP        :IP鏈接

  socket.IPPROTO_RAW    :要構建IP頭部和要發送的各種協議的頭部和數據,包括ip頭和協議和數據。

2)socket對象綁定地址及端口

地址必須是一個雙元素的元組,包括(host,port)主機名或IP地址+端口號。如果端口號或地址錯誤將引發socke.error異常。

import socket

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,socket.IPPROTO_TCP)

HostPort = ('127.0.0.1',8898)
s.bind(HostPort)  #綁定地址端口

3)socket對象監聽地址端口鏈接

socket.listen(backlog)

backlog指定了最多連接數,至少為1,接到連接請求后,這些請求必須排隊等候連接,如果隊列已滿,則拒絕請求。

import socket

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,socket.IPPROTO_TCP)
HostPort
= ('127.0.0.1',8898) s.bind(HostPort) #綁定地址端口 s.listen(5) #監聽最多5個連接請求

4)socket.accept對象阻塞等待接受鏈接

fd, addr = self._accept()  

調用accept方法時,socket會進入‘waiting’阻塞狀態,客戶請求連接時,方法會建立連接並返回服務器。

accept方法會返回一個含有兩個元素的元組,(fd,addr)。第一個元素是新的socket對象,服務器通過它與客戶端通信。第二個元素是客戶端的地址及端口信息。

import socket

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
HostPort = ('127.0.0.1',8899)
s.bind(HostPort)  #綁定地址端口
s.listen(5)  #監聽最多5個連接請求
while True:
    print('server socket waiting...')
    obj,addr = s.accept()  #阻塞等待鏈接
    print('socket object:',obj)
    print('client info:',addr)


#運行,鏈接輸出信息
server socket waiting...
socket object: <socket.socket fd=260, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8899), raddr=('127.0.0.1', 28237)>
client info: ('127.0.0.1', 28237)
server socket waiting...

5)處理階段,服務器與客戶端通過send和recv方法通信(傳輸數據)

調用新鏈接對象與客戶端或者服務器通信:

socket.recv(buffersize)  :接受客戶端信或服務器數據,buffersize指定接收數據的大小,單位為字節。

socket.send(data) :發送信息給客戶端或服務器,信息必須轉換為字節才能發送。

import socket

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
HostPort = ('127.0.0.1',8899)
s.bind(HostPort)  #綁定地址端口
s.listen(5)  #監聽最多5個連接請求
while True:
    print('server socket waiting...')
    obj,addr = s.accept()  #阻塞等待鏈接,創建新鏈接對象(obj)和客戶端地址(addr)
    while True:
        client_data = obj.recv(1024)  #通過新鏈接對象接受數據
        print(client_data)
        obj.send(client_data)  #通過新鏈接對象發送數據

6)傳輸結束,關閉鏈接

socket.close()  關閉鏈接

 

客戶端創建套接字鏈接:

1)s = socket.socket() 創建socket對象

2)s.connect('127.0.0.1','80')  綁定地址端口鏈接服務器

3)s.send(data)  發送數據到服務器

4)s.recv(1024)  接收服務器數據

5)s.close()  關閉鏈接

4、socket套接字對象方法

服務器段套接字:

s.bind()  :綁定地址(host,port)到套接字,在AF_INET下,以元組(host,port)的形式表示地址。

s.listen()  :開始TCP監聽。backlog指定在拒絕連接之前,操作系統可以掛起的最大連接數量。該值至少為1,大部分應用程序設為5就可以了。

s.accept()  :被動接受TCP客戶端連接,(阻塞式)等待連接的到來

客戶端套接字:

s.connect() :主動初始化TCP服務器連接,。一般address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤

s.connect_ex() :connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常

公共用途的套接字函數:

s.recv()  :接收TCP數據,數據以字符串形式返回,bufsize指定要接收的最大數據量。flag提供有關消息的其他信息,通常可以忽略。

s.send()  :發送TCP數據,將string中的數據發送到連接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。

s.sendall()  :完整發送TCP數據,完整發送TCP數據。將string中的數據發送到連接的套接字,但在返回之前會嘗試發送所有數據。成功返回None,失敗則拋出異常。

s.recvfrom()  :接收UDP數據,與recv()類似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。

s.sendto()  :發送UDP數據,將數據發送到套接字,address是形式為(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。

s.close()  :關閉套接字

s.getpeername()  :返回連接套接字的遠程地址。返回值通常是元組(ipaddr,port)

s.getsockname()  :返回套接字自己的地址。通常是一個元組(ipaddr,port)

s.setsockopt(level,optname,value)  :設置給定套接字選項的值。

s.getsockopt(level,optname[.buflen])  :返回套接字選項的值。

s.settimeout(timeout)  :設置套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛創建套接字時設置,因為它們可能用於連接的操作(如connect())

s.gettimeout()  :返回當前超時期的值,單位是秒,如果沒有設置超時期,則返回None

s.fileno()  :返回套接字的文件描述符

s.setblocking(flag)  :如果flag為0,則將套接字設為非阻塞模式,否則將套接字設為阻塞模式(默認值)。非阻塞模式下,如果調用recv()沒有發現任何數據,或send()調用無法立即發送數據,那么將引起socket.error異常。

s.makefile()  :創建一個與該套接字相關連的文件

5、socket實例

1)簡單的TCP socket會話

服務器代碼:

import socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
HostPort = ('127.0.0.1',8898)
s.bind(HostPort)  #綁定地址端口
s.listen(5)  #監聽最多5個連接請求
while True:
    print('server socket waiting...')
    c,addr = s.accept()  #阻塞等待鏈接,創建套接字c鏈接和地址信息addr
    while True:
        try:
            client_date = c.recv(1024) #接收客戶端數據
            if str(client_date,'utf8') == 'quit':
                c.close()
                break
        except Exception:
            break
        c.send(client_date)  #發送數據給客戶端
        print('clientINFO:',str(client_date, 'utf8')) #打印數據,默認接收數據為bytes,需轉換成str

此處使用try捕捉異常,是在客戶端斷開鏈接時,服務器端會拋出異常,為了實現多連接排隊鏈接,必須捕捉異常讓程序正常運行,這種現象只在windows系統下存在;在linux下的表現形式又不同,在linux下不會拋出異常,而是正常接收客戶端數據而不會退出,只需要判斷客戶端數據長度,正常退出急可解決問題。

客戶端代碼:

import socket
hostport = ('127.0.0.1',8898)
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  #創建TCP socket
s.connect(hostport)  #鏈接套接字

while True:
    user_input = input('>>>:').strip()
    s.send(bytes(user_input,'utf8')) #發送數據到套接字
    if not len(user_input):continue
    if user_input == 'quit':
        s.close()
        break
    server_reply = s.recv(1024) #接收套接字數據

    print(str(server_reply, 'utf8'))  #打印輸出

會話結果:

#首先運行服務器代碼,然后運行客戶端代碼鏈接服務器

#client output:
>>>:hello python
hello python
>>>:socket network programming
socket network programming
>>>:

#server output:
server socket waiting...
clientINFO: hello python
clientINFO: socket network programming

2)簡單的UDP socket會話

服務器接收端:

import socket
HostPort = ('127.0.0.1',7777)
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #創建UDP套接字
sock.bind(HostPort) #服務器端綁定端口

while True:
    data,addr = sock.recvfrom(1024) #接收端口數據
    print(str(data,'utf8'))  #打印數據

客戶端發送端:

import socket
HostPort = ('127.0.0.1',7777)
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
while True:
    user_input = input('>>>:')
    if user_input == 'quit':break
    sock.sendto(user_input.encode(),HostPort) #指定地址端口發送數據,數據必須encode
sock.close()

3)socke實現簡單版ssh

服務器代碼:

import subprocess
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
HostPort = ('192.168.146.129',8821)
s.bind(HostPort)  
s.listen(5)  
while True:
    print('server socket waiting...')
    c,addr = s.accept()  #阻塞等待連接,並創建套接字和地址端口對象
    while True:
        client_date = c.recv(1024)  #接收數據
        if not client_date:break  #判斷數據為空則跳出循環
        if str(client_date,'utf8') == 'quit':  #指定正常退出鏈接接口
            c.close()
            break
        strule = client_date.decode()  #解碼數據
        strule_out = subprocess.Popen(strule,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)  #交互系統執行命名,並將輸出和錯誤輸出到管道
        strule_out_read = strule_out.stdout.read()  #讀取管道數據
        if not strule_out_read:  #判斷命令錯誤時,數據為錯誤輸出信息
            strule_out_read = strule_out.stderr.read()
        #在發送數據前,先發送數據長度到客戶端並確認
        coun =bytes("cmd_result_size|%s" %len(strule_out_read),'utf8') 
        #print(coun)
        c.send(coun)  #發送數據長度
       
        client_ack = c.recv(50)  #接收客戶端確認信息
        #print(str(client_ack,'utf8'))
        if str(client_ack,'utf8') == 'client ready to recv': #確認客戶端后開始發送數據
            c.send(strule_out_read)
        #print(str(strule_out_read,'utf8'))
        
        #print('clientINFO:',str(client_date, 'utf8'))

客戶端代碼:

import socket
hostport = ('192.168.146.129',8821)
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  #創建TCP socket
s.connect(hostport)  #鏈接套接字

while True:
    user_input = input('>>>:').strip()
    if not len(user_input):continue
    if user_input == 'quit':
        s.send(bytes('quit','utf8'))
        s.close()
        break
    else:
        s.send(bytes(user_input, 'utf8'))  # 發送數據到套接字
    server_ack = s.recv(100)  #接收數據長度
    ack_msg = str(server_ack.decode()).split('|')
    if ack_msg[0] == "cmd_result_size": #判斷是否為數據長度包
        ack_size = int(ack_msg[1])  #獲取數據長度
        s.send('client ready to recv'.encode()) #發送客戶端確認信息
    tool = 0
    count_info = ''
    while tool < ack_size:  #判斷接收數據長度是否小於總數據長度,則循環接收數據
        server_reply = s.recv(1024) #接收套接字數據
        count_info +=str(server_reply.decode())
        tool += len(server_reply)
    else:
        print(count_info)  #正常接收完數據后打印

 6、socketserver框架

該socketserver模塊簡化了編寫網絡服務器的任務;共有四個基本的具體服務器類:

class socketserver.TCPServer(server_address,RequestHandlerClass,bind_and_activate=True)

class TCPServer(BaseServer):

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):

        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
 
        self.socket.listen(self.request_queue_size)

    def server_close(self):
 
        self.socket.close()

    def fileno(self):
 
        return self.socket.fileno()

    def get_request(self):
 
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except OSError:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
TCPServer源碼

它使用internet TCP協議,提供連續的數據流,如果bind_and_activate為True,則構造函數會自動嘗試調用server_bind()和server_activate()。其他參數會傳遞給BaseServer基類。

class socketserver.UDPServer(server_address,RequestHandlerClass,bind_and_activate=True)

class UDPServer(TCPServer):

    """UDP server class."""

    allow_reuse_address = False

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass
UDPServer源碼

它使用數據報通信,不保證數據可靠性。

class socketserver.UnixStreamServer(server_address,RequestHandlerClass,bind_and_activate=True)

class socketserver.UnixDatagramServer(server_address,RequestHandlerClass,bind_and_activate=True)

這兩個很少用,與TCP和UDP類相似,但只使用於UNIX域套接字,只能在Unix平台上使用。

這四個類都是同步處理請求,即完成一個請求后才開始接收下一個請求,會造成數據處理緩慢;解決方案是創建一個單獨的進程或線程來處理每一個請求;可以使用ForkingMixIn和ThreadingMinIn混合型類來用於異步處理數據。

創建一個服務器需要幾個步驟。首先,你必須創建一個請求處理程序類,通過繼承這個BaseRequestHandler類並覆蓋它的handle()方法; 這個方法將處理傳入的請求。其次,您必須實例化其中一個服務器類,並將其傳遞給服務器的地址和請求處理程序類。然后調用服務器對象的 handle_request()or serve_forever()方法來處理一個或多個請求。最后,調用server_close() 關閉套接字。

socketserver類的繼承圖:

+ ------------ + 
|  BaseServer  | 
+ ------------ + 
      | 
      v 
+ ----------- +         + ------------------ + 
|  TCPServer  | -------> |  UnixStreamServer  | 
+ ----------- +         + ------------------ + 
      | 
      v 
+ ----------- +         + -------------------- + 
|  UDPServer  | -------> |  UnixDatagramServer  | 
+ ----------- +         + -------------------- +

請注意,UnixDatagramServer源自而UDPServer不是來自 UnixStreamServer- 一個IP和一個Unix流服務器之間的唯一區別是地址系列,這在兩個Unix服務器類中簡單地重復。

class socketserver.ForkingMixIn       進程類,它實現了異步多進程多客戶端鏈接

class ForkingMixIn:

    """Mix-in class to handle each request in a new process."""

    timeout = 300
    active_children = None
    max_children = 40

    def collect_children(self):
        """Internal routine to wait for children that have exited."""
        if self.active_children is None:
            return
        while len(self.active_children) >= self.max_children:
            try:
                pid, _ = os.waitpid(-1, 0)
                self.active_children.discard(pid)
            except ChildProcessError:
                # we don't have any children, we're done
                self.active_children.clear()
            except OSError:
                break

        # Now reap all defunct children.
        for pid in self.active_children.copy():
            try:
                pid, _ = os.waitpid(pid, os.WNOHANG)
                # if the child hasn't exited yet, pid will be 0 and ignored by
                # discard() below
                self.active_children.discard(pid)
            except ChildProcessError:
                # someone else reaped it
                self.active_children.discard(pid)
            except OSError:
                pass

    def handle_timeout(self):
        self.collect_children()

    def service_actions(self):
        self.collect_children()

    def process_request(self, request, client_address):
        """Fork a new subprocess to process the request."""
        pid = os.fork()
        if pid:
            # Parent process
            if self.active_children is None:
                self.active_children = set()
            self.active_children.add(pid)
            self.close_request(request)
            return
        else:
            # Child process.
            # This must never return, hence os._exit()!
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                os._exit(0)
            except:
                try:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                finally:
                    os._exit(1)
ForkingMixIn源碼

class socketserver.ThreadingMixIn    線程類,它實現了異步多線程多客戶端鏈接

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start() 
ThreadingMixIn源碼

 

每種服務器的進程版和線程版都可以使用這兩個混合類來創建,如創建一個ThreadingUDPServer:

class ThreadingUDPServer(ThreadingMixIn,UDPServer):
     pass

混合類優先,因為它覆蓋了一個定義在UDPServer中的方法,設置各種屬性也會改變底層服務器機制的行為。

TCP多進程:class socketserver.ForkingTCPServer

UDP多進程:class socketserver.ForkingUDPServer

TCP多線程:class socketserver.ThreadingTCPServer

UDP多線程:class socketserver.ThreadingUDPServer

 它們實際上繼承了ForkingMixIn或ThreadingMixIn類來處理異步通信,然后調用TCPServer服務器來實例化一個鏈接。

class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

 

 這些類是使用混合類預定義的。

要實現一個服務,你必須派生一個類BaseRequestHandler 並重新定義它的handle()方法。然后,您可以通過將其中一個服務器類與請求處理程序類相結合來運行各種版本的服務。對於數據報或流服務,請求處理程序類必須不同。這可以通過使用處理程序子類StreamRequestHandler隱藏 DatagramRequestHandler

 1)服務器對象

class socketserver.BaseServerserver_addressRequestHandlerClass 

這是模塊中所有服務器對象的超類。它定義了下面給出的接口,但不實現大多數在子類中完成的方法。兩個參數被存儲在相應的 server_addressRequestHandlerClass屬性。

class BaseServer:

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        pass
    def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()
        try:
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def service_actions(self):
        pass
    def handle_request(self):
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        if timeout is not None:
            deadline = time() + timeout

        with _ServerSelector() as selector:
            selector.register(self, selectors.EVENT_READ)

            while True:
                ready = selector.select(timeout)
                if ready:
                    return self._handle_request_noblock()
                else:
                    if timeout is not None:
                        timeout = deadline - time()
                        if timeout < 0:
                            return self.handle_timeout()

    def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
        else:
            self.shutdown_request(request)

    def handle_timeout(self):
        pass

    def verify_request(self, request, client_address):
        return True

    def process_request(self, request, client_address):
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        pass

    def finish_request(self, request, client_address):
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        self.close_request(request)

    def close_request(self, request):
        pass

    def handle_error(self, request, client_address):
        print('-'*40)
        print('Exception happened during processing of request from', end=' ')
        print(client_address)
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print('-'*40)
BaseServer源碼
fileno

返回服務器正在偵聽的套接字的整數文件描述符。這個功能通常被傳遞給selectors,允許在同一個進程中監視多個服務器。

handle_request

處理一個請求。這個函數調用下面的方法依次是:get_request()verify_request(),和process_request()如果handle()處理程序類的用戶提供的 方法引發異常,handle_error()則將調用服務器的方法。如果在timeout 幾秒鍾內沒有收到請求handle_timeout()將會被調用並handle_request()返回。

serve_forever poll_interval = 0.5 

處理請求,直到一個明確的shutdown()請求。輪詢關閉每個poll_interval秒。忽略該timeout屬性。它還會調用service_actions()子類或mixin可能用於提供給定服務的特定操作。例如,這個 ForkingMixInservice_actions()用來清理僵屍子進程。

在版本3.3中更改:添加service_actions了對該serve_forever方法的調用

service_actions

這在serve_forever()循環中被調用這個方法可以被子類或mixin類覆蓋,以執行特定於給定服務的操作,例如清理操作。

3.3版本中的新功能

shutdown

告訴serve_forever()循環停止並等待,直到它結束。

server_close

清理服務器。可能會被覆蓋。

address_family

服務器套接字所屬的協議族。常見的例子是socket.AF_INETsocket.AF_UNIX

RequestHandlerClass

用戶提供的請求處理程序類; 這個類的一個實例是為每個請求創建的。

server_address

服務器正在偵聽的地址。地址格式因協議族而異,socket有關詳細信息,請參閱該模塊的文檔對於Internet協議,這是一個元組,其中包含一個給出地址的字符串和一個整數端口號:例如。('127.0.0.1',80)

socket

服務器將偵聽傳入請求的套接字對象。

服務器類支持以下類變量:

allow_reuse_address

服務器是否允許重用地址。這個默認為 False,可以在子類中設置來更改策略。

request_queue_size

請求隊列的大小。如果處理單個請求需要很長時間,則在服務器繁忙時到達的任何請求都被放入隊列中,直至request_queue_size請求。一旦隊列已滿,來自客戶端的進一步請求將會得到“連接被拒絕”錯誤。默認值通常是5,但這可以由子類覆蓋。

socket_type

服務器使用的套接字的類型; socket.SOCK_STREAM並且 socket.SOCK_DGRAM是兩個共同的價值。

timeout

超時持續時間,以秒為單位,或者None如果不需要超時。如果handle_request()在超時期限內沒有收到傳入的請求,handle_timeout()則調用方法。

有許多服務器方法可以被基類服務器類的子類覆蓋,比如TCPServer這些方法對服務器對象的外部用戶沒有用處。

finish_request

實際上通過實例化RequestHandlerClass和調用它的handle()方法來處理請求

get_request

必須接受來自套接字的請求,並返回包含 要用於與客戶端通信套接字對象的2元組以及客戶端的地址。

handle_error requestclient_address 

如果實例handle() 方法RequestHandlerClass引發異常,則調用此函數默認操作是將回溯打印到標准輸出,並繼續處理更多的請求。

handle_timeout

timeout屬性被設置為一個非None超時時,這個函數被調用,超時時間已經過去,沒有收到請求。派生服務器的默認動作是收集退出的任何子進程的狀態,而在線程服務器中,這個方法什么也不做。

process_request requestclient_address 

調用finish_request()來創建一個實例 RequestHandlerClass如果需要,這個函數可以創建一個新的進程或線程來處理請求; ForkingMixIn和 ThreadingMixIn班做到這一點。

server_activate

由服務器的構造函數調用以激活服務器。TCP服務器的默認行為只是listen() 在服務器的套接字上調用可能會被覆蓋。

server_bind

由服務器的構造函數調用,將套接字綁定到所需的地址。可能會被覆蓋。

verify_request requestclient_address 

必須返回一個布爾值; 如果值是True,請求將被處理,如果是False,請求將被拒絕。這個函數可以被覆蓋來實現服務器的訪問控制。默認的實現總是返回True

 2)請求處理對象

class socketserver. BaseRequestHandler
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

這是所有請求處理程序對象的超類。它定義了下面給出的接口。一個具體的請求處理子類必須定義一個新的handle()方法,並且可以覆蓋任何其他的方法。為每個請求創建一個新的子類實例。

setup

handle()方法執行任何所需的初始化操作之前調用默認實現什么都不做。

handle

該功能必須完成所有需要的工作來處理請求。默認實現什么都不做。有幾個實例屬性可用; 該請求可用於self.request客戶地址為self.client_address和服務器實例一樣 self.server,以防需要訪問每個服務器的信息。

self.request數據報或流服務的類型是不同的。對於流服務,self.request是一個套接字對象; 對於數據報服務來說,self.request是一對字符串和套接字。

finish

在調用handle()方法之后調用所需的清理操作。默認實現什么都不做。如果setup() 引發異常,則不會調用該函數。

class socketserver. StreamRequestHandler,繼承了BaseRequestHandler類,它只是重寫了setup和finish方法。
class StreamRequestHandler(BaseRequestHandler):
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # Disable nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()
StreamRequestHandler
class socketserver. DatagramRequestHandler,繼承了BaseRequestHandler類,它只是重寫了setup和finish方法。
class DatagramRequestHandler(BaseRequestHandler):
    def setup(self):
        from io import BytesIO
        self.packet, self.socket = self.request
        self.rfile = BytesIO(self.packet)
        self.wfile = BytesIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)
DatagramRequestHandler

這些BaseRequestHandler子類重寫 setup()finish() 方法,提供self.rfileself.wfile屬性。self.rfileself.wfile屬性可以被讀取或寫入,分別獲得請求的數據或者數據返回給客戶端。

 3)socketserver實例

  • socketserver.TCPServer示例:實現同步多並發鏈接,一個進程只能在處理完一個鏈接后才能處理第二個鏈接。

服務器端:

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    ‘‘‘服務器處理請求類程序,每連接一次服務器實例化一次,並重寫handle()方法來實現通信客戶端’’’
    def handle(self):
        print('new connection:',self.client_address) #獲取客戶端地址和端口
        while True:
            self.data = self.request.recv(1024).strip() #request獲取連接客戶端的套接字,recv接收客戶端數據
            print('{}AddrAndPort:'.format(self.client_address))
            print(self.data)
            self.request.sendall(self.data.upper())  #發送數據到客戶端並轉換大小寫

if __name__ == "__main__":
    HOSTPORT = ('127.0.0.1',9988)
    #創建服務器,綁定地址端口和類 
    server = socketserver.TCPServer(HOSTPORT,MyTCPHandler)
    #運行服務器,相對於守護進程,直到按ctrl-c來結束程序
    server.serve_forever()
  • 使用預定義包裝的類ThreadingTCPServer實現異步並發鏈接:
import socketserver
class Mysocket(socketserver.StreamRequestHandler): #重寫setup和finish方法
    def handle(self):  #處理鏈接
        print('client:',self.client_address)
        while 1:
            data = self.request.recv(1024)
            self.request.send(data)
            print(data.decode())

if __name__ == '__main__':
    AddrPort = ('127.0.0.1',7899)
    server = socketserver.ThreadingTCPServer(AddrPort,Mysocket) #多線程創建實例鏈接
    server.serve_forever()  #循環接收鏈接
  • windows系統下的一個bug:
import socketserver
class Mysocket(socketserver.BaseRequestHandler):
    def handle(self):
        print('client:',self.client_address)
        while 1:
            data = self.request.recv(1024)
            self.request.send(data)
            print(data.decode())

if __name__ == '__main__':
    AddrPort = ('127.0.0.1',7899)
    #如果此處使用ForkingTCPServer類來創建多進程的實例化鏈接,在windows系統下會報錯
    server = socketserver.ForkingTCPServer(AddrPort,Mysocket)
    server.serve_forever()
#鏈接后異常:
 File "Z:\Program Files\Python35\lib\socketserver.py", line 588, in process_request
    pid = os.fork()
AttributeError: module 'os' has no attribute 'fork'
#原因是:
`os.fork` isn't available on Windows
os.fork在windows上是不可用的。
  • 簡單版FTPServer:
#服務器代碼:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/12 11:02
# @Author  : Py.qi
# @File    : SOCKSERVER_01.py
# @Software: PyCharm

from socketserver import ThreadingTCPServer,StreamRequestHandler
import os,json
host_path = os.getcwd()
class MyHandler(StreamRequestHandler):

    def handle(self):
        print('clientaddr:',self.client_address)
        while True:
            try:
                data = self.request.recv(1024)
            except Exception:
                break

            if data.decode() == 'ls':   #簡單查看目錄下的文件
                lsdir = os.listdir(os.getcwd())
                pi = json.dumps(lsdir)
                self.request.send(pi.encode())
            if len(data.decode().split()) >= 2:
                action, filename = data.decode().split()
                print(action,filename)
                filename_cd = host_path + '\\' + filename
                if action == 'get':
                    print(action)
                    self.getfile(filename)
                    print('getfiel..end')
                elif action == 'upt':  #上傳交給類方法處理
                    print(action)
                    self.uptfile(filename)
                    print('upt---end')
                elif action == 'cd':  #簡單cd命令
                    os.chdir(filename_cd)
                    lsdir1 = os.listdir(filename_cd)
                    p2 = json.dumps(lsdir1)
                    self.request.send(p2.encode())


    def getfile(self,filename):  #文件下載
        filename_path = host_path + '\\' + filename
        print(filename_path)
        with open(filename_path,'rb') as f:
            filedata = f.read()
            self.request.send(filedata)
            self.request.close()
    def uptfile(self,filename):  #文件上傳
        self.request.send(bytes('ok','utf8'))
        filename_uptfile = host_path + '\\' + filename
        with open(filename_uptfile,'wb') as f1:
            uptdata = self.request.recv(1024)
            f1.write(uptdata)
            self.request.close()


if __name__ == "__main__":
    hostprot = ('127.0.0.1',5556)
    server = ThreadingTCPServer(hostprot,MyHandler)
    print('connection to who...')
    server.serve_forever()


#客戶端代碼:

#!/usr/bin/env python
#coding:utf8
#file:Administrator
#time:20180104

import socket,os,json
host_path = r'Z:\\'
hostport = ('127.0.0.1',5556)
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  #創建TCP socket
#s.bind(hostport)
s.connect(hostport)  #鏈接套接字

while True:
    user_input = input('>>>:').strip()
    s.send(user_input.encode()) #發送數據到套接字
    if not len(user_input):continue
    if user_input == 'quit':
        s.close()
        break
    server_reply = s.recv(1024) #接收套接字數據
    if not server_reply:break
    if len(user_input.split()) > 1:
        if user_input.split()[0] == 'get':
            filename = user_input.split()[1]
            filename_path = host_path + filename
            with open(filename_path,'wb') as f:
                print('----')
                f.write(server_reply)
        if user_input.split()[0] == 'upt':
            #redy = s.recv(1024)
            print(server_reply)
            filename_u = user_input.split()[1]
            filename_u_path = host_path + filename_u
            with open(filename_u_path,'rb') as f1:
                upt_data = f1.read()
                print(upt_data)
                s.send(upt_data)
    elif len(user_input.split()) == 1:
        lsdata = server_reply.decode()
        print(lsdata)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM