Python實現遠程控制單片機led狀態


 

測試環境及庫

Python版本:3.6

客戶端:

  • wxpython:客戶端界面
  • socket:tcp客戶端
  • re:輸入處理

服務端:

  • socketserver:並發服務器
  • serial:串口庫

下位機:

  • 下位機使用虛擬串口模擬

 

客戶端

import wx
import socket
import wx.gizmos as gizmos
import re

class Example(wx.Frame):
    def __init__(self, parent, title):
        super(Example, self).__init__(parent, title=title,size=(800,800))
        self.Init_Panel()
        self.Init_Box()
        self.Init_Left()
        self.Init_Right()
        # 將三個垂直盒子添加到垂直盒子
        self.Boxh1.Add(self.LeftPanel,proportion = 1, border = 2,flag = wx.ALL | wx.EXPAND)
        self.Boxh1.Add(self.RightPanel, proportion=4, border=2, flag=wx.ALL | wx.EXPAND)

        #將垂直盒子和主框架關聯
        self.SetSizer(self.Boxh1)
        #顯示主框架
        self.Show()
        self.s = None
    #創建兩個面板
    def Init_Panel(self):
        self.LeftPanel = wx.Panel(self)
        self.RightPanel = wx.Panel(self)

    #創建三個盒子
    #一個垂直盒子、兩個水平盒子
    def Init_Box(self):
        #兩個垂直盒子
        self.Boxv1 = wx.BoxSizer(wx.VERTICAL)
        self.Boxv2 = wx.BoxSizer(wx.VERTICAL)
        #一個水平盒子
        self.Boxh1 = wx.BoxSizer(wx.HORIZONTAL)
        self.Boxh2 = wx.BoxSizer(wx.HORIZONTAL)
    def Init_Left(self):
        self.Label3 = wx.StaticText(self.LeftPanel, -1, "設置客戶端鏈接")
        self.Label3.SetForegroundColour('white')
        self.Label3.SetBackgroundColour('black')
        self.Boxv1.Add(self.Label3, 0, wx.ALIGN_CENTRE, 10)
        self.Label1 = wx.StaticText(self.LeftPanel, -1, "服務器:")
        # 服務器ip輸入文本框
        self.inputText = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 10), size=(150, -1))
        self.inputText.SetInsertionPoint(0)  # 設置焦點位置
        self.Boxv1.Add(self.Label1, 0, wx.EXPAND | wx.ALL, 0)
        self.Boxv1.Add(self.inputText, 0, wx.EXPAND | wx.ALL, 0)

        self.Label2 = wx.StaticText(self.LeftPanel, -1, "端口:")
        # 服務器端口輸入框
        self.pwdText = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 50), size=(150, -1))
        self.Boxv1.Add(self.Label2, 0, wx.EXPAND | wx.ALL,0)
        self.Boxv1.Add(self.pwdText, 0, wx.EXPAND | wx.ALL,2)

        self.ButtonC = wx.Button(self.LeftPanel, -1, "確認連接")
        self.Bind(wx.EVT_BUTTON, self.ButtonConclick, self.ButtonC)
        self.Boxv1.Add(self.ButtonC, 0, wx.EXPAND | wx.ALL, 5)


        self.Labellamp = wx.StaticText(self.LeftPanel, -1, "燈管理")
        self.Labellamp.SetForegroundColour('white')
        self.Labellamp.SetBackgroundColour('black')
        self.Boxv1.Add(self.Labellamp, 0, wx.ALIGN_CENTRE, 5)
        #創建樹表控件
        self.tree = gizmos.TreeListCtrl(self.LeftPanel,-1)
        #添加樹表的列
        self.tree.AddColumn("燈名")
        self.tree.AddColumn("燈狀態")
        # self.tree.SetColumnWidth(0,186)
        self.root = self.tree.AddRoot("")
        self.tree.Expand(self.root)
        self.Boxv1.Add(self.tree, 0, wx.EXPAND | wx.ALL, 5)
        self.lamp = []

        child1 = self.tree.AppendItem(self.root, "led1")  # 添加一行
        self.tree.SetItemText(child1, "led1", 0)  # 按照索引設置每一列的數據
        self.tree.SetItemText(child1, "關閉", 1)  # 按照索引設置每一列的數據
        self.tree.Expand(self.root)
        self.lamp.append(child1)
        child2 = self.tree.AppendItem(self.root, "led2")  # 添加一行
        self.tree.SetItemText(child2, "led2", 0)  # 按照索引設置每一列的數據
        self.tree.SetItemText(child2, "關閉", 1)  # 按照索引設置每一列的數據
        self.tree.Expand(self.root)
        self.lamp.append(child2)

        #添加燈管理
        self.Label3 = wx.StaticText(self.LeftPanel, -1, "添加燈:")
        self.lampText1 = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 10), size=(150, -1))
        self.lampText1.SetInsertionPoint(0)  # 設置焦點位置
        self.Boxv1.Add(self.Label3, 0, wx.EXPAND | wx.ALL, 0)
        self.Boxv1.Add(self.lampText1, 0, wx.EXPAND | wx.ALL, 0)
        self.ButtonD = wx.Button(self.LeftPanel, -1, "確認")
        self.Bind(wx.EVT_BUTTON, self.ButtonDonclick, self.ButtonD)
        self.Boxv1.Add(self.ButtonD, 0, wx.EXPAND | wx.ALL, 5)

        # 添加燈管理
        self.Label4 = wx.StaticText(self.LeftPanel, -1, "刪除燈:")
        self.lampText2 = wx.TextCtrl(self.LeftPanel, -1, "", pos=(80, 10), size=(150, -1))
        self.lampText2.SetInsertionPoint(0)  # 設置焦點位置
        self.Boxv1.Add(self.Label4, 0, wx.EXPAND | wx.ALL, 0)
        self.Boxv1.Add(self.lampText2, 0, wx.EXPAND | wx.ALL, 0)
        self.ButtonF = wx.Button(self.LeftPanel, -1, "確認")
        self.Bind(wx.EVT_BUTTON, self.ButtonFonclick, self.ButtonF)
        self.Boxv1.Add(self.ButtonF, 0, wx.EXPAND | wx.ALL, 5)

        self.logLabel = wx.StaticText(self.LeftPanel, -1, "日志")
        self.logLabel.SetForegroundColour('white')
        self.logLabel.SetBackgroundColour('black')
        self.Boxv1.Add(self.logLabel,0, wx.ALIGN_CENTRE, 10)
        # 創建文本域
        self.logmultiText = wx.TextCtrl(self.LeftPanel, -1, style=wx.TE_MULTILINE)  # 創建一個文本控件
        self.logmultiText.SetInsertionPoint(0)  # 設置插入點
        #  在垂直盒子里添加StaticBoxSizer盒子
        self.Boxv1.Add(self.logmultiText,5, wx.EXPAND | wx.ALL, 10)

        #把垂直盒子與LeftPanel關聯起來
        self.LeftPanel.SetSizer(self.Boxv1)

    def ButtonDonclick(self,event):
        lampname = self.lampText1.GetValue()
        zhmodel = re.compile(u'[\u4e00-\u9fa5]')  # 檢查中文
        if(lampname):
            match = zhmodel.search(lampname)
            if match:
                self.logmultiText.write('添加:' +  '名字含有中文字符' + "\n")
                return
            for i in self.lamp:
                if (self.tree.GetItemText(i, 0) == lampname):
                    self.logmultiText.write('添加:' + lampname + '重復' + "\n")
                    return
            child = self.tree.AppendItem(self.root,lampname)  # 添加一行
            self.tree.SetItemText(child,lampname, 0)  # 按照索引設置每一列的數據
            self.tree.SetItemText(child, "關閉", 1)  # 按照索引設置每一列的數據
            self.tree.Expand(self.root)
            self.lamp.append(child)
            self.logmultiText.write('添加:' + lampname + '成功'+"\n")
            return
        self.logmultiText.write('添加:內容為空' + "\n")
    def ButtonFonclick(self, event):
        lampname = self.lampText2.GetValue()
        if (lampname):
            for i in self.lamp:
                if (self.tree.GetItemText(i,0)==lampname):
                    self.tree.Delete(i)
                    self.logmultiText.write('刪除: '+ lampname +  '成功' + "\n")
                    self.lamp.remove(i)
                    return
            self.logmultiText.write('刪除:'+lampname+"不存在"+"\n")
            return
        self.logmultiText.write('刪除:內容為空' + "\n")
    def ButtonConclick(self,event):
        ip = self.inputText.GetValue()
        if(ip == ''):
            self.logmultiText.write('連接:ip內容為空' + "\n")
            return
        #驗證服務器ip
        if re.match(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", ip):
            self.logmultiText.write('連接:ip有效' + "\n")
        else:
            self.logmultiText.write('連接:ip無效' + "\n")
            return
        port = self.pwdText.GetValue()

        # 驗證端口
        if (port== ''):
            self.logmultiText.write('連接:port內容為空' + "\n")
            return
        if re.match(r"^[1-9]$|(^[1-9][0-9]$)|(^[1-9][0-9][0-9]$)|(^[1-9][0-9][0-9][0-9]$)|(^[1-6][0-5][0-5][0-3][0-5]$)", port):
            port = int(port)
            self.logmultiText.write('連接:port有效' + "\n")
        else:
            self.logmultiText.write('連接:port無效' + "\n")
            return
        self.password = (ip,port)
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.s.connect(self.password)
            self.logmultiText.write('連接:' +  str(self.password) + "成功"+ "\n")
        except ConnectionRefusedError as e:
            self.logmultiText.write('連接:' +str(e) + "\n")
            self.s = None
    def Init_Right(self):
        # 創建一個wx.StaticBox對象
        # 聲明一個wx.StaticBoxSizer與創建的wx.StaticBox對象作為其參數
        nm1 = wx.StaticBox(self.RightPanel, -1)
        nmSizer1 = wx.StaticBoxSizer(nm1, wx.VERTICAL)
        # 創建文本域
        self.multiText3 = wx.TextCtrl(self.RightPanel, -1, style=wx.TE_MULTILINE)  # 創建一個文本控件
        self.multiText3.SetInsertionPoint(0)  # 設置插入點
        nmSizer1.Add(self.multiText3, 1, wx.EXPAND | wx.ALL, 10)
        #  在垂直盒子里添加StaticBoxSizer盒子
        self.Boxv2.Add(nmSizer1, 10, wx.EXPAND | wx.ALL, 10)

        self.text1 = wx.StaticText(self.RightPanel, label="消息內容:", style=wx.ALIGN_CENTER)
        self.Boxh2.Add(self.text1, 1, wx.ALIGN_LEFT, 10)


        self.input = wx.TextCtrl(self.RightPanel, -1)
        self.Boxh2.Add(self.input,5, wx.ALIGN_LEFT, 10)

        self.Button2 = wx.Button(self.RightPanel, -1, "發送信息")
        self.Boxh2.Add(self.Button2, 1, wx.ALIGN_LEFT, 10)
        self.Bind(wx.EVT_BUTTON, self.Button2onclick,self.Button2)

        self.Boxv2.Add(self.Boxh2,1, wx.ALL | wx.EXPAND)

        self.text1 = wx.StaticText(self.RightPanel, label="消息格式:燈名:控制命令;例如:led1:1、lamp1:0", style=wx.ALIGN_CENTER)
        self.Boxv2.Add(self.text1, 1, wx.ALL | wx.EXPAND)
        # 把垂直盒子與RightPanel關聯起來
        self.RightPanel.SetSizer(self.Boxv2)

    def Button2onclick(self,event):
        if(self.s):
            content = self.input.GetValue()
            if(self.s and content):
                ledinfo = content.split(":")
                if(len(ledinfo)<2):
                    self.logmultiText.write("發送:" + "數據無效" + "\n")
                    return
                if(ledinfo[1] == "1" or ledinfo[1] == "0" ):
                    # self.input.Clear()
                    for i in self.lamp:
                        print(self.tree.GetItemText(i,0))
                        if (self.tree.GetItemText(i,0)==ledinfo[0]):
                            content1 = str("客戶端") + "" + content + "\n"
                            self.multiText3.write(content1)
                            try:
                                self.s.send(content.encode('utf-8'))
                                self.logmultiText.write("發送:" + content + "\n")
                                reponse = self.s.recv(1024).decode('utf-8')
                                self.multiText3.write("服務端:"+reponse + "\n")
                                self.logmultiText.write("接收:" + reponse + "\n")
                                # print(reponse)
                                result = re.match('^(\S+)\s+(\S+)$', reponse)
                                print(result.group(1))
                                print(result.group(2))
                                if(result.group(1).lower() == ledinfo[0]):
                                    if(result.group(2)=="on"):
                                        self.tree.SetItemText(i, "開啟", 1)
                                    elif(result.group(2)=="off"):
                                        self.tree.SetItemText(i, "關閉", 1)
                            except ConnectionResetError as e:
                                self.logmultiText.write('連接:' + str(e) + "\n")
                            return
                    self.logmultiText.write("警告:" + ledinfo[0] + "不存在" + "\n")
                    return
                else:
                    self.logmultiText.write("發送:" + "數據無效" + "\n")
        else:
            self.logmultiText.write("連接:" + "還未連接服務器,不能發送信息" + "\n")

app = wx.App()
Example(None, title='LED控制客戶端')
app.MainLoop()
源碼

服務端

import socketserver
import serial
import serial.tools.list_ports

class Communication():
    #初始化
    def __init__(self,com,bps,timeout):
        self.port = com
        self.bps = bps
        self.timeout =timeout
        global Ret
        try:
            # 打開串口,並得到串口對象
             self.main_engine= serial.Serial(self.port,self.bps,timeout=self.timeout)
            # 判斷是否打開成功
             if (self.main_engine.is_open):
               Ret = True
        except Exception as e:
            print("---異常---:", e)

    # 打印設備基本信息
    def Print_Name(self):
        print(self.main_engine.name) #設備名字
        print(self.main_engine.port)#讀或者寫端口
        print(self.main_engine.baudrate)#波特率
        print(self.main_engine.bytesize)#字節大小
        print(self.main_engine.parity)#校驗位
        print(self.main_engine.stopbits)#停止位
        print(self.main_engine.timeout)#讀超時設置
        print(self.main_engine.writeTimeout)#寫超時
        print(self.main_engine.xonxoff)#軟件流控
        print(self.main_engine.rtscts)#軟件流控
        print(self.main_engine.dsrdtr)#硬件流控
        print(self.main_engine.interCharTimeout)#字符間隔超時

    #打開串口
    def Open_Engine(self):
        self.main_engine.open()

    #關閉串口
    def Close_Engine(self):
        self.main_engine.close()
        print(self.main_engine.is_open)  # 檢驗串口是否打開

    # 打印可用串口列表
    @staticmethod
    def Print_Used_Com():
        port_list = list(serial.tools.list_ports.comports())
        print(port_list)


    #接收指定大小的數據
    #從串口讀size個字節。如果指定超時,則可能在超時后返回較少的字節;如果沒有指定超時,則會一直等到收完指定的字節數。
    def Read_Size(self,size):
        return self.main_engine.read(size=size)

    #接收一行數據
    # 使用readline()時應該注意:打開串口時應該指定超時,否則如果串口沒有收到新行,則會一直等待。
    # 如果沒有超時,readline會報異常。
    def Read_Line(self):
        return self.main_engine.readline()

    #發數據
    def Send_data(self,data):
        # print("向串口發送信息:"+str(data))
        self.main_engine.write(data)

    #接收數據
    #一個整型數據占兩個字節
    #一個字符占一個字節
    def Recive_data(self,way):
        # 循環接收數據,此為死循環,可用線程實現
        while True:
            try:
                # 一個字節一個字節的接收
                if self.main_engine.in_waiting:
                    print("正在等待接收數據:" )
                    list = []
                    if(way == 0):
                        for i in range(self.main_engine.in_waiting):
                            data = self.Read_Size(1)
                            # print("接收ascii數據:"+str(data))
                            # data1 = str(data.hex())
                            # print("收到數據十六進制:"+data1)
                            list.append(data.decode("utf-8"))
                    return list
            except Exception as e:
                print("異常報錯:",e)

class Myserver(socketserver.BaseRequestHandler):
    def handle(self):
        print("conn is:"+str(self.request))
        print("conn is:" + str(self.client_address))
        #通訊循環
        while True:
            #收信息
            try:
                datasorues = self.request.recv(1024)
                if datasorues == '':
                    continue
                Engine.Send_data(datasorues)
                list = Engine.Recive_data(0)
                list = "".join(list)
                list = list.strip()
                self.request.sendall(list.encode('utf-8'))

            except Exception as e:
                continue
if __name__=="__main__":
    Ret = False  # 是否創建成功標志
    Com = None
    while (Com == None or Ret == False):
        Com = input("請輸入Com:")
        print(Com)
        Engine = Communication(Com, 115200, 0.5)
        print(Ret)
    #鏈接循環
    #這個方法需要兩個參數,((ip,port),Myserver)
    s=socketserver.ThreadingTCPServer(('127.0.0.1',888),Myserver)
    #永遠運行
    s.serve_forever()
源碼

測試

1.打開虛擬串口工具,添加虛擬串口。

2.打開服務端。

輸入串口,如果打印標明連接串口成功。

 

 3.打開客戶端。

 

 4.連接服務端。

注意:服務端的ip和端口我設置的固定的127.0.0.1:888,如果需要在局域網中通信,此ip需要設置為局域網中的靜態ip。

5.打開串口發送接收工具。

6.客戶端發送消息,串口工具回復信息。

客戶端:客戶端消息格式按照提示發送。

串口:串口回復格式為:燈名  on或者燈名  off,中間空格沒有限制。

日志顯示了用戶交互操作與提示,客戶端與服務端的交互。

當操作燈的狀態成功以后,燈的狀態會發送變化。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM