最近的項目來了一個需求,要求測試tcp網關通訊協議;
1、液壓井蓋通過TCP/IP TCP與平台通信;
2、硬件定期發送心跳包(10S)給平台,是平台與硬件保持長連接;
3、每台硬件有一個12字節的唯一編碼(字符型);
4、每台設備是1S發送一條報文;
最初使用NetAssist測試功能,模擬硬件設備發送報文,測試硬件設備發過來的狀態。
功能測試通過后,新來的壓測需求:要求對模擬60個左右的設備每隔一秒發送一條報文到平台,去百度Google搜索TCP壓測怎么壓測,這類文章博客比較少,試了有個博客自己寫的一個軟件,發現不能滿足需求(都不能安裝),其余就是jmeter進行壓測,所以就嘗試用jmeter進行測試,版本是3.2,按照博客所寫進行操作,發現還是不能滿足需求,並不能進行60個設備同時1S發送一條報文(加入定時器也不能滿足),單個的模擬一個設備可以發送成功。
由於使用工具不能滿足需求,只能依靠手寫代碼來實現了。
common.py 存放公用方法,如進制轉化、異或運算等;
manholecover_state.py 結合數據組裝報文;
run_test.py 主方法,發送報文,執行腳本;
test.log 日志文件
common.py
# coding=utf8 # import mysql.connector import binascii # 報文頭 def reverseStart_field(): return "F5F5" # 返回序列 def reverseNumber_field(): return "0001" # 補全字節 def zfill(str1, i): str1 = str1.zfill(2 * i) # 補全 字節 return str1 # int轉16進制 def intChange16(str1, i): str1 = hex(int(str1)) # 轉16進制 str1 = str1.replace("0x", "") # 去掉16進制0x str1 = str1.zfill(2 * i) # 補全 字節 return str1 # 16進制 去掉0x 補全 字節 def strReplace(str1, i): str1 = str1.replace("0x", "") # 去掉16進制0x str1 = str1.zfill(2 * i) # 補全 字節 return str1 # int轉16進制 倒序 def change16Reverse(str1, i): # print("==========調用方法change16Reverse") str1 = hex(int(str1)) # 轉16進制 str1 = str1.replace("0x", "") # 去掉16進制0x str1 = str1.zfill(2 * i) # 補全 字節 # print("==========倒序之前:",str1) i = len(str1) str2 = "" while (i > 0): str2 = str2 + (str1[i - 2:i]) i = i - 2 # print("==========倒序之后:",str2) return str2 # 時間轉16進制 def timeChange16(str1): arrTime = [] arrTime = str1.split('-') arrChange = [] strChange = "" i = 0 while (i < len(arrTime)): if (i == 0): print("----------") strChange = change16Reverse(arrTime[i], 2) else: strTem2 = intChange16(arrTime[i], 1) strChange = strChange + strTem2 i = i + 1 return strChange + "FF" # 字符串 轉換 def changestr(str2): length = len(str2) utf8_length = len(str2.encode('utf-8')) # gbk 8.5 utf-8 9 length = int((utf8_length - length) / 2 + length) str2 = binascii.b2a_hex(str2.encode("gbk")) # encode("gbk") utf8 str2 = str(str2) # 將 byte 類型轉換成 str 類型 str2 = str2.replace("b'", "").replace("'", "") # 去掉 b' str2 = str2.zfill(2 * length) return str2 # asc 轉換 def changeascii(str1, aa): e = 0 # 暫存結果 for i in str1: d = ord(i) # 單個字符轉換成ASCii碼 e = e * 256 + d # 將單個字符轉換成的ASCii碼相連 a = hex(e) a = a.replace("0x", "") a = a.zfill(2 * aa) # print("結果是:%s" % a) return a if __name__ == "__main__": message_id = '0x10' case_id = "1"
manholecover_state.py
# -*-coding:utf-8-*- # coding=utf8 import mysql.connector import common import logging import datetime import pymysql def parseadd(case): config = { 'host': '127.0.0.1', # 連接的IP地址 'user': 'root', 'password': '123456', 'port': 3306, 'database': 'monitor', 'charset': 'utf8', # 編碼格式,防止查出來的數據中文亂碼 } #db1_cursor = mysql.connector.Connect(**config) # 連接數據庫 db1_cursor = pymysql.Connect(**config) cur = db1_cursor.cursor() # 執行命令,接收結果 t_str = "select * from monitor.test_manholecover_state_copy where case_id=" try: cur.execute(t_str + str(case)) t_result = cur.fetchone() print(t_result) db1_cursor.close() print("-----------------------井蓋監控點編碼", t_result[5]) strElement0 = common.reverseStart_field() # 頭 strElement1 = common.reverseNumber_field() # 序列 strElement2 = common.intChange16(t_result[4], 1) # 應答標志 # strElement3=common.changeascii(t_result[5],12)#設備編碼 strElement3 = common.changeascii(t_result[5], 12) # 設備編碼 logging.info("報文發送時間:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " 編碼:" + t_result[5]) strElement4 = common.intChange16(t_result[6], 1) # 井蓋狀態 strElement5 = common.intChange16(t_result[7], 1) # 正在操作方式 strElement6 = common.intChange16(t_result[8], 3) # 告警狀態 strElement6_1 = common.intChange16(t_result[9], 3) # 告警狀態 strElement7 = common.intChange16(t_result[10], 1) # 控制授權 strElement8 = common.intChange16(t_result[11], 1) # 角度狀態 strElement9 = common.zfill(t_result[12], 4) # 結束標記 # 拼接字符串 strElement = [strElement0, strElement1, strElement2, strElement3, strElement4, strElement5, strElement6, strElement6_1, strElement7, strElement8, strElement9] strResult = ''.join(strElement) # print("=====================") print("拼接報文:", strResult) return strResult except Exception as e: print("Error: %s" % e) if __name__ == "__main__": for id in range(1, 68): print(parseadd(1)) # db1_cursor = mysql.connector.connect(host='127.0.0.1', port='3306', user='root', password='123456', database='monitor', charset='utf8') # cur = db1_cursor.cursor() # case_id='1' # #parseadd(case_id) # db1_cursor.close()
run_test.py
# coding=utf-8 import datetime import logging import threading from socket import * from time import sleep import manholecover_state # import logging.handlers logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='test.log', filemode='w') logging.debug('debug message') logging.info('info message') logging.warning('warning message') logging.error('error message') logging.critical('critical message') socks = [] def singlesend(case, soc2): global lock lock.acquire() # case_id = 1 # while case_id<=2: #循環次數 print("報文發送時間:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) print("-----------------------井蓋監控點編碼", case) # , "上報次數:", case_id) send_manholecover_state = manholecover_state.parseadd(case) #print("----------------------", send_manholecover_state + '\n') soc2.send(bytes().fromhex(send_manholecover_state)) # case_id = case_id + 1 # print("報文發送時間:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # sleep(1) #可修改的上報時間間隔 lock.release() def message(): global lock lock.acquire() t_code = 201820190601 threads = [] # while (t_code <= 201820190667): for case in range(1, 68): t_singlesend = threading.Thread(target=singlesend, args=(str(case), socks[case-1])) threads.append(t_singlesend) # t_singlesend.setDaemon(True) # t_singlesend.start() # sleep(1) t_code += 1 for i in threads: i.setDaemon(True) i.start() lock.release() if __name__ == '__main__': lock = threading.Lock() end = datetime.datetime.now() + datetime.timedelta(days=2) # 獲取結束時間 print("報文發送一周結束時間" + end.strftime("%Y-%m-%d %H:%M:%S") + '\n') for case in range(1, 68): soc = socket(AF_INET, SOCK_STREAM) soc.settimeout(300) # 設置超時時間 soc.connect(('192.168.0.156', 8845)) socks.append(soc) while True: message() now = datetime.datetime.now() sleep(1) if now >= end: break ''' threads = [] threads.append(t_singlesend) for t in threads: t.start() for t in threads: t.join() logging.info("全部執行完成~:%s" % ctime()) '''
運行結果:達到需求,每秒發送67條報文到平台
在這種正常場景下進行壓測,連續跑了幾個小時候,平台就崩掉了,java內存不斷的升高;
開發進行不斷的性能優化,已經連續跑了三四的情況下,平台已經走向穩定。