案例如下:
1、啟動一個服務端套接字服務
2、啟動一個客戶端套接字服務
3、客戶端向服務端發送一個hello,服務端則回復一個word,並打印
參考地址:https://www.cnblogs.com/xilouch/p/4618903.html
服務端代碼:
#coding:utf-8 import socket import time class Server: def __init__(self,host,port): self.port = port self.host = host self.status = 0 self.BUF_SIZE = 1024 def createServer(self): self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server.bind((self.host, self.port)) def startServer(self): self.status = 1 self.createServer() #設置接收的連接數為1 self.server.listen(1) client, address = self.server.accept() while self.status == 1: # 循環收發數據包,長連接 data = client.recv(self.BUF_SIZE) text = data.decode() if text != "": print(text) # python3 要使用decode client.send("world".encode()) # client.close() #連接不斷開,長連接 if __name__ == "__main__": server = Server("localhost",8083) server.createServer() server.startServer()
客戶端代碼:
#coding:utf-8 import socket import time import threading class Client: def __init__(self,host,port): self.port = port self.host = host self.status = 0 self.BUF_SIZE = 1024 def connect(self): self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客戶端開啟心跳 self.client.connect((self.host, self.port)) def send(self,mes): client.startResv() while True: self.client.send(mes.encode()) time.sleep(1) # 如果想驗證長時間沒發數據,SOCKET連接會不會斷開,則可以設置時間長一點 def resv(self): while True: data = self.client.recv(self.BUF_SIZE) text = data.decode() print(text) def startResv(self): t = threading.Thread(target=self.resv) t.start() def close(self): self.client.close() if __name__ == "__main__": client = Client("localhost",8083) client.connect() client.send("hello")
運行如下圖: