最近在做項目中使用python,tcp發送json數據格式,但是網上大都是python2版本的,因此自己弄了一個python3的版本,特此記錄一下。
客戶端代碼如下:
1
# python 3 #客戶端 import numpy as np import socket import json TCP_PORT = 12005 class tcp_client(): def __init__(self, tcp_port): self.s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.s.connect(('127.0.0.1',tcp_port)) print( self.s.recv(1024).decode()) data = {'command':'init'} str_json = json.dumps(data) self.s.send( str_json.encode() ) str_recv = self.s.recv(1024).decode() #print(str_recv) data_json = json.loads( str_recv ) def step(self): data ={'command':"take_picture",'parameters':{'position':"1"}} #print(data2['parameters']['position']) str_json = json.dumps(data) self.s.send( str_json.encode() ) str_recv = self.s.recv(1024).decode() data_json = json.loads( str_recv ) print("獲得拍照結果") photo_result = data_json['return']['result'] return photo_result def get_result(self): data ={'command':"get_result"} str_json = json.dumps(data) self.s.send(str_json.encode()) str_recv = self.s.recv(1024).decode() data_json = json.loads( str_recv ) res = data_json['return']['result'] print(res) #發送關閉客戶端命令 def close_tcp(self): print(" close tcp ... ") data = { 'command' : 'close' } str_json = json.dumps(data) self.s.send( str_json.encode() ) self.s.close() client = tcp_client( tcp_port = TCP_PORT ) # 測試步驟 client.step() client.get_result() client.close_tcp()
服務端代碼如下:
# python 3 # 服務端 import numpy as np import socket import threading import json import logging TCP_PORT = 12005 def one_servicer(): #Create The Socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #Listen The Port s.bind(('',TCP_PORT)) s.listen(5) print('Waiting for connection...') def tcplink(sock,addr): print('Accept new connection from %s:%s...' % addr) sock.send( ('connect success!').encode() ) while True: data=sock.recv(1024).decode() print("data: ", data) data_json = json.loads( data ) #print('data_json["type"]: ', data_json["type"]) print('data_json["command"]: ', data_json["command"]) #if data_json["type"] == "step": #data = { 'state' : "123", 'reward' : "456", 'done' : "789", 'info' : "ok" } if data_json["command"] == "get_result": #返回圖片檢測結果是ok或者ng data = {'command':"get_result",'return':{'result':"ok"}} elif data_json["command"] == "take_picture": if data_json['parameters']['position']== "1": #執行相機拍照代碼 print("第一次拍照完成") data = {'command':"take_picture",'return':{'result':"ok"}} if data_json['parameters']['position']== "2": #執行相機拍照代碼 print("第二次拍照完成") data = {'command':"take_picture",'return':{'result':"ok"}} if data_json['parameters']['position']== "3": #執行相機拍照代碼 print("第三次拍照完成") data = {'command':"take_picture",'return':{'result':"ok"}} if data_json['parameters']['position']== "4": #執行相機拍照代碼 print("第四次拍照完成") data = {'command':"take_picture",'return':{'result':"ok"}} if data_json['parameters']['position']== "5": #執行相機拍照代碼 print("第五次拍照完成") data = {'command':"take_picture",'return':{'result':"ok"}} if data_json['parameters']['position']== "6": #執行相機拍照代碼 print("第六次拍照完成") data = {'command':"take_picture",'return':{'result':"ok"}} elif data_json["command"] == "close": break str_json = json.dumps(data) sock.send( str_json.encode() ) sock.close() print('Connection from %s:%s closed.'%addr) while True: # 開始一個新連接 sock,addr=s.accept() # 創建一個線程來處理連接 t=threading.Thread(target=tcplink(sock,addr)) if __name__ == "__main__": one_servicer()