python locust 進行壓力測試


最近公司項目周期比較趕, 項目是軟硬結合,在缺少硬件的情況下,通過接口模擬設備上下架和購買情況,並進行壓力測試,

本次主要使用三個接口 分別是3個場景: 生成商品IP, 對商品進行上架, 消費者購買商品

最大問題:是數據庫是用ssh,只能用127.0.0.1去鏈接數據庫,

試過用ssh鏈接數據庫, 用requests 去跑腳本沒有問題,換上locust 就有問題

最后使用putty作為代理鏈接 ,有個缺陷就是 鏈接時效性不強,經常要重新鏈接putty

 

環境:

win10  mysql  locust python3.7

 

1.locust:

Locust是一個用於可擴展的,分布式的,性能測試的,開源的,用Python編寫框架/工具,它非常容易使用,也非常好學。

它的主要思想就是模擬一群用戶將訪問你的網站。每個用戶的行為由你編寫的python代碼定義,同時可以從Web界面中實時觀察到用戶的行為。

Locust完全是事件驅動的,因此在單台機器上能夠支持幾千並發用戶訪問。

與其它許多基於事件的應用相比,Locust並不使用回調,而是使用gevent,而gevent是基於協程的,可以用同步的方式來編寫異步執行的代碼。

每個用戶實際上運行在自己的greenlet中。

2. 安裝

pip  install locust

 安裝 pyzmq

 If you intend to run Locust distributed across multiple processes/machines, we recommend you to also install pyzmq.

 如果打算運行Locust 分布在多個進程/機器,需要安裝pyzmq.

 通過pip命令安裝。

 pip install pyzmq

3.Locust主要由下面的幾個庫構成:

 

1) gevent

gevent是一種基於協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。

2) flask

Python編寫的輕量級Web應用框架。

3) requests

Python Http庫

4) msgpack-python

MessagePack是一種快速、緊湊的二進制序列化格式,適用於類似JSON的數據格式。msgpack-python主要提供MessagePack數據序列化及反序列化的方法。

5) six

Python2和3兼容庫,用來封裝Python2和Python3之間的差異性

6) pyzmq

pyzmq是zeromq(一種通信隊列)的Python綁定,主要用來實現Locust的分布式模式運行

當我們在安裝 Locust 時,它會檢測我們當前的 Python 環境是否已經安裝了這些庫,如果沒有安裝,它會先把這些庫一一裝上。並且對這些庫版本有要求,有些是必須等於某版本,有些是大於某版本。我們也可以事先把這些庫全部按要求裝好,再安裝Locust時就會快上許多

 

 

4. 腳本解讀

1、創建ScriptTasks()類繼承TaskSet類:  用於定義測試業務。

2、創建index()、about()、demo()方法分別表示一個行為,訪問http://example.com。用@task() 裝飾該方法為一個任務。1、2表示一個Locust實例被挑選執行的權重,數值越大,執行頻率越高。在當前ScriptTasks()行為下的三個方法得執行比例為2:1:1

3、WebsiteUser()類: 用於定義模擬用戶。

4、task_set :  指向一個定義了的用戶行為類。

5、host:   指定被測試應用的URL的地址

6、min_wait :   用戶執行任務之間等待時間的下界,單位:毫秒。

7、max_wait :   用戶執行任務之間等待時間的上界,單位:毫秒。

 

5.執行腳本:

web/UI 界面:

if __name__ == '__main__':
    import os
    os.system("locust -f godemo.py --host=http://xx.api.xxxxx.net")

或者在命令行:

locust -f godemo.py --host=http:xxx.xxx.xx.net

或者:
locust -f godemo.py --H=http:xxx.xxx.xx.net

 

no-web:

dos進入Scripts目錄下,執行  locust -f ****.py --csv=onetest --host=http://0.0.0.0:0000 --no-web -c10 -r10 -t2 

(PS:-f 指定運行的py文件的名字,--csv 生成報告的名字,--host 測試的http服務的ip和port,--no-web 不用web啟動,-c 設置虛擬用戶數,  -r 設置每秒啟動虛擬用戶數, -t  設置運行時間)

 

下面是腳本:

創建一個方法保存在goconn,從數據庫讀取數據傳入請求:

import  pymysql
from sshtunnel import SSHTunnelForwarder
import random


def conn():

    #本地通過putty鏈接數據庫,在鏈接跳板機
    lcDB = pymysql.connect(host="127.0.0.1",port=8807, user="rt",passwd="qwqwqw12",db="go")
    cur = lcDB.cursor()
    #隨機生成一個數字,作為查詢的結果的結果數
    num = random.randint(0,10)

    sql = "select no from goods where operator_id =%s order by rand() limit %s"%(11, num)
    print(sql)
    rfid = []

    try:

        cur.execute(sql)
        data = cur.fetchall()

        for row in data:
            good_no = row[0]
            rfid.append(good_no)
            # print(good_no)
        return rfid


    except:
        print("Error")

    finally:
        #關閉連接
        lcDB.close()

 

請求:

import goconnfrom locust import HttpLocust,TaskSet,task

class goDemo(TaskSet):

    def getRfid(self):

        gID = goconn.conn()
        data = {
            "goods_no[]": gID,
            "num": 1,
        }
        print("2")
        r = self.client.post("/test/api/XXX", data=data)
        rfid = eval(r.content)["result"]
        print(rfid)
        return rfid

    def stock(self,did, oid, rfid):
        data = {
            "device_id": did,
            "operator_id": oid,
            "data[add][]": rfid,
            "data[remove][]": "",
        }
        result = self.client.post("/test/api/XX2", data= data)
        print(result.content)


    def purchase(self,did,uid,rfid):

        data = {
            "device_id":did,
            "user_id":uid,
            "data[]":rfid,
        }
        req = self.client.post("/test/api/XXX3", data = data)
        res =eval(req.content)["result"]
        print(req.content)

    @task(1)
    def test_getrfid(self):
        self.getRfid()

    @task(3)
    def test_stock(self):
        rfid = self.getRfid()
        self.stock(28,11,rfid)

    @task(7)
    def test_purchase(self):
        rfid = self.getRfid()
        self.stock(28,11,rfid)
        self.purchase(28,172,rfid)


class WebsiteUser(HttpLocust):
    task_set = goDemo
    min_wait = 3000
    max_wait = 5000

 

測試結果:

 Number of users to simulate:設置模擬的用戶總數,

Hatch rate (users spawned/second):每秒啟動的虛擬用戶數 ,

Start swarming:執行locust腳本

 

UI界面:運行200個, 每秒啟動2個用戶:

 參考文章:

http://www.pianshen.com/article/6404330705/

關於性能好文章:

https://www.cnblogs.com/botoo/p/7410283.html

設計locust:

參數化 ,關聯等:

http://www.cnblogs.com/ailiailan/p/9474973.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM