python+locust性能測試學習筆記


前言

Locust(俗稱 蝗蟲)一個輕量級的開源壓測工具,基本功能是用Python代碼描述所有測試。不需要笨拙的UI或龐大的XML,只需簡單的代碼即可。

環境安裝

Locust支持Python 2.7, 3.4, 3.5, and 3.6的版本,小編的環境是python3.6直接用pip安裝就行

安裝命令:pip install locustio

官方文檔

Locust Documentation​docs.locust.io

開始第一個實例

from locust import HttpLocust, TaskSet, task class Testlocust(TaskSet): def on_start(self): print("start") self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36' } @task(1) def baidu_demo(self): r = self.client.get("/", headers=self.headers, verify=False) print(r.status_code) assert r.status_code == 200 class WebsiteUser(HttpLocust): task_set = Testlocust min_wait = 1500 max_wait = 5000 if __name__ == "__main__": import os os.system("locust -f locust4.py --host=https://www.baidu.com") 
  • 使用@task裝飾的方法為一個事務,方法的參數用於指定該行為的執行權重,參數越大每次被用戶執行的概率越高,默認為1;
  • on_start():每個locust用戶執行測試事務之前執行一次,用於做初始化的工作,如登錄;
  • host :要加載主機的URL,通常是在命令行啟動locust時使用--host選項指定,若命令行啟動時未指定,該屬性被使用;
  • task_set:指向定義的一個用戶行為類;
  • min_wait:模擬用戶在執行每個任務之間等待的最小時間,單位為毫秒;
  • max_wait:模擬用戶在執行每個任務之間等待的最大時間,單位為毫秒

啟動locust

web模式啟動:os.system("locust -f locust4.py --host=")

啟動成功:

在瀏覽器中輸入: 出現如下圖說明啟動成功

 

測試結果:

no-web模式啟動:os.system("locust -f locust4.py --host= --no-web --csv=example -c 100 -r 10 -t 10s")


測試結果:

 

開始第二個實例壓kafka

import time
from locust import TaskSet, task, Locust, events
from kafka import KafkaProducer
import json

class UserBehavior(TaskSet):
    def on_start(self):
        self.producer = KafkaProducer(bootstrap_servers=['x.x.x.x:9092'])

    def on_stop(self):
        # 該方法當程序結束時每用戶進行調用,關閉連接
        self.producer.close()

    @task(1)
    def sendAddCmd(self):
        start_time = time.time()
        time_unique_id = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
        print(time_unique_id)
        print("===========================",start_time)
        try:
            timestamp = int(time.time())
            message = {
                'timestamp': timestamp,
                'message': "121314"
            }
            msg = json.dumps(message)
            msg = msg.encode('utf-8')
            self.producer.send('preHandleTopic', msg)
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="kafka", name="add", response_time=total_time,
                                        response_length=0, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(request_type="kafka", name="add", response_time=total_time,
                                        response_length=0)



class SocketUser(Locust):
    task_set = UserBehavior
    min_wait = 1000  # 單位毫秒
    max_wait = 1000  # 單位毫秒



if __name__ == '__main__':
    import os
    os.system("locust -f SendKafka.py --host=x.x.x.x:9092"

啟動方式跟實例一相同

 

開始第三個實例壓tcp

from locust import HttpLocust, TaskSet, task

import socket  # 導入 socket 模塊
host = socket.gethostname()  # 獲取本地主機名
port = 8888  # 設置端口號
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))


class UserBehaviora(TaskSet):
    def on_start(self):
        print("start")

    @task(1)
    def bky_index(self):
        sock.send(
            b'121314')
        re_mes = sock.recv(1024).decode()
        print(re_mes)


class WebsiteUser(HttpLocust):
    task_set = UserBehaviora
    min_wait = 1500
    max_wait = 5000


if __name__ == "__main__":
    import os
    os.system("locust -f locust6.py --host=x.x.x.x:xxxx")

啟動方式跟實例一相同

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家的支持。

 

小編微信公眾號:自動化測試 To share


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM