python多線程網絡編程


背景

使用過flask框架后,我對request這個全局實例非常感興趣。它在客戶端發起請求后會保存着所有的客戶端數據,例如用戶上傳的表單或者文件等。那么在很多客戶端發起請求時,服務器是怎么去區分不同的request對象呢?當查看了大量的資料后,發現它使用了一種稱為thread local的技術。關於thread local的實現原理其實很簡單,就是聲明一個全局的字典並且以線程的名字作為字典的鍵,然后其值就是該線程下的私有數據。具體可以參考這篇ThreadLocal文章。我們都知道http服務器相對socket服務器要更加上層的網絡服務,所以研究flaskrequest實現原理前最好能夠先理解scoket是怎么實現的。

運行平台與python版本要求

  • 1.使用的是python版本;
  • 2.在mac下測試通過(理論上在所有系統上都能夠運行);
  • 3.可以github上獲得所有源代碼;
  • 4.使用nc命令模擬客戶端;

研究目標

  • 1.我將會使用socket完成一個echo服務器的設計;
  • 2.這個服務器使用多線程實現非阻塞;
  • 3.我要使用thread local技術使數據獨立安全;

socket的阻塞式實現

程序的設計應該是從實現最簡單核心的任務開始的,所以下面我先實現一個簡單的socket服務器。

import socket

class ThreadSocket(object):
	"""
	
	"""
	def __init__(self, host, port):
		self.host = host
		self.port = port
		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		self.sock.bind((self.host, self.port))

	def listen(self):
		self.sock.listen(5)
		while True:
			client, address = self.sock.accept()
			client.settimeout(60)
			try:
				data = client.recv(1024)
				if data:
					client.send(data)
				else:
					raise error("Client has disconnected")
			except:
				client.close()
			
if __name__ == '__main__':
	server=ThreadSocket('',9000)
	server.listen()

你也可以直接通過git checkout v0.1命令獲得這個版本的代碼,代碼寫完后可以通過下面的方式完成測試。

python simpleSocketServer.py

新打開一個終端,輸入下面這個命令進行測試。

nc 127.0.0.1 9000

下面可以看看客戶端運行后的輸出結果。

可以發現在第二次輸入的數據並沒有得到服務器的反饋,這是因為服務器還沒有切換到讀取等待模式。可以通過下面的方式修改從而實現不間斷的發送接收任務。

import socket

class ThreadSocket(object):
        """
        
        """
        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        while True:
                                try:
                                        data = client.recv(1024)
                                        if data:
                                                client.send(data)
                                        else:
                                                raise error("Client has disconne
cted")
                                except:
                                        client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

現在再看到我們的客戶端輸出結果如下。

代碼可以通過下面的的命令git checkout v0.2獲得。但當你啟動多個客戶端時,你會發現這種實現方式的局限性。
下面啟動兩個客戶端進行演示的輸出結果。

你會發現當再使用nc啟動一個進程訪問服務器的時候是無法與服務器通訊的,因此這種阻塞的實現方式是非常低效的。那么如何才能編寫出支持多個客戶端的服務器呢?請繼續看下去。

socket的多線程實現

為了使每一個客戶端都能夠得到服務器的響應我們的設計思路是讓主線程等待客戶端的連接,一旦連接成功就啟動一個新的子線程,並且把讀寫相關的操作都扔給子線程處理。好有了思路下面可以編寫代碼了。

port socket
import threading

class ThreadSocket(object):
        """
        
        """
        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        threading.Thread(target=self.handleClientRequest, args=(
client, address)).start()

        def handleClientRequest(self, client, address):
                while True:
                        try:
                                data = client.recv(1024)
                                if data:
                                        client.send(data)
                                else:
                                        raise error("Client has disconnected")
                        except:
                                client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

注意在頂部導入threading模塊,然后把讀寫網絡套接字部分放入線程回調函數里,你可以使用git checkout v0.3獲取這個版本的代碼。
下面看看運行的結果。

現在我們的服務器可以多人同時訪問,並且能夠同時提供服務了。

使用現有的socket實現一個ToDo服務器

在編寫api時通常我們都會以實現一個ToDo服務功能作為演示示例,下面我們來簡單設計一下。

  • 1.使用字典作為數據庫保存用戶提交的數據;
  • 2.使用GET方法實現獲取數據功能;
  • 3.使用POST實現提交數據功能;

要實現上面這幾個點也不難,首先我們先實現第一個目標。使用字典保存我們的數據,我們的服務器將不會再返回一個用戶的輸入數據,而是返回一個字典格式的數據。實現方式如下。

import socket
import threading

class ThreadSocket(object):
        """
        
        """
        todo_list = {
                'task_01':'see someone',
                'task_02':'read book',
                'task_03':'play basketball'

        }

        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        threading.Thread(target=self.handleClientRequest, args=(
client, address)).start()

        def handleClientRequest(self, client, address):
                while True:
                        try:
                                data = client.recv(1024)
                                if data:
                                        #client.send(data)
                                        response = str(self.todo_list)
                                        client.send(response)
                                else:
                                        raise error("Client has disconnected")
                        except:
                                client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

通過字典保存了一些數據,當客戶發送數據請求時就把這個字典反饋給客戶。在把字典發送給客戶之前需要使用str()函數強制轉換數據為字符串,不然就會出現問題。這里的代碼可以使用git checkout v0.4獲得。
下面可以查看運行結果如下所示。

可以看到只要隨便輸入一些數據,它就會反饋服務器中的數據給客戶端。這種請求的方式也太簡單了,所以我們現在要使用一些方式進行限制,只有特定的命令才能請求到我們的數據。下面我們來實現一個GET方法。

if data:
        #client.send(data)
         if 'GET' in data:
                response = str(self.todo_list)
         else:
                response = 'data no found'
         client.send(response)
else:
       raise error("Client has disconnected")

現在只是簡單的通過判斷客戶端的請求數據中是否包含有關鍵字GET字符,為了能夠在客戶端中輸出結果完后換行,現在我們可以通過下面這種方式優化一下代碼。

response = response+'\r\n'
client.send(response)

運行后可以看到改進后的運行結果如下。

為了使GET方法能夠起到真正的查詢特定數據的作用,可以先構造一種查詢方式。

GET/task_id/status

我們設計的查詢語句非常簡單,每一個域都是通過斜線分割,那么服務器應該如何解析這種查詢語句呢?請看下面的實現方式。

if 'GET' in data:
      method,task_id,status = data.split('/')
      result = self.todo_list.get(task_id,'no key match')
      response = str(result)

可以發現,我使用字符串內置函數直接分割客戶的請求數據,然后通過簡單的字典查詢得到用戶指定的數據。
全部代碼如下:

import socket
import threading

class ThreadSocket(object):
        """
        
        """
        todo_list = {
                'task_01':'see someone',
                'task_02':'read book',
                'task_03':'play basketball'

        }

        def __init__(self, host, port):
                self.host = host
                self.port = port
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind((self.host, self.port))

        def listen(self):
                self.sock.listen(5)
                while True:
                        client, address = self.sock.accept()
                        client.settimeout(60)
                        threading.Thread(target=self.handleClientRequest, args=(client,
address)).start()

        def handleClientRequest(self, client, address):
                while True:
                        try:
                                data = client.recv(1024)
                                if data:
                                        #client.send(data)
                                        if 'GET' in data:
                                                method,task_id,status = data.split('/')
                                                result = self.todo_list.get(task_id,'no 
key match')
                                                print 'result',result
                                                response = str(result)
                                        else:
                                                response = 'data no found'

                                        response = response+'\r\n'
                                        client.send(response)
                                else:
                                        raise error("Client has disconnected")
                except:
                                client.close()

if __name__ == '__main__':
        server=ThreadSocket('',9000)
        server.listen()

當前版本的代碼可以通過git checkout v0.5獲得。下面來看運行的結果。

如何更新我們的數據呢?我們需要實現一個POST方法,下面我們可以設計一下我們的POST語句了。

POST/task_id=value/status

可以看到語句設計的非常簡單,下面我們來實現它。

if 'GET' in data:
     method,task_id,status = data.split('/')
     result = self.todo_list.get(task_id,'no key match')
     print 'result',result
     response = str(result)
elif 'POST' in data:
     method,command,status = data.split('/')
     key,value = command.split('=')
     self.todo_list[key] = value
     response = 'submit success'
else:
     response = 'data no found'

  response = response+'\r\n'
  client.send(response)

所有代碼將不再粘貼到文章中,可以通過git checkout v0.6獲得,下面是運行截圖。

總結

目前我們已經實現了socket在多線程下的一個簡單的todo應用,通過一點一點的實現整個框架能夠深入理解網絡編程的原理。那么現在我們還有什么方面可以改進,還有什么地方需要考慮呢?下面給大家列出下一章將會覆蓋的話題。

  • 1.使用正則表達式解析用戶輸入;
  • 2.使用Thread Local技術;
  • 3.如何讓瀏覽器也可以訪問我們的服務器;

希望下一章能夠見到你:)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM