程序代碼如下:
import serial,threading,time class SerThread(): def __init__(self,portx): # 初始化串口 self.my_serial=serial.Serial() self.my_serial.port=portx self.my_serial.baudrate=9600 self.my_serial.timeout=1 fname=time.strftime('%Y%m%d') # blog名稱為當前時間 self.rfname='r'+fname # 接收blog名稱 self.sfname='s'+fname # 發送blog名稱 self.waitEnd=None # 設置線程事件變量 self.alive=False # 設置條件變量 def start(self): # 打開串口並創建blog文件 self.my_serial.open() # 打開串口 self.rfile=open(self.rfname,'w') # 創建接收文件 self.sfile=open(self.sfname,'w') # 創建發送文件 if self.my_serial.isOpen(): self.waitEnd=threading.Event() # 將線程事件賦值給變量 self.alive=True # 改變條件變量值 self.thread_read=threading.Thread(target=self.Reader) # 創建一個讀取串口數據的線程 self.thread_read.setDaemon(True) # 調用線程同時結束的函數 self.thread_send=threading.Thread(target=self.Sender) # 創建一個發送串口數據的線程 self.thread_send.setDaemon(True) # 調用線程同時結束的函數 self.thread_read.start() # 啟動讀數據線程 self.thread_send.start() # 啟動寫數據線程 return True # 如果串口打開了,就返回True else: return False #如果串口未打開,就返回False def Reader(self): while self.alive: # 當條件變量為True時執行 try: time.sleep(0.01) # 此處最好設置一個暫停時間,為了上串口發過來的數據緩存到接收緩存區 n=self.my_serial.inWaiting() # 將接收緩存區數據字節數保存在變量n中 data='' if n: data=self.my_serial.read(n).decode('gbk') # 讀取接收緩存區的數據並解碼 print('recv'+' '+time.strftime('%Y-%m-%d %X')+' '+data.strip()) # 將接收到的數據打印出來 print(time.strftime('%Y-%m-%d %X:')+data.strip(),file=self.rfile) # 將打印的內容寫入到文件中 if len(data)==1 and ord(data[len(data)-1])==113: # 根據輸入的'q'來退出程序 break except Exception as ex: print(ex) self.waitEnd.set() # 改變線程事件狀態為True,即喚醒后面的程序 self.alive=False # 改變條件量為False def Sender(self): while self.alive: try: snddata=input('輸入數據:\n') self.my_serial.write(snddata.encode('gbk')) print('sent'+' '+time.strftime('%Y-%m-%d %X')) print(snddata,file=self.sfile) except Exception as ex: print(ex) # 此兩行代碼是不用寫的,因為前面我們用了self.thread_send.setDaemon(True)代碼, # 其只要一個線程結束,另的線程也會結束 # self.waitEnd.set() # self.alive=False def waiting(self): # 等待event停止標志 if not self.waitEnd is None: self.waitEnd.wait() # 改變線程事件狀態為False,使線程阻止后續程序執行 # 關閉串口、保存文件 def stop(self): self.alive=False if self.my_serial.isOpen(): self.my_serial.close() self.rfile.close() self.sfile.close() if __name__ == '__main__': ser=SerThread('com1') try: if ser.start(): ser.waiting() ser.stop() else: pass except Exception as ex: print(ex) if ser.alive: ser.stop() print('End OK.') del ser
完成以上代碼之后運行,然后打開串口助手com2
以下就是該安全的操作。
如有問題,可關注微信公眾號進行咨詢!