並發服務器幾種實現方法總結


今天主題是實現並發服務器,實現方法有多種版本,先從簡單的單進程代碼實現到多進程,多線程的實現,最終引入一些高級模塊來實現並發TCP服務器。

說到TCP,想起吐槽大會有個段子提到三次握手,也只有程序猿(媛)能get。

UDP服務器數據傳輸不可靠,這里就忽略了。

>>:

簡單的單進程TCP服務器

假代碼:

#創建tcp服務器套接字

#綁定端口

#設置正常情況退出的服務器下,端口可以重用

#設置監聽,變為主動監聽

# 等待客戶端的鏈接,返回新的socket和地址

#關閉tcp服務器套接字

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR #創建tcp服務器套接字
server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口
server_socket.bind(("",9999)) #設置正常情況退出的服務器下,端口可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設置監聽,變為主動監聽
server_socket.listen(5) while True: # 等待客戶端的鏈接,返回新的socket和地址
   new_socket,new_address = server_socket.accept() #接收數據,並且發送數據
   try: while True: recv_data = new_socket.recv(1024) #當有客戶端關閉后,recv解除阻塞,並且返回長度為0
         if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address),recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經關閉" % (str(new_address))) break
   finally: new_socket.close() print("關閉%s客戶端" % (str(new_address))) #關閉tcp服務器套接字
server_socket.close()

 

多進程TCP服務器

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from multiprocessing import Process #在子進程中接收消息
def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 當有客戶端關閉后,recv解除阻塞,並且返回長度為0
      if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經關閉" % (str(new_address))) break
   #關閉與客戶端的連接
   print("關閉與客戶端的連接") new_socket.close() def main(): #創建tcp服務器套接字
   server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口
   server_socket.bind(("",8888)) #設置正常情況退出的服務器下,端口可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設置監聽,變為被動連接
   server_socket.listen(3) try: while True: # 等待客戶端的鏈接,返回新的socket和地址
         new_socket,new_address = server_socket.accept() #接收數據,並且發送數據
         Process(target=recv_data,args=(new_socket,new_address)).start() #因為主進程和子進程不共享數據
         #如果我們直接關閉new_socket,只是關閉主進程的new_socket,而子進程的不受影響
 new_socket.close() finally: #關閉tcp服務器套接字
 server_socket.close() if __name__ == "__main__": main()

 

 多進程TCP服務器

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from multiprocessing import Process #在子進程中接收消息
def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 當有客戶端關閉后,recv解除阻塞,並且返回長度為0
      if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經關閉" % (str(new_address))) break
   #關閉與客戶端的連接
   print("關閉與客戶端的連接") new_socket.close() def main(): #創建tcp服務器套接字
   server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口
   server_socket.bind(("",8888)) #設置正常情況退出的服務器下,端口可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設置監聽,變為被動連接
   server_socket.listen(3) try: while True: # 等待客戶端的鏈接,返回新的socket和地址
         new_socket,new_address = server_socket.accept() #接收數據,並且發送數據
         Process(target=recv_data,args=(new_socket,new_address)).start() #因為主進程和子進程不共享數據
         #如果我們直接關閉new_socket,只是關閉主進程的new_socket,而子進程的不受影響
 new_socket.close() finally: #關閉tcp服務器套接字
 server_socket.close() if __name__ == "__main__": main()

 

 多線程TCP服務器

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from threading import Thread #接收消息
def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 當有客戶端關閉后,recv解除阻塞,並且返回長度為0
      if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客戶端%s已經關閉" % (str(new_address))) break

def main(): #創建tcp服務器套接字
   server_socket = socket(AF_INET,SOCK_STREAM) #綁定端口
   server_socket.bind(("",9999)) #設置正常情況退出的服務器下,端口可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #設置監聽,變為被動連接
   server_socket.listen(3) try: while True: # 等待客戶端的鏈接,返回新的socket和地址
         new_socket,new_address = server_socket.accept() #接收數據,並且發送數據
         Thread(target=recv_data,args=(new_socket,new_address)).start() finally: #關閉tcp服務器套接字
 server_socket.close() if __name__ == "__main__": main()

 

多任務協程實現 ——

greenlet和gevent 

 

#coding=utf-8
from greenlet import greenlet import time def test1(): while True: print "---A--" gr2.switch() time.sleep(0.5) def test2(): while True: print "---B--" gr1.switch() time.sleep(0.5) gr1 = greenlet(test1) gr2 = greenlet(test2) #切換到gr1中運行
gr1.switch()

-----------------------------------------------

import gevent #函數
def f(n): for i in range(n): print("%s:%s" % (gevent.getcurrent(),i)) f1 = gevent.spawn(f,5) f2 = gevent.spawn(f,5) f3 = gevent.spawn(f,5) #讓主線程等待三個協程執行完畢,否則沒有機會執行
f1.join() f2.join() f3.join() #可以看到,3個greenlet是依次運行而不是交替運行。要讓greenlet交替運行,可以通過gevent.sleep()交出控制權。

 --------------------------------------------------

#coding=utf-8
import gevent def f(n): for i in range(n): print gevent.getcurrent(), i #用來模擬一個耗時操作,注意不是time模塊中的sleep
        gevent.sleep(1) g1 = gevent.spawn(f, 5) g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) #下面三行代碼意思:主線程等待各個協成支持完,否則協成沒有機會執行
g1.join() g2.join() g3.join()

 

單進程TCP服務器 ——

非堵塞式

 

from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET def main(): #創建tcp的socket套接字
   server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #綁定端口
   server_socket.bind(("",9999)) #設置非阻塞,也就是說accept方法不阻塞了,
   # 但是在沒有客戶端鏈接且被執行的時候會報錯
   #有客戶端鏈接的時候正常執行
 server_socket.setblocking(False) #設置監聽
   server_socket.listen(5) #客戶端列表
   client_lists = [] try: #不斷調用accept
      while True: try: # print("accept--111")
            new_socket,new_address = server_socket.accept() print("accept--2222") except Exception as result: # print(result)
            pass
         else: print("新的客戶%s鏈接上" % str(new_address)) #新鏈接的new_sokect默認也是阻塞,也設置為非阻塞后,recv為非阻塞
 new_socket.setblocking(False) client_lists.append((new_socket,new_address)) # print(111)
         for client_sokect,client_address in client_lists: #接收數據
            try: recv_data = client_sokect.recv(1024) except Exception as result: # print(result)
               pass
            else: # print("正常數據:%s" %recv_data)
               if len(recv_data) > 0 : print("收到%s:%s" % (str(client_address),recv_data)) client_sokect.send("thank you!".encode("gb2312")) else: #客戶端已經端口,要把該客戶端從列表中異常
 client_lists.remove((client_sokect,new_address)) client_sokect.close() print("%s已經斷開" % str(new_address)) finally: #關閉套接字
 server_socket.close() if __name__ == "__main__": main()

 

單進程TCP服務器 ——

select版

 

select 原理

其他語言(c或者c++)也有使用select實現多任務服務器。

select 能夠完成一些套接字的檢查,從頭到尾檢查一遍后,標記哪些套接字是否可以收數據,返回的時候,就返回能接收數據的套接字,返回的是列表。select是由操作系統提供的,效率要高些,非常快的方式檢測哪些套接字可以接收數據。select是跨平台的,在window也可以用。

io多路復用:沒有使用多進程和多線程的情況下完成多個套接字的使用

from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET from select import select import sys def main(): #創建tcp的socket套接字
   server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #綁定端口
   server_socket.bind(("",9999)) #設置監聽
   server_socket.listen(5) #客戶端列表
   socket_lists = [server_socket,sys.stdin] wirte_list = [] #是否退出
   is_run = False try: while True: #檢測列表client_lists那些socket可以接收數據,
         #檢測列表[]那些套接字(socket)可否發送數據
         #檢測列表[]那些套接字(socket)是否產生了異常
         print("select--111") #這個select函數默認是堵塞,當有客戶端鏈接的時候解除阻塞,
         # 當有數據可以接收的時候解除阻塞,當客戶端斷開的時候解除阻塞
         readable, wirteable,excep = select(socket_lists,wirte_list,[]) # print("select--2222")
         # print(111)
         for sock in wirteable: #這個會一直發送,因為他是處於已經發的狀態
            sock.send("thank you!".encode("gb2312")) for sock in readable: #接收數據
            if sock == server_socket: print("sock == server_socket") #有新的客戶端鏈接進來
               new_socket,new_address = sock.accept() #新的socket添加到列表中,便於下次socket的時候能檢查到
 socket_lists.append(new_socket) elif sock == sys.stdin: cmd = sys.stdin.readline() print(cmd) is_run = cmd else: # print("sock.recv(1024)....")
               #此時的套接字sock是直接可以取數據的
               recv_data = sock.recv(1024) if len(recv_data) > 0: print("從[%s]:%s" % (str(new_address),recv_data)) sock.send(recv_data) #把鏈接上有消息接收的socket添加到監聽寫的列表中
 wirte_list.append(sock) else: print("客戶端已經斷開") #客戶端已經斷開,要移除
 sock.close() socket_lists.remove(sock) #是否退出程序
         if is_run: break

   finally: #關閉套接字
 server_socket.close() if __name__ == "__main__": main()

 

單進程TCP服務器 ——

epoll版

 

from socket import *
import select def main(): #創建tcp服務器套接字
   server_socket = socket(AF_INET,SOCK_STREAM) #設置端口可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #綁定端口
   server_socket.bind(("",9999)) #設置監聽
   server_socket.listen(5) #用epoll設置監聽收數據
   epoll = select.epoll() #把server_socket注冊到epoll的事件監聽中,如果已經注冊過會發生異常
   epoll.register(server_socket.fileno(),select.EPOLLIN|select.EPOLLET) #裝socket列表
   socket_lists = {} #裝socket對應的地址
   socket_address = {} while True: #返回套接字列表[(socket的文件描述符,select.EPOLLIN)],
      # 如果有新的鏈接,有數據發過來,斷開鏈接等都會解除阻塞
      print("epoll.poll--111") epoll_list = epoll.poll() print("epoll.poll--222") print(epoll_list) for fd,event in epoll_list: #有新的鏈接
         if fd == server_socket.fileno(): print("新的客戶fd==%s" % fd) new_sokect,new_address = server_socket.accept() #往字典添加數據
            socket_lists[new_sokect.fileno()] = new_sokect socket_address[new_sokect.fileno()] = new_address #注冊新的socket也注冊到epoll的事件監聽中
            epoll.register(new_sokect.fileno(), select.EPOLLIN | select.EPOLLET) elif event ==select.EPOLLIN: print("收到數據了") #根據文件操作符取出對應socket
            new_sokect = socket_lists[fd] address = socket_address[fd] recv_data = new_sokect.recv(1024) if len(recv_data) > 0: print("已經收到[%s]:%s" % (str(address),recv_data.decode("gb2312"))) else: #客戶端端口,取消監聽
 epoll.unregister(fd) #關閉鏈接
 new_sokect.close() print("[%s]已經下線" % str(address)) #關閉套接字鏈接
 server_socket.close() if __name__ == "__main__": main()

 

單進程TCP服務器 ——

gevent版

 

gevent原理

greenlet已經實現了協程,但是這個還得人工切換,是不是覺得太麻煩了,莫要捉急,python還有一個比greenlet更強大的並且能夠自動切換任務的模塊gevent 

原理------當一個greenlet遇到IO(指的是input output 輸入輸出,比如網絡、文件操作等)操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。

由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO.

import sys import time import gevent from gevent import socket,monkey monkey.patch_all() def handle_request(conn): while True: data = conn.recv(1024) if not data: conn.close() break
        print("recv:", data) conn.send(data) def server(port): s = socket.socket() s.bind(('', port)) s.listen(5) while True: newSocket, addr = s.accept() gevent.spawn(handle_request, newSocket) if __name__ == '__main__': server(7788)

 

首先基於以上代碼模塊,撒點概念問題:

1.什么是協程?

協程:存在線程中,是比線程更小的執行單元,又稱微線程,纖程。自帶cpu上下文,操作協程由程序員決定,它可以將一個線程分解為多個微線程,每個協程間共享全局空間的變量,每秒鍾切換頻率高達百萬次。

2. 什么是計算密集型IO密集型

計算密集型:要進行大量的計算,消耗cpu資源。如復雜計算,對視頻進行高清解碼等,全靠cpu的運算能力。而計算密集型任務完成多任務切換任務比較耗時,cpu執行任務效率就越低。在python中,多進程適合計算密集型任務。

IO密集型:涉及到網絡、磁盤io的任務都是io密集型。cpu消耗少,計算量小,如請求網頁,讀寫文件等。在python中,使用sleep達到IO密集型任務的目的,多線程適合IO密集型任務。

 

各大實現版本對比:

select:

1)支持跨平台,最大缺陷是單個進程打開的FD是有限的,由FD_SETSIZE設置,默認是1024;

2)socket掃描時是線性掃描,及采用輪詢方式,效率低;

3)需要維護一個存放大量FD的數據結構,使得用戶空間和內核空間在傳遞該數據結構時復制開銷大。

poll: 

1)poll與select本質上沒有區別,但poll沒有最大連接數的限制;

2)大量的fd數組被整體復制於用戶態和內核地址空間之間,不管這樣的復制是不是有意義;

3)‘水平觸發’,如果報告了fd后,沒有被處理,下次poll時還會再次報告該fd。

epoll:

1)是之前poll和select的增強版,epoll更靈活,沒有描述符限制,能打開的fd遠大於1024(1G的內存上能監聽約10萬個端口);

2)‘邊緣出發’,事件通知機制,效率提升,最大的特點在於它只管你活躍的連接,而跟連接總數無關。而epoll對文件描述符的操作模式之一ET是一種高效的工作方式,很大程度減少事件反復觸發的次數,內核不會發送更多的通知(only once)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM