Python的擴展接口[1] -> 串口通信


串口通信 / Serial Communication


 

1 串口簡介 / Serial Introduction

串行接口(Serial Interface)簡稱串口,通常為COM接口,數據發送方式為一位一位地順序傳送,通信線路簡單,一對傳輸線即可實現雙向通信,適用於遠距離且傳輸速度慢的通信。其中最常用的也是標准接口為RS-232,除此之外還有RS-422,RS-485等。

 

2 pyserial環境搭建

Pyserial的安裝可以通過pip進行,直接在命令行輸入下列命令進行安裝,

pip install pyserial

 

3 pyserial基本用法

利用pyserial可以實現Python對串口的控制,其基本使用方式十分簡單,主要為一下幾步,

       1. 利用Serial類傳入配置參數,生成串口實例;

       2. 利用串口實例的write()/read()方法消息的發送與接收。

Note: 關於Serial類,主要參數為端口和波特率,更多參數可查看源代碼。

 1 import serial
 2 
 3 ser = serial.Serial(port='com1', baudrate=9600)
 4 print(ser.portstr)
 5 
 6 def serial_communi(ser, msg):
 7     # n is the length of msg sent
 8     n = ser.write(msg.encode())
 9     print(n)
10     s = ser.read(n)
11     print(s)

 

4 read_until 方法的實現與擴展
在pyserial中,read_until 方法只能處理一個終止符,而當需要處理多個終止標志的時候,則需要重新定義一個新的方法,參考 pyserial 源代碼中的 read_until 函數,其實現方式為:

  1. 每次讀取一個字符,加入待返回字符串
  2. 根據終止符長度n,獲取字符中后 n 位字符,檢測是否為終止符

下面的代碼供參考,

 1     def _read_until(self):
 2         line = ''
 3         while True:
 4             try:
 5                 c = self.read(1)
 6             except (
 7                     AttributeError, TypeError,          # COM Exception
 8                     ValueError, EOFError,               # TELNET Exception
 9                     self._connection.OpenCloseException) as e:
10                 # Above Exception happens when communication manually closed,
11                 # if still should run, condition sleep here,
12                 # else return None to exit thread
13                 if self.should_run:
14                     self._cond_wait(self._recv_cond)
15                     continue
16                 else:
17                     return None
18             if c:
19                 line += c
20                 self._result.put(c)
21                 tokens = [self._term_token, self._abort_token, self._cr_token, self._lf_token]
22                 for token in tokens:
23                     if not line[-len(token):] == token:
24                         continue
25                     if token == self._cr_token or token == self._lf_token:
26                         return line
27                     if token == self._term_token:
28                         self.waiter_wake_up()
29                         self.termi_signal(True)
30                         self._result.save(strip=self._term_token)
31                         self.logger.debug("TERMINATE %s" % self._term_token)
32                         return line
33                     if token == self._abort_token:
34                         # TODO: handle this abort token encountered
35                         self.logger.debug("ABORT %s" % self._abort_token)
36                         pass
37             # When timeout will enter this else
38             # else:
39             #     return line

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM