python之socket編程


一、客戶端/服務器架構

1.硬件C/S架構(打印機)

2.軟件C/S架構

  互聯網中處處是C/S架構

  如購物網站是服務端,你的瀏覽器是客戶端(B/S架構也是C/S架構的一種)

  騰訊作為服務端為你提供視頻,你得下個騰訊視頻客戶端才能看它的視頻)

C/S架構與socket的關系:

學習socket就是為了完成C/S架構的開發

二、osi七層

須知一個完整的計算機系統是由硬件、操作系統、應用軟件三者組成,具備了這三個條件,一台計算機系統就可以自己跟自己玩了(打個單機游戲,玩個掃雷啥的)

如果你要跟別人一起玩,那你就需要上網了,什么是互聯網?

互聯網的核心就是由一堆協議組成,協議就是標准,比如全世界人通信的標准是英語

如果把計算機比作人,互聯網協議就是計算機界的英語。所有的計算機都學會了互聯網協議,那所有的計算機都就可以按照統一的標准去收發信息從而完成通信了。

人們按照分工不同把互聯網協議從邏輯上划分了層級,可以看下圖:

所以,學習socket之前要先學習互聯網協議。

三、socket層

經過上圖和下圖的對比,我們不難發現socket層的所在。

socket的是什么呢?

Socket是應用層與TCP/IP協議族通信的中間軟件抽象層,它是一組接口。在設計模式中,Socket其實就是一個門面模式,它把復雜的TCP/IP協議族隱藏在Socket接口后面,對用戶來說,一組簡單的接口就是全部,讓Socket去組織數據,以符合指定的協議。

所以,我們無需深入理解tcp/udp協議,socket已經為我們封裝好了,我們只需要遵循socket的規定去編程,寫出的程序自然就是遵循tcp/udp標准的。

也有人將socket說成ip+port,ip是用來標識互聯網中的一台主機的位置,而port是用來標識這台機器上的一個應用程序,ip地址是配置到網卡上的,而port是應用程序開啟的,ip與port的綁定就標識了互聯網中獨一無二的一個應用程序,而程序的pid是同一台機器上不同進程或者線程的標識

四、套接字發展史及分類

套接字起源於 20 世紀 70 年代加利福尼亞大學伯克利分校版本的 Unix,即人們所說的 BSD Unix。 因此,有時人們也把套接字稱為“伯克利套接字”或“BSD 套接字”。一開始,套接字被設計用在同 一台主機上多個應用程序之間的通訊。這也被稱進程間通訊,或 IPC。套接字有兩種(或者稱為有兩個種族),分別是基於文件型的和基於網絡型的。

基於文件類型的套接字家族

套接字家族的名字:AF_UNIX

unix一切皆文件,基於文件的套接字調用的就是底層的文件系統來取數據,兩個套接字進程運行在同一機器,可以通過訪問同一個文件系統間接完成通信

基於網絡類型的套接字家族

套接字家族的名字:AF_INET

(還有AF_INET6被用於ipv6,還有一些其他的地址家族,不過,他們要么是只用於某個平台,要么就是已經被廢棄,或者是很少被使用,或者是根本沒有實現,所有地址家族中,AF_INET是使用最廣泛的一個,python支持很多種地址家族,但是由於我們只關心網絡編程,所以大部分時候我么只使用AF_INET)。

套接字工作流程

先從服務器端說起。服務器端先初始化Socket,然后與端口綁定(bind),對端口進行監聽(listen),調用accept阻塞,等待客戶端連接。在這時如果有個客戶端初始化一個Socket,然后連接服務器(connect),如果連接成功,這時客戶端與服務器端的連接就建立了。客戶端發送數據請求,服務器端接收請求並處理請求,然后把回應數據發送給客戶端,客戶端讀取數據,最后關閉連接,一次交互結束

socket()模塊函數用法

  import socket
  socket.socket(socket_family,socket_type,protocal=0)
  socket_family 可以是 AF_UNIX 或 AF_INET。socket_type 可以是 SOCK_STREAM 或 SOCK_DGRAM。protocol 一般不填,默認值為 0。
  
  獲取tcp/ip套接字
  tcpSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  
  獲取udp/ip套接字
  udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
 由於 socket 模塊中有太多的屬性。我們在這里破例使用了'from module import *'語句。使用 'from socket import *',我們就把 socket 模塊里的所有屬性都帶到我們的命名空間里了,這樣能 大幅減短我們的代碼。
 例如tcpSock = socket(AF_INET, SOCK_STREAM)
View Code
服務端套接字函數
s.bind() 綁定(主機,端口號)到套接字
s.listen() 開始TCP監聽
s.accept() 被動接受TCP客戶的連接,(阻塞式)等待連接的到來

客戶端套接字函數
s.connect() 主動初始化TCP服務器連接
s.connect_ex() connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常

公共用途的套接字函數
s.recv() 接收TCP數據
s.send() 發送TCP數據(send在待發送數據量大於己端緩存區剩余空間時,數據丟失,不會發完)
s.sendall() 發送完整的TCP數據(本質就是循環調用send,sendall在待發送數據量大於己端緩存區剩余空間時,數據不丟失,循環調用send直到發完)
s.recvfrom() 接收UDP數據
s.sendto() 發送UDP數據
s.getpeername() 連接到當前套接字的遠端的地址
s.getsockname() 當前套接字的地址
s.getsockopt() 返回指定套接字的參數
s.setsockopt() 設置指定套接字的參數
s.close() 關閉套接字
面向鎖的套接字方法
s.setblocking() 設置套接字的阻塞與非阻塞模式
s.settimeout() 設置阻塞套接字操作的超時時間
s.gettimeout() 得到阻塞套接字操作的超時時間

面向文件的套接字的函數
s.fileno() 套接字的文件描述符
s.makefile() 創建一個與該套接字相關的文件

五、基於TCP的套接字

tcp是基於鏈接的,必須先啟動服務端,然后再啟動客戶端去鏈接服務端

tcp服務端:

 ss = socket() #創建服務器套接字
 ss.bind()      #把地址綁定到套接字
 ss.listen()      #監聽鏈接
 inf_loop:      #服務器無限循環
     cs = ss.accept() #接受客戶端鏈接
     comm_loop:         #通訊循環
         cs.recv()/cs.send() #對話(接收與發送)
     cs.close()    #關閉客戶端套接字
 ss.close()        #關閉服務器套接字(可選)

tcp客戶端:

 cs = socket()    # 創建客戶套接字
 cs.connect()    # 嘗試連接服務器
 comm_loop:        # 通訊循環
     cs.send()/cs.recv()    # 對話(發送/接收)
 cs.close()            # 關閉客戶套接字

socket通信流程與打電話流程類似,我們就以打電話為例來實現一個low版的套接字通信:

import socket
ip_port=('127.0.0.1',9000)  #電話卡
BUFSIZE=1024                #收發消息的尺寸
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #買手機
s.bind(ip_port) #手機插卡
s.listen(5)     #手機待機


conn,addr=s.accept()            #手機接電話
# print(conn)
# print(addr)
print('接到來自%s的電話' %addr[0])

msg=conn.recv(BUFSIZE)             #聽消息,聽話
print(msg,type(msg))

conn.send(msg.upper())          #發消息,說話

conn.close()                    #掛電話

s.close()                       #手機關機
服務端
import socket
ip_port=('127.0.0.1',9000)
BUFSIZE=1024
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

s.connect_ex(ip_port)           #撥電話

s.send('linhaifeng nb'.encode('utf-8'))         #發消息,說話(只能發送字節類型)

feedback=s.recv(BUFSIZE)                           #收消息,聽話
print(feedback.decode('utf-8'))

s.close()                                       #掛電話
客戶端

加上鏈接循環與通信循環

import socket
ip_port=('127.0.0.1',8081)#電話卡
BUFSIZE=1024
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #買手機
s.bind(ip_port) #手機插卡
s.listen(5)     #手機待機


while True:                         #新增接收鏈接循環,可以不停的接電話
    conn,addr=s.accept()            #手機接電話
    # print(conn)
    # print(addr)
    print('接到來自%s的電話' %addr[0])
    while True:                         #新增通信循環,可以不斷的通信,收發消息
        msg=conn.recv(BUFSIZE)             #聽消息,聽話

        # if len(msg) == 0:break        #如果不加,那么正在鏈接的客戶端突然斷開,recv便不再阻塞,死循環發生

        print(msg,type(msg))

        conn.send(msg.upper())          #發消息,說話

    conn.close()                    #掛電話

s.close()                       #手機關機
server改進
import socket
ip_port=('127.0.0.1',8081)
BUFSIZE=1024
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

s.connect_ex(ip_port)           #撥電話

while True:                             #新增通信循環,客戶端可以不斷發收消息
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    s.send(msg.encode('utf-8'))         #發消息,說話(只能發送字節類型)

    feedback=s.recv(BUFSIZE)                           #收消息,聽話
    print(feedback.decode('utf-8'))

s.close()                                       #掛電話
client改進

六、基於UDP的套接字

UDP是無鏈接的,先啟動那一端都不會報錯。

udp服務端:

1 ss = socket()   #創建一個服務器的套接字
2 ss.bind()       #綁定服務器套接字
3 inf_loop:       #服務器無限循環
4     cs = ss.recvfrom()/ss.sendto() # 對話(接收與發送)
5 ss.close()                         # 關閉服務器套接字

udp客戶端

1 cs = socket()   # 創建客戶套接字
2 comm_loop:      # 通訊循環
3     cs.sendto()/cs.recvfrom()   # 對話(發送/接收)
4 cs.close()                      # 關閉客戶套接字

udp套接字簡單編程

from socket import *
ip_port = ('127.0.0.1',1234)
buffer_size = 1024

udp_server = socket(AF_INET,SOCK_DGRAM)#數據報
udp_server.bind(ip_port)

while True:
    data,addr = udp_server.recvfrom(buffer_size)

    print(data)
    udp_server.sendto(data.upper(),addr)
服務器端
from socket import *

ip_port = ('127.0.0.1', 1234)
buffer_size = 1024

udp_client = socket(AF_INET, SOCK_DGRAM)  # 數據報


while True:
    msg = input('>>:').strip()
    udp_client.sendto(msg.encode('utf-8'),ip_port)

    data,addr = udp_client.recvfrom(buffer_size)
    print(data.decode('utf-8'))
客戶端

時間服務器示例

#時間管理器

from socket import *
import time
ip_port = ('127.0.0.1',1234)
buffer_size = 1024

udp_server = socket(AF_INET,SOCK_DGRAM)#數據報
udp_server.bind(ip_port)

while True:
    data,addr = udp_server.recvfrom(buffer_size)
    if not data:
        t='%Y-%m-%d %X'
    else:
        t=data.decode('utf-8')
    back_time = time.strftime(t)
    udp_server.sendto(back_time.encode('utf-8'),addr)
ntp_server
from socket import *

ip_port = ('127.0.0.1', 1234)
buffer_size = 1024

udp_client = socket(AF_INET, SOCK_DGRAM)  # 數據報


while True:
    msg = input('>>:').strip()
    udp_client.sendto(msg.encode('utf-8'),ip_port)

    data,addr = udp_client.recvfrom(buffer_size)
    print(data.decode('utf-8'))
ntp_client

七、粘包問題

讓我們基於tcp先編程一個遠程執行命令的程序(1:執行錯誤命令 2:執行ls 3:執行ifconfig)

from socket import *
import subprocess
import struct
ip_port = ('127.0.0.1',8080)
back_log = 5
buffer_size = 1024

 tcp_server = socket(AF_INET,SOCK_STREAM)
 tcp_server.bind(ip_port)
 tcp_server.listen(back_log)

 while True:
     conn,addr = tcp_server.accept()
     print('新客戶端鏈接',addr)
     while True:
         try:#處理客戶端斷開,服務端繼續進行任務,
             cmd = conn.recv(buffer_size)
             if not cmd:break  #處理quit命令導致的數據為空,導致死循環
             print('收到客戶端的命令',cmd)
             #執行命令,得到命令的運行結果cmd _res
             res= subprocess.Popen(cmd.decode('utf-8'),shell=True,
                                   stderr= subprocess.PIPE,
                                   stdout = subprocess.PIPE,
                                   stdin = subprocess.PIPE )
             err =res.stderr.read()
             if err:
                 cmd_res = err
             else:
                 cmd_res = res.stdout.read()
             #
             if not cmd_res:
                  cmd_res='執行成功'.encode('gbk')
             conn.send(cmd_res)
         except Exception as e:
             print(e)
             break
server
from socket import *

ip_port = ('127.0.0.1',8080)
back_log = 5
buffer_size = 1024

 tcp_client=socket(AF_INET,SOCK_STREAM)
 tcp_client.connect(ip_port)

 while True:
     cmd = input('>>>:').strip()
     if not cmd:continue
     if cmd =='quit':break
     tcp_client.send(cmd.encode('utf-8'))
     cmd_res=tcp_client.recv(buffer_size)
     print('命令的執行結果是:',cmd_res.decode('gbk'))
 tcp_client.close()
client

上述程序是基於tcp的socket,在運行時會發生粘包。

在基於UDP編程一個遠程執行命令的程序

from socket import *
import subprocess

ip_port=('127.0.0.1',9003)
bufsize=1024

udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(ip_port)

while True:
    #收消息
    cmd,addr=udp_server.recvfrom(bufsize)
    print('用戶命令----->',cmd)

    #邏輯處理
    res=subprocess.Popen(cmd.decode('utf-8'),shell=True,stderr=subprocess.PIPE,stdin=subprocess.PIPE,stdout=subprocess.PIPE)
    stderr=res.stderr.read()
    stdout=res.stdout.read()

    #發消息
    udp_server.sendto(stderr,addr)
    udp_server.sendto(stdout,addr)
udp_server.close()
server_udp
from socket import *
ip_port=('127.0.0.1',9003)
bufsize=1024

udp_client=socket(AF_INET,SOCK_DGRAM)


while True:
    msg=input('>>: ').strip()
    udp_client.sendto(msg.encode('utf-8'),ip_port)

    data,addr=udp_client.recvfrom(bufsize)
    print(data.decode('utf-8'),end='')
client _udp

上述程序是基於udp的socket,在運行時永遠不會發生粘包

那么,粘包是怎么造成的?

 

只有TCP有粘包現象,UDP永遠不會粘包,首先需要掌握一個socket收發消息的原理

發送端可以是一K一K地發送數據,而接收端的應用程序可以兩K兩K地提走數據,當然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據,也就是說,應用程序所看到的數據是一個整體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,因此TCP協議是面向流的協議,這也是容易出現粘包問題的原因。而UDP是面向消息的協議,每個UDP段都是一條消息,應用程序必須以消息為單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不同的。怎樣定義消息呢?可以認為對方一次性write/send的數據為一個消息,需要明白的是當對方send一條信息的時候,無論底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成后才呈現在內核緩沖區。

例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束.

所謂粘包問題主要還是因為接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所造成的。

此外,發送方引起的粘包是由TCP協議本身造成的,TCP為提高傳輸效率,發送方往往要收集到足夠多的數據后才發送一個TCP段。若連續幾次需要send的數據都很少,通常TCP會根據優化算法把這些數據合成一個TCP段后一次發送出去,這樣接收方就收到了粘包數據。

  1. TCP(transport control protocol,傳輸控制協議)是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有一一成對的socket,因此,發送端為了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包。這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。 即面向流的通信是無消息保護邊界的。
  2. UDP(user datagram protocol,用戶數據報協議)是無連接的,面向消息的,提供高效率服務。不會使用塊的合並優化算法,, 由於UDP支持的是一對多的模式,所以接收端的skbuff(套接字緩沖區)采用了鏈式結構來記錄每一個到達的UDP包,在每個UDP包中就有了消息頭(消息來源地址,端口等信息),這樣,對於接收端來說,就容易進行區分處理了。 即面向消息的通信是有消息保護邊界的。
  3. tcp是基於數據流的,於是收發的消息不能為空,這就需要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即便是你輸入的是空內容(直接回車),那也不是空消息,udp協議會幫你封裝上消息頭.

 

udp的recvfrom是阻塞的,一個recvfrom(x)必須對唯一一個sendinto(y),收完了x個字節的數據就算完成,若是y>x數據就丟失,這意味着udp根本不會粘包,但是會丟數據,不可靠

tcp的協議數據不會丟,沒有收完包,下次接收,會繼續上次繼續接收,己端總是在收到ack時才會清除緩沖區內容。數據是可靠的,但是會粘包。

兩種情況下會發生粘包。

發送端需要等緩沖區滿才發送出去,造成粘包(發送數據時間間隔很短,數據了很小,會合到一起,產生粘包)

from socket import *
ip_port=('127.0.0.1',8080)

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)


conn,addr=tcp_socket_server.accept()


data1=conn.recv(10)
data2=conn.recv(10)

print('----->',data1.decode('utf-8'))
print('----->',data2.decode('utf-8'))

conn.close()
server
import socket
BUFSIZE=1024
ip_port=('127.0.0.1',8080)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)


s.send('hello'.encode('utf-8'))
s.send('feng'.encode('utf-8'))
client

接收方不及時接收緩沖區的包,造成多個包接收(客戶端發送了一段數據,服務端只收了一小部分,服務端下次再收的時候還是從緩沖區拿上次遺留的數據,產生粘包)

from socket import *
ip_port=('127.0.0.1',8080)

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)


conn,addr=tcp_socket_server.accept()


data1=conn.recv(2) #一次沒有收完整
data2=conn.recv(10)#下次收的時候,會先取舊的數據,然后取新的

print('----->',data1.decode('utf-8'))
print('----->',data2.decode('utf-8'))

conn.close()
server
import socket
BUFSIZE=1024
ip_port=('127.0.0.1',8080)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)


s.send('hello feng'.encode('utf-8'))
client

拆包的發生情況

當發送端緩沖區的長度大於網卡的MTU時,tcp會將這次發送的數據拆成幾個數據包發送出去。

補充問題一:為何tcp是可靠傳輸,udp是不可靠傳輸

tcp在數據傳輸時,發送端先把數據發送到自己的緩存中,然后協議控制將緩存中的數據發往對端,對端返回一個ack=1,發送端則清理緩存中的數據,對端返回ack=0,則重新發送數據,所以tcp是可靠的

而udp發送數據,對端是不會返回確認信息的,因此不可靠

補充問題二:send(字節流)和recv(1024)及sendall

recv里指定的1024意思是從緩存里一次拿出1024個字節的數據

send的字節流是先放入己端緩存,然后由協議控制將緩存內容發往對端,如果待發送的字節流大小大於緩存剩余空間,那么數據丟失,用sendall就會循環調用send,數據不會丟失.

八、粘包的解決方法

問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,所以解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把自己將要發送的字節流總大小讓接收端知曉,然后接收端來一個死循環接收完所有數據。

第一種的解決方法

import socket,subprocess
ip_port=('127.0.0.1',8080)
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

s.bind(ip_port)
s.listen(5)

while True:
    conn,addr=s.accept()
    print('客戶端',addr)
    while True:
        msg=conn.recv(1024)
        if not msg:break
        res=subprocess.Popen(msg.decode('utf-8'),shell=True,\
                            stdin=subprocess.PIPE,\
                         stderr=subprocess.PIPE,\
                         stdout=subprocess.PIPE)
        err=res.stderr.read()
        if err:
            ret=err
        else:
            ret=res.stdout.read()
        data_length=len(ret)
        conn.send(str(data_length).encode('utf-8'))
        data=conn.recv(1024).decode('utf-8')
        if data == 'recv_ready':
            conn.sendall(ret)
    conn.close()
server
import socket,time
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    if msg == 'quit':break

    s.send(msg.encode('utf-8'))
    length=int(s.recv(1024).decode('utf-8'))
    s.send('recv_ready'.encode('utf-8'))
    send_size=0
    recv_size=0
    data=b''
    while recv_size < length:
        data+=s.recv(1024)
        recv_size+=len(data)


    print(data.decode('utf-8'))
client

上訴程序,程序的運行速度遠快於網絡傳輸速度,所以在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗。

所以我們可以,為字節流加上自定義固定長度報頭,報頭中包含字節流長度,然后一次send到對端,對端在接收時,先從緩存中取出定長的報頭,然后再取真實數據,引用struct模塊。

import json,struct
#假設通過客戶端上傳1T:1073741824000的文件a.txt

#為避免粘包,必須自定制報頭
header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T數據,文件路徑和md5值

#為了該報頭能傳送,需要序列化並且轉為bytes
head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化並轉成bytes,用於傳輸

#為了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節
head_len_bytes=struct.pack('i',len(head_bytes)) #這4個字節里只包含了一個數字,該數字是報頭的長度

#客戶端開始發送
conn.send(head_len_bytes) #先發報頭的長度,4個bytes
conn.send(head_bytes) #再發報頭的字節格式
conn.sendall(文件內容) #然后發真實內容的字節格式

#服務端開始接收
head_len_bytes=s.recv(4) #先收報頭4個bytes,得到報頭長度的字節格式
x=struct.unpack('i',head_len_bytes)[0] #提取報頭的長度

head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式
header=json.loads(json.dumps(header)) #提取報頭

#最后根據報頭的內容提取真實的數據,比如
real_data_len=s.recv(header['file_size'])
s.recv(real_data_len)
import socket,struct,json
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加

phone.bind(('127.0.0.1',8080))

phone.listen(5)

while True:
    conn,addr=phone.accept()
    while True:
        cmd=conn.recv(1024)
        if not cmd:break
        print('cmd: %s' %cmd)

        res=subprocess.Popen(cmd.decode('utf-8'),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        err=res.stderr.read()
        print(err)
        if err:
            back_msg=err
        else:
            back_msg=res.stdout.read()


        conn.send(struct.pack('i',len(back_msg))) #先發back_msg的長度
        conn.sendall(back_msg) #在發真實的內容

    conn.close()
server
import socket,time,struct

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    if msg == 'quit':break

    s.send(msg.encode('utf-8'))



    l=s.recv(4)
    x=struct.unpack('i',l)[0]
    print(type(x),x)
    # print(struct.unpack('I',l))
    r_s=0
    data=b''
    while r_s < x:
        r_d=s.recv(1024)
        data+=r_d
        r_s+=len(r_d)

    # print(data.decode('utf-8'))
    print(data.decode('gbk')) #windows默認gbk編碼
client

我們可以把報頭做成字典,字典里包含將要發送的真實數據的詳細信息,然后json序列化,然后用struck將序列化后的數據長度打包成4個字節(4個自己足夠用了)

發送時:先發報頭長度,再編碼報頭內容然后發送,最后發真實內容

接收時:先手報頭長度,用struct取出來,根據取出的長度收取報頭內容,然后解碼,反序列化,從反序列化的結果中取出待取數據的詳細信息,然后去取真實的數據內容

九、socketserver實現並發

 

基於tcp的套接字,關鍵就是兩個循環,一個鏈接循環,一個通信循環

 

socketserver模塊中分兩大類:server類(解決鏈接問題)和request類(解決通信問題)

 

server類:

request類:

繼承關系;

 

 

 

 

以下述代碼為例,分析socketserver源碼:

ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()

查找屬性的順序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer

  1. 實例化得到ftpserver,先找類ThreadingTCPServer的__init__,在TCPServer中找到,進而執行server_bind,server_active
  2. 找ftpserver下的serve_forever,在BaseServer中找到,進而執行self._handle_request_noblock(),該方法同樣是在BaseServer中
  3. 執行self._handle_request_noblock()進而執行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后執行self.process_request(request, client_address)
  4. 在ThreadingMixIn中找到process_request,開啟多線程應對並發,進而執行process_request_thread,執行self.finish_request(request, client_address)
  5. 上述四部分完成了鏈接循環,本部分開始進入處理通訊部分,在BaseServer中找到finish_request,觸發我們自己定義的類的實例化,去找__init__方法,而我們自己定義的類沒有該方法,則去它的父類也就是BaseRequestHandler中找....

源碼分析總結:

基於tcp的socketserver我們自己定義的類中的

  1.   self.server即套接字對象
  2.   self.request即一個鏈接
  3.   self.client_address即客戶端地址

基於udp的socketserver我們自己定義的類中的

  1. self.request是一個元組(第一個元素是客戶端發來的數據,第二部分是服務端的udp套接字對象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  2. self.client_address即客戶端地址

示例代碼:

import socketserver
import struct
import json
import os
class FtpServer(socketserver.BaseRequestHandler):
    coding='utf-8'
    server_dir='file_upload'
    max_packet_size=1024
    BASE_DIR=os.path.dirname(os.path.abspath(__file__))
    def handle(self):
        print(self.request)
        while True:
            data=self.request.recv(4)
            data_len=struct.unpack('i',data)[0]
            head_json=self.request.recv(data_len).decode(self.coding)
            head_dic=json.loads(head_json)
            # print(head_dic)
            cmd=head_dic['cmd']
            if hasattr(self,cmd):
                func=getattr(self,cmd)
                func(head_dic)
    def put(self,args):
        file_path = os.path.normpath(os.path.join(
            self.BASE_DIR,
            self.server_dir,
            args['filename']
        ))

        filesize = args['filesize']
        recv_size = 0
        print('----->', file_path)
        with open(file_path, 'wb') as f:
            while recv_size < filesize:
                recv_data = self.request.recv(self.max_packet_size)
                f.write(recv_data)
                recv_size += len(recv_data)
                print('recvsize:%s filesize:%s' % (recv_size, filesize))


ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()
server
import socket
import struct
import json
import os



class MYTCPClient:
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    allow_reuse_address = False

    max_packet_size = 8192

    coding='utf-8'

    request_queue_size = 5

    def __init__(self, server_address, connect=True):
        self.server_address=server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if connect:
            try:
                self.client_connect()
            except:
                self.client_close()
                raise

    def client_connect(self):
        self.socket.connect(self.server_address)

    def client_close(self):
        self.socket.close()

    def run(self):
        while True:
            inp=input(">>: ").strip()
            if not inp:continue
            l=inp.split()
            cmd=l[0]
            if hasattr(self,cmd):
                func=getattr(self,cmd)
                func(l)


    def put(self,args):
        cmd=args[0]
        filename=args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' %filename)
            return
        else:
            filesize=os.path.getsize(filename)

        head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
        print(head_dic)
        head_json=json.dumps(head_dic)
        head_json_bytes=bytes(head_json,encoding=self.coding)

        head_struct=struct.pack('i',len(head_json_bytes))
        self.socket.send(head_struct)
        self.socket.send(head_json_bytes)
        send_size=0
        with open(filename,'rb') as f:
            for line in f:
                self.socket.send(line)
                send_size+=len(line)
                print(send_size)
            else:
                print('upload successful')




client=MYTCPClient(('127.0.0.1',8080))

client.run()
client

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM