星期一, 20. 八月 2018 01:53上午 - beautifulzzzz
1、前言
做類似zigbee、ble mesh...無線網絡節點性能測試的時候,手動操作然后看表象往往很難找出真正的原因,而且有些深層次問題隱藏在弱網環境中、或大量測試中,因在上位機上用腳本實現自動化掛機測試便顯得尤為重要。
本文介紹一種用python寫的基於串口通信的上位機自動測試程序框架(簡陋框架)。
2、代碼框架介紹
如下:整個代碼包含兩層app+bsp,其中:
- bsp層放硬件相關的代碼(比如linux系統用python2.7寫的串口驅動類);
- app層中包含兩個應用程序
app_app_auto_test_0xda_0xdb_adapter
和app_app_auto_test_off_line
;
其中應用程序是基於bsp中的代碼實現的,進入每個獨立的應用程序文件夾,運行make all則可以運行~
➜ mesh_test_toos git:(master) ✗ tree
.
├── app
│ ├── app_app_auto_test_0xda_0xdb_adapter
│ │ ├── app_auto_test.py
│ │ ├── app_frame.py
│ │ ├── main.py
│ │ └── makefile
│ └── app_app_auto_test_off_line
│ ├── app_frame.py
│ ├── app_frame.pyc
│ ├── main.py
│ └── makefile
└── bsp
├── bsp_serial.py
├── bsp_serial.pyc
├── bsp_system.py
└── bsp_system.pyc
4 directories, 12 files
3、bsp代碼介紹
bsp_system.py: 該文件目前只放了一個獲取當前時間戳的函數,精確到毫秒:
#!/usr/bin/env python
# coding=utf-8
import time
def get_time_stamp():
ct = time.time()
local_time = time.localtime(ct)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (ct - long(ct)) * 1000
time_stamp = "[%s.%03d] " % (data_head, data_secs)
return time_stamp
version = '0.1'
bsp_serial.py: 該文件在pyserial上封裝了一個bsp_serial類,該類包含下面幾個成員函數:
- 實例化函數:自動讀取系統中所有串口,如果有多個則會讓你選擇一個,並進行打開,產生一個ser成員變量
- iswaiting函數:讀取之前要先調用該函數,看看是否有數據
- read函數:讀取一字節
- write函數:寫一個數組的數據
- close函數:關閉函數
A demo for read:
ser1 = bsp_serial.bsp_serial(9600)
while 1<2:
if ser1.iswaiting() > 0:
x = ser1.read()
print x
note: If you want to write datas when reading, you should use the thread (next will show you) !
4、app_app_auto_test_off_line demo介紹
該腳本為自動測試無線網絡中的某一個節點的長掛機情況下是否有掉線情況:
該網絡中有一個mesh燈節點和一個和PC相連的dongle mesh節點,由於ble mesh的特性:
處於同一mesh網絡中的節點中維護一個全部節點的在線/離線狀態的表
因此如果想實現監聽燈節點的在線/離線狀態,只需要周期性地從dongle節點中讀取狀態表即可!這里每隔15S dongle節點將狀態表以圖中所示FRAME的格式傳給PC:
- head為幀頭,固定的
- cmd為幀命令,同步狀態表時其值為0x07
- length為數據長度,這里為8
- data1、data2為數據,每4個字節表示一個節點的狀態,第1字節表示節點ID,第二字節為0表示離線
- check為校驗,為除該位其它位數據和模256
app_frame.py 中實現的則是用於解析數據包的類:
#!/usr/bin/env python
# coding=utf-8
import sys
import termios
class FRAME:
HEAD1=0
HEAD2=1
VERSION=2
CMD=3
LEN1=4
LEN2=5
LEN_HEAD=6
MAX_DATA_BUF_SIZE = 1000
def __init__(self,fun_analysis):
self.data_buf = ""
self.fun_analysis = fun_analysis
'''
judge frame is ok
'''
def frame_ok(self,str):
start_pos = 0
fram_len = 0
end_pos = 0
str_len = len(str)
while start_pos<str_len:
pos = start_pos
if((ord(str[pos]) == 0x55) and (pos!=str_len-1) and (ord(str[pos+1]) == 0xAA)):
break
start_pos = start_pos+1
if(start_pos == str_len):#no find
return (-1,start_pos,end_pos)
if(start_pos + FRAME.LEN_HEAD < str_len):
#print str_len,start_pos,FRAME.LEN2
fram_len = ord(str[start_pos+FRAME.LEN2])
end_pos = start_pos + FRAME.LEN_HEAD +fram_len
#print fram_len,end_pos
if(end_pos < str_len):
return (0,start_pos,end_pos)
return (-2,start_pos,end_pos)
'''
insert data to frame fifo
'''
def insert_data(self,data):
self.data_buf+=data
if len(self.data_buf) > self.MAX_DATA_BUF_SIZE:
self.data_buf = ""
'''
analysis frame and perform
'''
def run(self):
while 1<2:
(ret,start_pos,end_pos) = self.frame_ok(self.data_buf)
#print (ret,start_pos,end_pos)
if(ret == 0):
self.fun_analysis(self.data_buf[start_pos:end_pos+1])
self.data_buf = self.data_buf[end_pos:]
FRAME類的實例化函數需要注冊一個命令解析函數fun_analysis;frame_ok用於判斷數據包是否正確;insert_data用於將串口收到的數據插入到FIFO中,接收插入數據和處理分開;run函數用於不斷從FIFO中取出數據並判斷是否是一個有效數據包,並進而調用fun_analysis進行解析及后續處理。
note: run函數需要獨占一個線程!
則在main.py中分別開兩個線程 —— 串口接收線程和幀RUN線程:
import threading
import app_frame
import sys
sys.path.append('../../bsp')
import bsp_serial
import bsp_system
def init():
#......(略)
def analysis_cmd(str):
#......(略)
def ser_receive():
global ser1
global frame
while 1<2:
if ser1.iswaiting() > 0:
x = ser1.read()
frame.insert_data(x)
total_num = 0
fail_times = 0
ser1 = bsp_serial.bsp_serial(9600)
frame = app_frame.FRAME(analysis_cmd)
try:
init()
threads = []
t1 = threading.Thread(target=ser_receive)
t2 = threading.Thread(target=frame.run)
threads.append(t1)
threads.append(t2)
for t in threads:
t.setDaemon(True)
t.start()
t.join()
except Exception, e:
ser1.close() # close port
print("safe exit"+str(e))
- 串口接收線程不斷讀取串口數據,並插入到幀對象的FIFO中
- 幀RUN函數不斷解析FIFO中的數據,若檢測到一個有效數據包,則調用analysis_cmd處理
最終效果如下:
5、app_app_auto_test_0xda_0xdb_adapter demo介紹
這個例子和上面的很像,用於測試一條GET STATE命令的成功率:
- 1)整個mesh網路的架構還是dongle+1個node燈;
- 2)PC通過串口發請求命令給dongle;
- 3)dongle收到cmd1立刻通過串口應答該命令,並向燈節點請求狀態;
- 4)燈收到請求將狀態返回給dongle,dongle再通過串口給PC;
可見:自動化測試整個流程不像DEMO1中的那么簡單,這里有多次應答,因此我們必須注意設置timeout!
因此在app_auto_test.py實現如下:
#...略
class AUTO_PROCESS:
START=0
PROCESS1=1
PROCESS2=2
FINISH=3
def __init__(self,ser):
self.auto = AUTO_PROCESS.START
self.ser = ser
def analysis_cmd(self,str):
#...略
if cmd1 == 0x08:
print "\033[1;34m>> \033[0m",
self.auto = self.PROCESS2
def run(self):
#...略
all_times = 0
fail1_times = 0
fail2_times = 0
while 1<2:
if self.auto == self.START:
all_times = all_times + 1
time.sleep(2)
self.ser.write(cmd_get_status_all)
self.auto = AUTO_PROCESS.PROCESS1
time.sleep(2)
elif self.auto == self.PROCESS1:
fail1_times = fail1_times + 1
print "fail %d" %self.auto
self.auto = self.START
elif self.auto == self.PROCESS2:
fail2_times = fail2_times + 1
print "fail %d" %self.auto
self.auto = self.START
else:
print "success %d total:%d fail1:%d fail2:%d" %(self.auto,all_times,fail1_times,fail2_times)
self.auto = self.START
FRAME的analysis_cmd函數用於解析串口返回的命令,來判斷改變成員變量auto的值;run函數用於主動發送請求並等待返回,如果超時未收到返回,則會改變auto為失敗,並打印結果。
鏈接
- 工程GITHUB地址:https://github.com/nbtool/auto_test_tool
@beautifulzzzz
智能硬件、物聯網,熱愛技術,關注產品
博客:http://blog.beautifulzzzz.com
園友交流群:414948975