Python自動化運維之15、網絡編程之socket、socketserver、select、twisted


一、TCP/IP相關知識

TCP/UDP提供進程地址,兩個協議互不干擾的獨自的協議
      TCP :Transmission Control Protocol 傳輸控制協議,面向連接的協議,通信前需要建立通信信道(虛擬鏈路),結束后拆除鏈路,流式數據協議,可靠的連接                      
  UDP:User Datagram Protocol 用戶數據報協議,無連接的協議,不可靠的連接
 
IP是主機到主機之間,在傳輸過程中是不會變的,不能超過MTU:最大傳輸單元
MAC是設備到設備之間通信的,在傳輸中會不斷封裝與解封裝,會不斷變化的
 
數據傳輸過程,tcp/ip各種協議會將數據切割成數據報文,IP協議是無連接的,不可靠的,但是上傳的傳輸層協議中的tcp是可靠的,安全的,IP可能在分段,最大數據報文不能超過MTU,1500字節,IP報文就剩1400左右字節

                                  

應用層、表示層、會話層(資源子網,用戶進程)
傳輸層、網絡層、數據鏈路層、物理層(通信子網,內核空間)
 
 
TCP協議的特性:
  • 建立連接,三次握手
  • 將數據打包成段,校驗和(CRC-32循環冗余校驗法),檢驗報文的完整性
  • 確認、重傳以及超時
  • 排序,邏輯序號
  • 流量控制:防止快發慢收,滑動窗口(算法)來實現
  • 擁塞控制:慢啟動和擁塞避免算法避免擁塞
 

二、socket

  socket通常也稱作"套接字",用於描述IP地址和端口,是一個通信鏈的句柄,應用程序通常通過"套接字"向網絡發出請求或者應答網絡請求。
socket起源於Unix,而Unix/Linux基本哲學之一就是“一切皆文件”,對於文件用【打開】【讀寫】【關閉】模式來操作。socket就是該模式的一個實現,socket即是一種特殊的文件,一些socket函數就是對其進行的操作(讀/寫IO、打開、關閉)

socket和file的區別:

  • file模塊是針對某個指定文件進行【打開】【讀寫】【關閉】
  • socket模塊是針對 服務器端 和 客戶端Socket 進行【打開】【讀寫】【關閉】

socket相關知識

  應用程序中進程要用某個套接字要向內核注冊申請端口,比如http監聽在80端口,客戶端web瀏覽器進程則開啟的是隨機端口41052-65535之間的端口,連接http的80端口。
 
socket:允許位於不同主機(甚至同一主機)上不同進程之間進程之間通信(數據交換)Socket API,早期在BSD中使用
  • SOCK_STREAM:tcp 套接字
  • SOCK_DGRAM: udp 套接字
  • SOCK:裸套接字
socket domain:套接字域(根據其所使用的地址)
  • AF_INET:Address Family,IPv4
  • AF_INET6:IPv6
  • AF_UNIX:同一主機上不同進程的通信,不經過tcp/ip,直接通過內核調用套接字通信,共享內存通信
每類套接字都至少提供了兩種socket:流,數據報文
  • 流:tcp可靠的傳遞、面向連接、無邊界數據傳遞
  • 數據報文:udp不可靠的傳遞、有邊界,無連接
IANA(互聯網數字分配機構)規定:
  • 0-1023:眾所周知,永久的分配給固定的應用使用,特權端口,只有管理員root才有權限使用;22/tcp(ssh),80/tcp(http),443/tcp(https)
  • 1024-41951:亦為注冊端口,但要求並不是特別嚴格,分配給程序注冊為某應用使用
  • 41952-65535:客戶端程序隨機使用的端口,稱為動態端口,或私有端口,其范圍的定義:/proc/sys/net/ipv4/ip_local_port_range,並發場景中可以修改大一些只要不包含1023之前的端口即可。1024 65535
socket模塊是針對 服務器端 和 客戶端Socket 進行【打開】【讀寫】【關閉】,一個完整的套接字模型圖如下圖所示:
                                    

1、簡單運用

(1)簡單實現一個TCP SOCKET的C/S架構,S服務器,C客戶端:

socker_server.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
ip_add=('127.0.0.1',9000)

s = socket.socket()     # 創建套接字對象
s.bind(ip_add)          # 綁定ip和端口必須是元組
s.listen(5)             # 設置連接池掛起的數量

while True:
    conn,addr = s.accept()  # 接受客戶端的連接,conn是客戶端連接服務端的電信號,addr客戶端ip,port

    while True:
        try:
            recv_data = conn.recv(1024)    # conn.recv接收客戶端信息1024允許接受的字節最大8k
            if len(recv_data) == 0:break

            send_data = recv_data.upper()
            conn.send(send_data)   # conn.send發送信息
        except Exception:
            break

    conn.close()
socker_client.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
ip_add=('127.0.0.1',9000)

s = socket.socket()  # 創建套接字對象
s.connect(ip_add)    # s.connect連接服務器

while True:
    send_data = input(">>>:")    
    if send_data == 'exit': break
    if len(send_data) == 0:continue
    s.send(bytes(send_data,encoding="utf-8"))   # s.send發送信息

    recv_data = s.recv(1024)    # s.recv接收服務器發來的信息
    print(str(recv_data,encoding="utf-8"))

s.close()

根據socket的流程圖簡單實現了,客戶端與服務器的交互,socket的工作流程圖很重要,上面的流程圖是TCP的,UDP簡單些沒有那么多的交互

(2)簡單實現一個UDP SOCKET的C/S架構,S服務器,C客戶端:

# 服務端
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data,(host,port) = sk.recvfrom(1024)
    print(data,host,port)
    sk.sendto(bytes('ok', encoding='utf-8'), (host,port))


#客戶端
import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
    inp = input('數據:').strip()
    if inp == 'exit':
        break
    sk.sendto(bytes(inp, encoding='utf-8'),ip_port)
    data = sk.recvfrom(1024)
    print(data)

sk.close()

2、socker()類相關方法:

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)

參數一:地址簇

  socket.AF_INET IPv4(默認)
  socket.AF_INET6 IPv6

  socket.AF_UNIX 只能夠用於單一的Unix系統進程間通信

參數二:類型

  socket.SOCK_STREAM  流式socket , for TCP (默認)
  socket.SOCK_DGRAM   數據報式socket , for UDP

  socket.SOCK_RAW 原始套接字,普通的套接字無法處理ICMP、IGMP等網絡報文,而SOCK_RAW可以;其次,SOCK_RAW也可以處理特殊的IPv4報文;此外,利用原始套接字,可以通過IP_HDRINCL套接字選項由用戶構造IP頭。
  socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在需要執行某些特殊操作時使用,如發送ICMP報文。SOCK_RAM通常僅限於高級用戶或管理員運行的程序使用。
  socket.SOCK_SEQPACKET 可靠的連續數據包服務

參數三:協議

      0  (默認)與特定的地址家族相關的協議,如果是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議

sk.bind(address)

  s.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。

sk.listen(backlog)

  開始監聽傳入連接。backlog指定在拒絕連接之前,可以掛起的最大連接數量。

      backlog等於5,表示內核已經接到了連接請求,但服務器還沒有調用accept進行處理的連接個數最大為5
      這個值不能無限大,因為要在內核中維護連接隊列

sk.setblocking(bool)

  是否阻塞(默認True),如果設置False,那么accept和recv時一旦無數據,則報錯。

sk.accept()

  接受連接並返回(conn,address),其中conn是新的套接字對象,可以用來接收和發送數據。address是連接客戶端的地址。
  接收TCP 客戶的連接(阻塞式)等待連接的到來

sk.connect(address)

  連接到address處的套接字。一般,address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤。

sk.connect_ex(address)

  同上,只不過會有返回值,連接成功時返回 0 ,連接失敗時候返回編碼,例如:10061

sk.close()

  關閉套接字

sk.recv(bufsize[,flag])

  接受套接字的數據。數據以字符串形式返回,bufsize指定最多可以接收的數量。flag提供有關消息的其他信息,通常可以忽略。

sk.recvfrom(bufsize[.flag])

  與recv()類似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。

sk.send(bytes[,flag])

  將string中的數據發送到連接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。即:可能未將指定內容全部發送。

sk.sendall(bytes[,flag])

  將string中的數據發送到連接的套接字,但在返回之前會嘗試發送所有數據。成功返回None,失敗則拋出異常。
   內部通過遞歸調用send,將所有內容發送出去。

sk.sendto(bytes[,flag],address)

  將數據發送到套接字,address是形式為(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。

sk.settimeout(timeout)

  設置套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛創建套接字時設置,因為它們可能用於連接的操作(如 client 連接最多等待5s )

sk.getpeername()

  返回連接套接字的遠程地址。返回值通常是元組(ipaddr,port)。

sk.getsockname()

  返回套接字自己的地址。通常是一個元組(ipaddr,port)

sk.fileno()

  套接字的文件描述符

3、復雜運用

運用socket()實現C/S以及ssh相關操作和解決粘包問題:

服務端在發送數據之前,先把發送數據的長度告訴客戶端,要發送多少數據,然后客戶端根據這個數據的長度循環接收來解決粘包,傳輸過程:
服務端:
    (1).send  #數據長度
    (4).recv  #收到確認信息,開始下一步發送
    send  #發送數據
    
客戶端 :
    (2).recv #獲取數據長度
    (3).send #發送確認信息
    recv #循環接收

socket_ssh_nianbao_server.py  

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import subprocess
ip_add=('127.0.0.1',9001)


s = socket.socket()
s.bind(ip_add)
s.listen(5)

while True:
    conn,addr = s.accept()

    while True:
        try:
            recv_data = conn.recv(1024)
            if len(recv_data) == 0:break

            p = subprocess.Popen(str(recv_data,encoding="utf-8"),
                                 shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            res = p.stdout.read()
            err = p.stderr.read()
            if len(res) == 0:
                conn.send(err)
            else:
                send_data = bytes("Ready|%s" %len(res),encoding="utf-8")

            conn.send(send_data)

            feed_back = conn.recv(1024)
            start_tag = str(feed_back,encoding="utf-8")

            if start_tag.startswith("start"):
                conn.send(res)
        except Exception:
            break

    conn.close()

socket_ssh_nianbao_client.py  

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
ip_add=('127.0.0.1',9001)

s = socket.socket()
s.connect(ip_add)

while True:
    send_data = input(">>>:")
    if send_data == 'exit': break
    if len(send_data) == 0:continue
    s.send(bytes(send_data,encoding="utf-8"))

    recv_tag = s.recv(1024)
    recv_tag = str(recv_tag,encoding="utf-8")
    if recv_tag.startswith("Ready"):
        msg_size = int(recv_tag.split("|")[1])

    s.send(bytes("start",encoding="utf-8"))

    recv_size = 0
    msg_data = b''
    while recv_size != msg_size:
        recv_data=s.recv(100)
        recv_size+=len(recv_data)
        msg_data+=recv_data
        print('message size %s recv size %s' %(msg_size,recv_size))

    print(str(msg_data,encoding="utf-8"))

s.close()

4、設置socket 

  setsockopt()和getsockopt(),一個是設置選項,一個是得到設置。這里主要使用setsockopt(),setsockopt(level,optname,value),level定義了哪個選項將被使用。通常情況下是SOL_SOCKET,意思是正在使用的socket選項。

optname參數提供使用的特殊選項。關於可用選項的設置,會因為操作系統的不同而有少許不同。如果level選定了SOL_SOCKET,那么一些常用的選項見下表:


比較常用的用法是,setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 這里value設置為1,表示將SO_REUSEADDR標記為TRUE,操作系統會在服務器socket被關閉或服務器進程終止后馬上釋放該服務器的端口,否則操作系統會保留幾分鍾該端口。
 

三、socketserver

  socketserver內部使用 IO多路復用 以及 “多線程” 和 “多進程” ,從而實現並發處理多個客戶端請求的Socket服務端。即:每個客戶端請求連接到服務器時,Socket服務端都會在服務器是創建一個“線程”或者“進程” 專門負責處理當前客戶端的所有請求。底層還是對socket進行了封裝和加入線程、進程就實現了socketserver,后面會對socketserver源碼分析
                    

ThreadingTCPServer

ThreadingTCPServer是實現socket服務器的類,內部會為每個client創建一個 “線程”,該線程用來和客戶端進行交互。

ThreadingTCPServer:

  • 創建一個繼承自 SocketServer.BaseRequestHandler 的類
  • 類中必須定義一個名稱為 handle 的方法
  • 啟動ThreadingTCPServer

1、socketserver簡單運用,實現C/S交互通信

socketserver_server.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socketserver
import subprocess
import os

class MyServer(socketserver.BaseRequestHandler):

    def handle(self):

        conn = self.request
        conn.sendall(bytes('歡迎致電 10086,請輸入1xxx,0轉人工服務.',encoding="utf-8"))

        while True:
            data = conn.recv(2014)
            if len(data) == 0: break
            cmd = subprocess.Popen(data.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            send_data = cmd.stdout.read()
            err_data = cmd.stderr.read()
            if len(send_data) == 0:
                send_data = bytes("current dirctory is %s" % os.getcwd(),encoding="utf-8")
            if err_data:
                send_data = err_data
            conn.sendall(send_data)

if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(('127.0.0.1',9000),MyServer)
    server.serve_forever()

socketserver_client.py  

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
ip_add=('127.0.0.1',9000)

s = socket.socket()
s.connect(ip_add)

welcome_msg = s.recv(1024)
print(welcome_msg.decode())
while True:
    send_data = input(">>>:")
    if send_data == 'exit': break
    if len(send_data) == 0:continue
    s.send(bytes(send_data,encoding="utf-8"))

    recv_data = s.recv(1024)
    print(str(recv_data,encoding="utf-8"))

s.close()

2、socketserver復雜運用:實現文件上傳類似 

socketserver_ftp_server.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socketserver
import json

class MyServer(socketserver.BaseRequestHandler):

    def handle(self):

        conn = self.request
        conn.sendall(bytes('歡迎致電 10086,請輸入1xxx,0轉人工服務.',encoding="utf-8"))

        while True:
            data = conn.recv(2014)
            if len(data) == 0: break
            task_data = json.loads(data.decode())
            print(task_data)
            task_action = task_data.get("action")
            if hasattr(self,"task_%s" %task_action):
                func = getattr(self,"task_%s" %task_action)
                func(task_data)

            else:
                print("task action is not supported",task_action)


    def task_put(self,*args,**kwargs):
        print("put",args,kwargs)
        file_size = args[0].get('filesize')
        filename = args[0].get('filename')
        server_response = {"status":200}
        self.request.send(bytes(json.dumps(server_response),encoding="utf"))
        with open('/tmp/' + filename,'wb') as f:
            recv_size = 0
            while recv_size != file_size:
                data = self.request.recv(4096)
                f.write(data)
                recv_size += len(data)
                print("filesize: %s recvsize:%s" %(file_size,recv_size))
            print("recv success")

if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(('127.0.0.1',9000),MyServer)
    server.serve_forever()

socketserver_ftp_client.py  

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import os
import json
ip_add=('127.0.0.1',9000)

s = socket.socket()
s.connect(ip_add)

welcome_msg = s.recv(1024)
print(welcome_msg.decode())
while True:

    send_data = input(">>>:").strip()
    if send_data == 'exit': break
    if len(send_data) == 0:continue

    cmd_list = send_data.split()
    if len(cmd_list) < 2:
        print("Usage: put path/to/file")
        continue

    task_type = cmd_list[0]
    if task_type == 'put':
        abs_filepath = cmd_list[1]
        if os.path.isfile(abs_filepath):
            file_size = os.stat(abs_filepath).st_size
            filename = abs_filepath.split('/')[-1]
            print('file:%s size:%s' %(abs_filepath,file_size))

            msg_data = {"action":"put","filename":filename,"filesize":file_size}
            s.send(bytes(json.dumps(msg_data),encoding="utf-8"))

            server_confirmation_msg = s.recv(1024)
            confirm_data = json.loads(server_confirmation_msg.decode())
            if confirm_data['status'] == 200:

                print("start sending file",filename)
                with open(abs_filepath,'rb') as f:
                    for line in f:
                        s.send(line)

                    print("send file done")

        else:
            print("\033[31m file [%s] is not exist \033[0m" %abs_filepath)
            continue

    else:
        print("don't support %s command" % task_type)
        continue


    recv_data = s.recv(1024)
    print(str(recv_data,encoding="utf-8"))

s.close()

3、socketserver源碼分析 

ThreadingTCPServer實現的Soket服務器內部會為每個client創建一個 “線程”,該線程用來和客戶端進行交互。首先來看一下繼承關系圖

 

            

if __name__ == '__main__':
    address = ('127.0.0.1', 8000)
    server = socketserver.ThreadingTCPServer(address, Myserver)
    server.serve_forever() 

上述代碼的內部調用流程為:

  • 啟動服務端程序
  • 執行 TCPServer.__init__ 方法,創建服務端Socket對象並綁定 IP 和 端口
  • 執行 BaseServer.__init__ 方法,將自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle賦值給 self.RequestHandlerClass
  • 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...
  • 當客戶端連接到達服務器
  • 執行 ThreadingMixIn.process_request 方法,創建一個 “線程” 用來處理請求
  • 執行 ThreadingMixIn.process_request_thread 方法
  • 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass()  即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)

簡單例子實現源碼重現

import socket
import threading
import select


def process(request, client_address):
    print request,client_address
    conn = request
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('通過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請重新輸入.')

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1',8002))
sk.listen(5)

while True:
    r, w, e = select.select([sk,],[],[],1)
    print 'looping'
    if sk in r:
        print 'get request'
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

sk.close()

可以看出,SocketServer的ThreadingTCPServer之所以可以同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端為每一個客戶端創建一個線程,當前線程用來處理對應客戶端的請求,所以,可以支持同時n個客戶端鏈接(長連接)。  

四、I/O復用 

1、I/O相關知識:點擊這里 

  I/O多路復用指:通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。
I/O類型:
       同步和異步
       阻塞和非阻塞
I/O模型:
  blocking IO: 阻塞IO
  nonbocking IO:非阻塞IO
  IO multiplexing:IO復用
    select()BSD
            poll()SysV
  signal driven IO:事件驅動IO
    epoll(linux上的邊緣觸發)、kqueue
    水平觸發:多次通知,浪費資源
    邊緣觸發:只通知一次
aysnchronous IO:異步IO

2、I/O簡介

Linux中的 select,poll,epoll 都是IO多路復用的機制。
select
 
select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
 
poll
 
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
 
epoll
 
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,
這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。 另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,
內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。

Python中的select、poll、epoll三個方法,在不同系統中實現IO多路復用。  

Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll

注意:網絡操作、文件操作、終端操作等均屬於IO操作,對於windows只支持Socket操作,其他系統支持其他IO操作,但是無法檢測 普通文件操作 自動上次讀取是否已經變化。  

3、select()方法 

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間)
 
參數: 可接受四個參數(前三個必須)
返回值:三個列表
 
select方法用來監視文件句柄,如果句柄發生變化,則獲取該句柄。
1、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中
2、當 參數2 序列中含有句柄時,則將該序列中所有的句柄添加到 返回值2 序列中
3、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中
4、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化
   當 超時時間 = 1時,那么如果監聽的句柄均無任何變化,則select會阻塞 1 秒,之后返回三個空列表,如果監聽的句柄有變化,則直接執行。

利用select實現偽同時處理多個Socket客戶端請求:服務端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

ip_add=('127.0.0.1',9000)

sk = socket.socket()
sk.bind(ip_add)
sk.listen(5)

inputs = [sk,]
outputs = []
while True:

    rlist,wlist,e = select.select(inputs,outputs,[],1)
    print(len(inputs),len(rlist),len(wlist),len(outputs))
    # 監聽sk(服務器端)對象,如果sk對象發生變化,表示有客戶端來鏈接了,此時rlist值為【sk】
    # 監聽conn對象,如果conn發生變化,表示客戶端有新消息發送過來了,此時rlist的值為【客戶端】

    for r in rlist:
        if r == sk:
            # 新客戶端來鏈接
            print(r)
            conn,addr = r.accept()
            # conn是什么?其實也是socket對象
            inputs.append(conn)
            conn.sendall(bytes('hello', encoding='utf-8'))
        else:
            # 是以存在的conn對象,給我發消息了
            try:
                ret = r.recv(1024)
                # r.sendall(ret)
                if not ret:
                    raise Exception('斷開鏈接')
                else:
                    outputs.append(r)
            except Exception as e:
                inputs.remove(r)

    # 所有給我發過消息的人
    for w in wlist:

        w.sendall(bytes('response',encoding='utf-8'))
        outputs.remove(w)

利用select實現偽同時處理多個Socket客戶端請求:客戶端  

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
ip_add=('127.0.0.1',9000)

s = socket.socket()
s.connect(ip_add)
data = s.recv(1024)
print(data)
while True:
    send_data = input(">>>:")
    if send_data == 'exit': break
    if len(send_data) == 0:continue

    s.sendall(bytes(send_data,encoding='utf-8'))

    print(s.recv(1024))

s.close()

此處的Socket服務端相比與原生的Socket,他支持當某一個請求不再發送數據時,服務器端不會等待而是可以去處理其他請求的數據。但是,如果每個請求的耗時比較長時,select版本的服務器端也無法完成同時操作。  

五、Twisted

Twisted是一個事件驅動的網絡框架,其中包含了諸多功能,例如:網絡協議、線程、數據庫管理、網絡操作、電子郵件等。

事件驅動

簡而言之,事件驅動分為二個部分:第一,注冊事件;第二,觸發事件。

自定義事件驅動框架,命名為:“弒君者”:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# event_drive.py

event_list = []


def run():
    for event in event_list:
        obj = event()
        obj.execute()


class BaseHandler(object):
    """
    用戶必須繼承該類,從而規范所有類的方法(類似於接口的功能)
    """
    def execute(self):
        raise Exception('you must overwrite execute')

程序員使用“弒君者框架”:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from source import event_drive


class MyHandler(event_drive.BaseHandler):

    def execute(self):
        print 'event-drive execute MyHandler'


event_drive.event_list.append(MyHandler)
event_drive.run()

如上述代碼,事件驅動只不過是框架規定了執行順序,程序員在使用框架時,可以向原執行順序中注冊“事件”,從而在框架執行時可以出發已注冊的“事件”。

 

基於事件驅動Socket

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from twisted.internet import protocol
from twisted.internet import reactor
 
class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)
 
def main():
    factory = protocol.ServerFactory()
    factory.protocol = Echo
 
    reactor.listenTCP(8000,factory)
    reactor.run()
 
if __name__ == '__main__':
    main()

程序執行流程:

  • 運行服務端程序
  • 創建Protocol的派生類Echo
  • 創建ServerFactory對象,並將Echo類封裝到其protocol字段中
  • 執行reactor的 listenTCP 方法,內部使用 tcp.Port 創建socket server對象,並將該對象添加到了 reactor的set類型的字段 _read 中
  • 執行reactor的 run 方法,內部執行 while 循環,並通過 select 來監視 _read 中文件描述符是否有變化,循環中...
  • 客戶端請求到達
  • 執行reactor的 _doReadOrWrite 方法,其內部通過反射調用 tcp.Port 類的 doRead 方法,內部 accept 客戶端連接並創建Server對象實例(用於封裝客戶端socket信息)和 創建 Echo 對象實例(用於處理請求) ,然后調用 Echo 對象實例的 makeConnection 方法,創建連接。
  • 執行 tcp.Server 類的 doRead 方法,讀取數據,
  • 執行 tcp.Server 類的 _dataReceived 方法,如果讀取數據內容為空(關閉鏈接),否則,出發 Echo 的 dataReceived 方法
  • 執行 Echo 的 dataReceived 方法

從源碼可以看出,上述實例本質上使用了事件驅動的方法 和 IO多路復用的機制來進行Socket的處理。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from twisted.internet import reactor, protocol
from twisted.web.client import getPage
from twisted.internet import reactor
import time

class Echo(protocol.Protocol):

    def dataReceived(self, data):
        deferred1 = getPage('http://cnblogs.com')
        deferred1.addCallback(self.printContents)

        deferred2 = getPage('http://baidu.com')
        deferred2.addCallback(self.printContents)

        for i in range(2):
            time.sleep(1)
            print 'execute ',i


    def execute(self,data):
        self.transport.write(data)

    def printContents(self,content):
        print len(content),content[0:100],time.time()

def main():

    factory = protocol.ServerFactory()
    factory.protocol = Echo

    reactor.listenTCP(8000,factory)
    reactor.run()

if __name__ == '__main__':
    main()

更多請見:

https://twistedmatrix.com/trac
http://twistedmatrix.com/documents/current/api/
http://blog.sina.com.cn/s/blog_704b6af70100py9n.html
http://blog.csdn.net/hanhuili/article/details/9389433


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM