測試環境及庫
Python版本:3.6
客戶端:
- wxpython:客戶端界面
- socket:tcp客戶端
- re:輸入處理
服務端:
- socketserver:並發服務器
- serial:串口庫
下位機:
- 下位機使用虛擬串口模擬
客戶端

import wx import socket import wx.gizmos as gizmos import re class Example(wx.Frame): def __init__(self, parent, title): super(Example, self).__init__(parent, title=title,size=(800,800)) self.Init_Panel() self.Init_Box() self.Init_Left() self.Init_Right() # 將三個垂直盒子添加到垂直盒子 self.Boxh1.Add(self.LeftPanel,proportion = 1, border = 2,flag = wx.ALL | wx.EXPAND) self.Boxh1.Add(self.RightPanel, proportion=4, border=2, flag=wx.ALL | wx.EXPAND) #將垂直盒子和主框架關聯 self.SetSizer(self.Boxh1) #顯示主框架 self.Show() self.s = None #創建兩個面板 def Init_Panel(self): self.LeftPanel = wx.Panel(self) self.RightPanel = wx.Panel(self) #創建三個盒子 #一個垂直盒子、兩個水平盒子 def Init_Box(self): #兩個垂直盒子 self.Boxv1 = wx.BoxSizer(wx.VERTICAL) self.Boxv2 = wx.BoxSizer(wx.VERTICAL) #一個水平盒子 self.Boxh1 = wx.BoxSizer(wx.HORIZONTAL) self.Boxh2 = wx.BoxSizer(wx.HORIZONTAL) def Init_Left(self): self.Label3 = wx.StaticText(self.LeftPanel, -1, "設置客戶端鏈接") self.Label3.SetForegroundColour('white') self.Label3.SetBackgroundColour('black') self.Boxv1.Add(self.Label3, 0, wx.ALIGN_CENTRE, 10) self.Label1 = wx.StaticText(self.LeftPanel, -1, "服務器:") # 服務器ip輸入文本框 self.inputText = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 10), size=(150, -1)) self.inputText.SetInsertionPoint(0) # 設置焦點位置 self.Boxv1.Add(self.Label1, 0, wx.EXPAND | wx.ALL, 0) self.Boxv1.Add(self.inputText, 0, wx.EXPAND | wx.ALL, 0) self.Label2 = wx.StaticText(self.LeftPanel, -1, "端口:") # 服務器端口輸入框 self.pwdText = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 50), size=(150, -1)) self.Boxv1.Add(self.Label2, 0, wx.EXPAND | wx.ALL,0) self.Boxv1.Add(self.pwdText, 0, wx.EXPAND | wx.ALL,2) self.ButtonC = wx.Button(self.LeftPanel, -1, "確認連接") self.Bind(wx.EVT_BUTTON, self.ButtonConclick, self.ButtonC) self.Boxv1.Add(self.ButtonC, 0, wx.EXPAND | wx.ALL, 5) self.Labellamp = wx.StaticText(self.LeftPanel, -1, "燈管理") self.Labellamp.SetForegroundColour('white') self.Labellamp.SetBackgroundColour('black') self.Boxv1.Add(self.Labellamp, 0, wx.ALIGN_CENTRE, 5) #創建樹表控件 self.tree = gizmos.TreeListCtrl(self.LeftPanel,-1) #添加樹表的列 self.tree.AddColumn("燈名") self.tree.AddColumn("燈狀態") # self.tree.SetColumnWidth(0,186) self.root = self.tree.AddRoot("燈") self.tree.Expand(self.root) self.Boxv1.Add(self.tree, 0, wx.EXPAND | wx.ALL, 5) self.lamp = [] child1 = self.tree.AppendItem(self.root, "led1") # 添加一行 self.tree.SetItemText(child1, "led1", 0) # 按照索引設置每一列的數據 self.tree.SetItemText(child1, "關閉", 1) # 按照索引設置每一列的數據 self.tree.Expand(self.root) self.lamp.append(child1) child2 = self.tree.AppendItem(self.root, "led2") # 添加一行 self.tree.SetItemText(child2, "led2", 0) # 按照索引設置每一列的數據 self.tree.SetItemText(child2, "關閉", 1) # 按照索引設置每一列的數據 self.tree.Expand(self.root) self.lamp.append(child2) #添加燈管理 self.Label3 = wx.StaticText(self.LeftPanel, -1, "添加燈:") self.lampText1 = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 10), size=(150, -1)) self.lampText1.SetInsertionPoint(0) # 設置焦點位置 self.Boxv1.Add(self.Label3, 0, wx.EXPAND | wx.ALL, 0) self.Boxv1.Add(self.lampText1, 0, wx.EXPAND | wx.ALL, 0) self.ButtonD = wx.Button(self.LeftPanel, -1, "確認") self.Bind(wx.EVT_BUTTON, self.ButtonDonclick, self.ButtonD) self.Boxv1.Add(self.ButtonD, 0, wx.EXPAND | wx.ALL, 5) # 添加燈管理 self.Label4 = wx.StaticText(self.LeftPanel, -1, "刪除燈:") self.lampText2 = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 10), size=(150, -1)) self.lampText2.SetInsertionPoint(0) # 設置焦點位置 self.Boxv1.Add(self.Label4, 0, wx.EXPAND | wx.ALL, 0) self.Boxv1.Add(self.lampText2, 0, wx.EXPAND | wx.ALL, 0) self.ButtonF = wx.Button(self.LeftPanel, -1, "確認") self.Bind(wx.EVT_BUTTON, self.ButtonFonclick, self.ButtonF) self.Boxv1.Add(self.ButtonF, 0, wx.EXPAND | wx.ALL, 5) self.logLabel = wx.StaticText(self.LeftPanel, -1, "日志") self.logLabel.SetForegroundColour('white') self.logLabel.SetBackgroundColour('black') self.Boxv1.Add(self.logLabel,0, wx.ALIGN_CENTRE, 10) # 創建文本域 self.logmultiText = wx.TextCtrl(self.LeftPanel, -1, style=wx.TE_MULTILINE) # 創建一個文本控件 self.logmultiText.SetInsertionPoint(0) # 設置插入點 # 在垂直盒子里添加StaticBoxSizer盒子 self.Boxv1.Add(self.logmultiText,5, wx.EXPAND | wx.ALL, 10) #把垂直盒子與LeftPanel關聯起來 self.LeftPanel.SetSizer(self.Boxv1) def ButtonDonclick(self,event): lampname = self.lampText1.GetValue() zhmodel = re.compile(u'[\u4e00-\u9fa5]') # 檢查中文 if(lampname): match = zhmodel.search(lampname) if match: self.logmultiText.write('添加:' + '名字含有中文字符' + "\n") return for i in self.lamp: if (self.tree.GetItemText(i, 0) == lampname): self.logmultiText.write('添加:' + lampname + '重復' + "\n") return child = self.tree.AppendItem(self.root,lampname) # 添加一行 self.tree.SetItemText(child,lampname, 0) # 按照索引設置每一列的數據 self.tree.SetItemText(child, "關閉", 1) # 按照索引設置每一列的數據 self.tree.Expand(self.root) self.lamp.append(child) self.logmultiText.write('添加:' + lampname + '成功'+"\n") return self.logmultiText.write('添加:內容為空' + "\n") def ButtonFonclick(self, event): lampname = self.lampText2.GetValue() if (lampname): for i in self.lamp: if (self.tree.GetItemText(i,0)==lampname): self.tree.Delete(i) self.logmultiText.write('刪除: '+ lampname + '成功' + "\n") self.lamp.remove(i) return self.logmultiText.write('刪除:'+lampname+"不存在"+"\n") return self.logmultiText.write('刪除:內容為空' + "\n") def ButtonConclick(self,event): ip = self.inputText.GetValue() if(ip == ''): self.logmultiText.write('連接:ip內容為空' + "\n") return #驗證服務器ip if re.match(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", ip): self.logmultiText.write('連接:ip有效' + "\n") else: self.logmultiText.write('連接:ip無效' + "\n") return port = self.pwdText.GetValue() # 驗證端口 if (port== ''): self.logmultiText.write('連接:port內容為空' + "\n") return if re.match(r"^[1-9]$|(^[1-9][0-9]$)|(^[1-9][0-9][0-9]$)|(^[1-9][0-9][0-9][0-9]$)|(^[1-6][0-5][0-5][0-3][0-5]$)", port): port = int(port) self.logmultiText.write('連接:port有效' + "\n") else: self.logmultiText.write('連接:port無效' + "\n") return self.password = (ip,port) self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.s.connect(self.password) self.logmultiText.write('連接:' + str(self.password) + "成功"+ "\n") except ConnectionRefusedError as e: self.logmultiText.write('連接:' +str(e) + "\n") self.s = None def Init_Right(self): # 創建一個wx.StaticBox對象 # 聲明一個wx.StaticBoxSizer與創建的wx.StaticBox對象作為其參數 nm1 = wx.StaticBox(self.RightPanel, -1) nmSizer1 = wx.StaticBoxSizer(nm1, wx.VERTICAL) # 創建文本域 self.multiText3 = wx.TextCtrl(self.RightPanel, -1, style=wx.TE_MULTILINE) # 創建一個文本控件 self.multiText3.SetInsertionPoint(0) # 設置插入點 nmSizer1.Add(self.multiText3, 1, wx.EXPAND | wx.ALL, 10) # 在垂直盒子里添加StaticBoxSizer盒子 self.Boxv2.Add(nmSizer1, 10, wx.EXPAND | wx.ALL, 10) self.text1 = wx.StaticText(self.RightPanel, label="消息內容:", style=wx.ALIGN_CENTER) self.Boxh2.Add(self.text1, 1, wx.ALIGN_LEFT, 10) self.input = wx.TextCtrl(self.RightPanel, -1) self.Boxh2.Add(self.input,5, wx.ALIGN_LEFT, 10) self.Button2 = wx.Button(self.RightPanel, -1, "發送信息") self.Boxh2.Add(self.Button2, 1, wx.ALIGN_LEFT, 10) self.Bind(wx.EVT_BUTTON, self.Button2onclick,self.Button2) self.Boxv2.Add(self.Boxh2,1, wx.ALL | wx.EXPAND) self.text1 = wx.StaticText(self.RightPanel, label="消息格式:燈名:控制命令;例如:led1:1、lamp1:0", style=wx.ALIGN_CENTER) self.Boxv2.Add(self.text1, 1, wx.ALL | wx.EXPAND) # 把垂直盒子與RightPanel關聯起來 self.RightPanel.SetSizer(self.Boxv2) def Button2onclick(self,event): if(self.s): content = self.input.GetValue() if(self.s and content): ledinfo = content.split(":") if(len(ledinfo)<2): self.logmultiText.write("發送:" + "數據無效" + "\n") return if(ledinfo[1] == "1" or ledinfo[1] == "0" ): # self.input.Clear() for i in self.lamp: print(self.tree.GetItemText(i,0)) if (self.tree.GetItemText(i,0)==ledinfo[0]): content1 = str("客戶端") + ":" + content + "\n" self.multiText3.write(content1) try: self.s.send(content.encode('utf-8')) self.logmultiText.write("發送:" + content + "\n") reponse = self.s.recv(1024).decode('utf-8') self.multiText3.write("服務端:"+reponse + "\n") self.logmultiText.write("接收:" + reponse + "\n") # print(reponse) result = re.match('^(\S+)\s+(\S+)$', reponse) print(result.group(1)) print(result.group(2)) if(result.group(1).lower() == ledinfo[0]): if(result.group(2)=="on"): self.tree.SetItemText(i, "開啟", 1) elif(result.group(2)=="off"): self.tree.SetItemText(i, "關閉", 1) except ConnectionResetError as e: self.logmultiText.write('連接:' + str(e) + "\n") return self.logmultiText.write("警告:" + ledinfo[0] + "不存在" + "\n") return else: self.logmultiText.write("發送:" + "數據無效" + "\n") else: self.logmultiText.write("連接:" + "還未連接服務器,不能發送信息" + "\n") app = wx.App() Example(None, title='LED控制客戶端') app.MainLoop()
服務端

import socketserver import serial import serial.tools.list_ports class Communication(): #初始化 def __init__(self,com,bps,timeout): self.port = com self.bps = bps self.timeout =timeout global Ret try: # 打開串口,並得到串口對象 self.main_engine= serial.Serial(self.port,self.bps,timeout=self.timeout) # 判斷是否打開成功 if (self.main_engine.is_open): Ret = True except Exception as e: print("---異常---:", e) # 打印設備基本信息 def Print_Name(self): print(self.main_engine.name) #設備名字 print(self.main_engine.port)#讀或者寫端口 print(self.main_engine.baudrate)#波特率 print(self.main_engine.bytesize)#字節大小 print(self.main_engine.parity)#校驗位 print(self.main_engine.stopbits)#停止位 print(self.main_engine.timeout)#讀超時設置 print(self.main_engine.writeTimeout)#寫超時 print(self.main_engine.xonxoff)#軟件流控 print(self.main_engine.rtscts)#軟件流控 print(self.main_engine.dsrdtr)#硬件流控 print(self.main_engine.interCharTimeout)#字符間隔超時 #打開串口 def Open_Engine(self): self.main_engine.open() #關閉串口 def Close_Engine(self): self.main_engine.close() print(self.main_engine.is_open) # 檢驗串口是否打開 # 打印可用串口列表 @staticmethod def Print_Used_Com(): port_list = list(serial.tools.list_ports.comports()) print(port_list) #接收指定大小的數據 #從串口讀size個字節。如果指定超時,則可能在超時后返回較少的字節;如果沒有指定超時,則會一直等到收完指定的字節數。 def Read_Size(self,size): return self.main_engine.read(size=size) #接收一行數據 # 使用readline()時應該注意:打開串口時應該指定超時,否則如果串口沒有收到新行,則會一直等待。 # 如果沒有超時,readline會報異常。 def Read_Line(self): return self.main_engine.readline() #發數據 def Send_data(self,data): # print("向串口發送信息:"+str(data)) self.main_engine.write(data) #接收數據 #一個整型數據占兩個字節 #一個字符占一個字節 def Recive_data(self,way): # 循環接收數據,此為死循環,可用線程實現 while True: try: # 一個字節一個字節的接收 if self.main_engine.in_waiting: print("正在等待接收數據:" ) list = [] if(way == 0): for i in range(self.main_engine.in_waiting): data = self.Read_Size(1) # print("接收ascii數據:"+str(data)) # data1 = str(data.hex()) # print("收到數據十六進制:"+data1) list.append(data.decode("utf-8")) return list except Exception as e: print("異常報錯:",e) class Myserver(socketserver.BaseRequestHandler): def handle(self): print("conn is:"+str(self.request)) print("conn is:" + str(self.client_address)) #通訊循環 while True: #收信息 try: datasorues = self.request.recv(1024) if datasorues == '': continue Engine.Send_data(datasorues) list = Engine.Recive_data(0) list = "".join(list) list = list.strip() self.request.sendall(list.encode('utf-8')) except Exception as e: continue if __name__=="__main__": Ret = False # 是否創建成功標志 Com = None while (Com == None or Ret == False): Com = input("請輸入Com:") print(Com) Engine = Communication(Com, 115200, 0.5) print(Ret) #鏈接循環 #這個方法需要兩個參數,((ip,port),Myserver) s=socketserver.ThreadingTCPServer(('127.0.0.1',888),Myserver) #永遠運行 s.serve_forever()
測試
1.打開虛擬串口工具,添加虛擬串口。
2.打開服務端。
輸入串口,如果打印標明連接串口成功。
3.打開客戶端。
4.連接服務端。
注意:服務端的ip和端口我設置的固定的127.0.0.1:888,如果需要在局域網中通信,此ip需要設置為局域網中的靜態ip。
5.打開串口發送接收工具。
6.客戶端發送消息,串口工具回復信息。
客戶端:客戶端消息格式按照提示發送。
串口:串口回復格式為:燈名 on或者燈名 off,中間空格沒有限制。
日志顯示了用戶交互操作與提示,客戶端與服務端的交互。
當操作燈的狀態成功以后,燈的狀態會發送變化。