最近公司項目周期比較趕, 項目是軟硬結合,在缺少硬件的情況下,通過接口模擬設備上下架和購買情況,並進行壓力測試,
本次主要使用三個接口 分別是3個場景: 生成商品IP, 對商品進行上架, 消費者購買商品
最大問題:是數據庫是用ssh,只能用127.0.0.1去鏈接數據庫,
試過用ssh鏈接數據庫, 用requests 去跑腳本沒有問題,換上locust 就有問題
最后使用putty作為代理鏈接 ,有個缺陷就是 鏈接時效性不強,經常要重新鏈接putty
環境:
win10 mysql locust python3.7
1.locust:
Locust是一個用於可擴展的,分布式的,性能測試的,開源的,用Python編寫框架/工具,它非常容易使用,也非常好學。
它的主要思想就是模擬一群用戶將訪問你的網站。每個用戶的行為由你編寫的python代碼定義,同時可以從Web界面中實時觀察到用戶的行為。
Locust完全是事件驅動的,因此在單台機器上能夠支持幾千並發用戶訪問。
與其它許多基於事件的應用相比,Locust並不使用回調,而是使用gevent,而gevent是基於協程的,可以用同步的方式來編寫異步執行的代碼。
每個用戶實際上運行在自己的greenlet中。
2. 安裝
pip install locust
安裝 pyzmq
If you intend to run Locust distributed across multiple processes/machines, we recommend you to also install pyzmq.
如果打算運行Locust 分布在多個進程/機器,需要安裝pyzmq.
通過pip命令安裝。
pip install pyzmq
3.Locust主要由下面的幾個庫構成:
1) gevent
gevent是一種基於協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。
2) flask
Python編寫的輕量級Web應用框架。
3) requests
Python Http庫
4) msgpack-python
MessagePack是一種快速、緊湊的二進制序列化格式,適用於類似JSON的數據格式。msgpack-python主要提供MessagePack數據序列化及反序列化的方法。
5) six
Python2和3兼容庫,用來封裝Python2和Python3之間的差異性
6) pyzmq
pyzmq是zeromq(一種通信隊列)的Python綁定,主要用來實現Locust的分布式模式運行
當我們在安裝 Locust 時,它會檢測我們當前的 Python 環境是否已經安裝了這些庫,如果沒有安裝,它會先把這些庫一一裝上。並且對這些庫版本有要求,有些是必須等於某版本,有些是大於某版本。我們也可以事先把這些庫全部按要求裝好,再安裝Locust時就會快上許多
4. 腳本解讀:
1、創建ScriptTasks()類繼承TaskSet類: 用於定義測試業務。 2、創建index()、about()、demo()方法分別表示一個行為,訪問http://example.com。用@task() 裝飾該方法為一個任務。1、2表示一個Locust實例被挑選執行的權重,數值越大,執行頻率越高。在當前ScriptTasks()行為下的三個方法得執行比例為2:1:1 3、WebsiteUser()類: 用於定義模擬用戶。 4、task_set : 指向一個定義了的用戶行為類。 5、host: 指定被測試應用的URL的地址 6、min_wait : 用戶執行任務之間等待時間的下界,單位:毫秒。 7、max_wait : 用戶執行任務之間等待時間的上界,單位:毫秒。
5.執行腳本:
web/UI 界面:
if __name__ == '__main__': import os os.system("locust -f godemo.py --host=http://xx.api.xxxxx.net")
或者在命令行:
locust -f godemo.py --host=http:xxx.xxx.xx.net
或者:
locust -f godemo.py --H=http:xxx.xxx.xx.net
no-web:
dos進入Scripts目錄下,執行 locust -f ****.py --csv=onetest --host=http://0.0.0.0:0000 --no-web -c10 -r10 -t2
(PS:-f 指定運行的py文件的名字,--csv 生成報告的名字,--host 測試的http服務的ip和port,--no-web 不用web啟動,-c 設置虛擬用戶數, -r 設置每秒啟動虛擬用戶數, -t 設置運行時間)
下面是腳本:
創建一個方法保存在goconn,從數據庫讀取數據傳入請求:
import pymysql from sshtunnel import SSHTunnelForwarder import random def conn(): #本地通過putty鏈接數據庫,在鏈接跳板機 lcDB = pymysql.connect(host="127.0.0.1",port=8807, user="rt",passwd="qwqwqw12",db="go") cur = lcDB.cursor() #隨機生成一個數字,作為查詢的結果的結果數 num = random.randint(0,10) sql = "select no from goods where operator_id =%s order by rand() limit %s"%(11, num) print(sql) rfid = [] try: cur.execute(sql) data = cur.fetchall() for row in data: good_no = row[0] rfid.append(good_no) # print(good_no) return rfid except: print("Error") finally: #關閉連接 lcDB.close()
請求:
import goconnfrom locust import HttpLocust,TaskSet,task class goDemo(TaskSet): def getRfid(self): gID = goconn.conn() data = { "goods_no[]": gID, "num": 1, } print("2") r = self.client.post("/test/api/XXX", data=data) rfid = eval(r.content)["result"] print(rfid) return rfid def stock(self,did, oid, rfid): data = { "device_id": did, "operator_id": oid, "data[add][]": rfid, "data[remove][]": "", } result = self.client.post("/test/api/XX2", data= data) print(result.content) def purchase(self,did,uid,rfid): data = { "device_id":did, "user_id":uid, "data[]":rfid, } req = self.client.post("/test/api/XXX3", data = data) res =eval(req.content)["result"] print(req.content) @task(1) def test_getrfid(self): self.getRfid() @task(3) def test_stock(self): rfid = self.getRfid() self.stock(28,11,rfid) @task(7) def test_purchase(self): rfid = self.getRfid() self.stock(28,11,rfid) self.purchase(28,172,rfid) class WebsiteUser(HttpLocust): task_set = goDemo min_wait = 3000 max_wait = 5000
測試結果:
Number of users to simulate:設置模擬的用戶總數,
Hatch rate (users spawned/second):每秒啟動的虛擬用戶數 ,
Start swarming:執行locust腳本
UI界面:運行200個, 每秒啟動2個用戶:
參考文章:
http://www.pianshen.com/article/6404330705/
關於性能好文章:
https://www.cnblogs.com/botoo/p/7410283.html
設計locust:
參數化 ,關聯等:
http://www.cnblogs.com/ailiailan/p/9474973.html