學習日志-0005:python 實現串口


應用Pyserial 包實現串口通信類:

其主要接口如下:

1. Serial_Create  創建和使能串口

2. Serial_WriteAndReadBin 寫讀二進制數據流

3. Serial_WriteAndReadString 寫讀字符串

4. Serial_WriteString 寫字符串

5. Serial_WriteBin 寫二進制數據

6. Serial_Read 讀字符串數據

 

#coding:utf-8
#!/usr/bin/python3
import serial
import serial.tools.list_ports
import threading
import time

__all__ = ["CSerial"]

class CSerial:
    def __init__(self, Port, Baud, TimeOut):
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port
        self.baud = Baud
        self.timeOut = TimeOut
        self.readString = ""
        self.readBin = None
        self.receiveReady = False
        self.thread_SerialRead = None

    def Serial_Create(self):
        try:
            self.l_serial = serial.Serial(self.port, self.baud, timeout=self.timeOut)
        except Exception as e:
            print("serial port error:", e)

    def Serial_WriteAndReadBin(self,bin):
        WaitingCnt = 0
        self.alive = True
        self.l_serial.write(bin)
        while self.alive:
            time.sleep(0.1)   #1 second
            if self.l_serial.in_waiting:
                self.readBin = self.l_serial.read(self.l_serial.in_waiting)
                self.alive = False
                return self.readBin
            else:
                 WaitingCnt += 1
                 if WaitingCnt > 3:
                     return None

        return None

    def Serial_WriteAndReadString(self,string):
        WaitingCnt = 0
        self.readString = ""
        self.alive = True
        self.l_serial.write(string.encode("gbk"))
        while self.alive:
            time.sleep(0.1)  #1 second
            if self.l_serial.in_waiting:
                self.readString = self.l_serial.read(self.l_serial.in_waiting).decode("gbk")
                self.alive = False
                return self.readString
            else:
                 WaitingCnt += 1
                 if WaitingCnt > 3:
                     return None

        return None

    def Serial_ReadData(self):
        while self.alive:
            if self.l_serial.in_waiting and self.receiveReady == False:
                self.readData = self.l_serial.read(self.l_serial.in_waiting)
                self.receiveReady = True

    def Serial_WriteString(self,string):
         length = self.l_serial.write(string.encode("gbk"))
         return length

    def Serial_WriteBin(self,bin):
         length = self.l_serial.write(bin)
         return length

    def Serial_Read(self):
        if self.l_serial.in_waiting:
            STRGLO = self.l_serial.read(self.l_serial.in_waiting).decode("gbk")
            print(STRGLO)

    def Serial_Close(self):
        self.alive = False
        if self.l_serial.isOpen():
            self.l_serial.close()

#test example
if __name__ == '__main__':
    test = CSerial("COM41",9600,100)
    test.Serial_Create()
    print(test.Serial_WriteAndReadString("hello, I am a serial port!\r\n"))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM