一、背景
打算使用locust做並發,實現性能測試,網上找到的大部分都是http協議的測試,但被測平台是類似於IOT的平台,壓力不在於http的訪問,而在於終端設備的連接與數據發送,於是就想基於locust中http的使用方式,做一個tcp的並發腳本。
二、HttpUser
locust執行並發的腳本大致可分為兩部分,TaskSet和User。而HttpUser是繼承於User類,實現了http訪問相關的一些方法,只要繼承該類就可以很方便地進行並發測試,此時用戶只需要關注業務的實現即可,業務中方法的調用也和requests模塊的一致。
我們發現繼承自TaskSet的類中,可以直接調用 self.client 方法進行http的訪問,於是追蹤 self.client 方法的源碼。

1 @property 2 def client(self): 3 """ 4 Shortcut to the client :py:attr:`client <locust.User.client>` attribute of this TaskSet's :py:class:`User <locust.User>` 5 """ 6 return self.user.client
返回的是 self.user.client,繼續追蹤 self.user

1 @property 2 def user(self): 3 """:py:class:`User <locust.User>` instance that this TaskSet was created by""" 4 return self._user
繼續追蹤 self._user

1 def __init__(self, parent): 2 self._task_queue = [] 3 self._time_start = time() 4 5 if isinstance(parent, TaskSet): 6 self._user = parent.user 7 else: 8 self._user = parent 9 10 self._parent = parent
在繼承自TaskSet的類中打印 self.parent 可以發現,他就是繼承自 HttpUser的類,這時候關鍵點就來到了HttpUser,此時可以認為self.client就是HttpUser.client
點進HttpUser查看源碼。

1 class HttpUser(User): 2 """ 3 Represents an HTTP "user" which is to be spawned and attack the system that is to be load tested. 4 5 The behaviour of this user is defined by its tasks. Tasks can be declared either directly on the 6 class by using the :py:func:`@task decorator <locust.task>` on methods, or by setting 7 the :py:attr:`tasks attribute <locust.User.tasks>`. 8 9 This class creates a *client* attribute on instantiation which is an HTTP client with support 10 for keeping a user session between requests. 11 """ 12 13 abstract = True 14 """If abstract is True, the class is meant to be subclassed, and users will not choose this locust during a test""" 15 16 client: HttpSession = None 17 """ 18 Instance of HttpSession that is created upon instantiation of Locust. 19 The client supports cookies, and therefore keeps the session between HTTP requests. 20 """ 21 22 def __init__(self, *args, **kwargs): 23 super().__init__(*args, **kwargs) 24 if self.host is None: 25 raise LocustError( 26 "You must specify the base host. Either in the host attribute in the User class, or on the command line using the --host option." 27 ) 28 29 session = HttpSession( 30 base_url=self.host, 31 request_event=self.environment.events.request, 32 user=self, 33 ) 34 session.trust_env = False 35 self.client = session
發現 HttpUser確實有個client屬性,該屬性是一個HttpSession類的實例化對象。
點進HttpSession類,可以發現,該類是繼承自requests.Session,而request正是實現http請求的常用模塊。
三、TcpSocketUser構造
至此,我們可以大膽猜測,是不是只要構造一個 TcpSocketUser繼承自User,並且將一個實現了socket方法的實例化對象賦值給TcpSocketUser.client方法即可呢?

1 # locustfile.py 2 # coding:utf-8 3 import socket 4 from locust import User, TaskSet, task 5 import time 6 7 8 class TcpSocketUser(User): 9 10 abstract = True 11 12 client: socket.socket = None 13 14 def __init__(self, *args, **kwargs): 15 super(TcpSocketUser, self).__init__(*args, **kwargs) 16 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17 host, port = self.host.split(":") 18 self.client.connect((host, int(port))) 19 20 21 class TestTaskSet(TaskSet): 22 23 @task(1) 24 def send_data(self): 25 while True: 26 self.client.send(b"This is send data!") 27 data = self.client.recv(1024) 28 print(data) 29 time.sleep(3) 30 31 32 class TcpTestUser(TcpSocketUser): 33 34 tasks = [TestTaskSet] 35 host = "172.16.100.139:8001" 36 37 38 if __name__ == "__main__": 39 import os 40 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 6 monkey.patch_all() 7 8 9 def server(port): 10 try: 11 s = socket.socket() 12 s.bind(('0.0.0.0', port)) 13 s.listen(500) 14 while True: 15 cli, addr = s.accept() 16 gevent.spawn(handle_request, cli) 17 except KeyboardInterrupt as e: 18 print(e) 19 20 21 def handle_request(conn): 22 try: 23 while True: 24 data = conn.recv(1024) 25 if not data: 26 conn.close() 27 else: 28 print("recv:", data) 29 conn.send(b"Received!") 30 except OSError as e: 31 print("client has been closed") 32 33 except Exception as ex: 34 print(ex) 35 finally: 36 conn.close() 37 38 39 if __name__ == '__main__': 40 server(8001)
以上的代碼實現了功能,客戶端定時向服務端發送數據,服務端收到后進行回復,運行后發現可行,此時初步的並發框架已經完成。
此時想實現第二個需求,客戶端每隔5s同時向服務器發送兩條數據 “This is First data!” 和 “This is Second data!”,但是服務端因為要處理的原因,只能20s后才能返回結果,實現代碼如下。

1 # locustfile.py 2 # coding:utf-8 3 import socket 4 from locust import User, TaskSet, task, constant 5 import time 6 7 8 class TcpSocketUser(User): 9 10 abstract = True 11 12 client: socket.socket = None 13 14 def __init__(self, *args, **kwargs): 15 super(TcpSocketUser, self).__init__(*args, **kwargs) 16 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17 host, port = self.host.split(":") 18 self.client.connect((host, int(port))) 19 20 21 class TestTaskSet(TaskSet): 22 23 @task(1) 24 def send_data(self): 25 send_data = b"This is First data!" 26 self.client.send(send_data) 27 print(send_data) 28 send_data = b"This is Second data!" 29 self.client.send(send_data) 30 print(send_data) 31 recv_data = self.client.recv(1024) 32 print(recv_data) 33 34 35 class TcpTestUser(TcpSocketUser): 36 37 wait_time = constant(5) 38 39 tasks = [TestTaskSet] 40 host = "172.16.100.139:8001" 41 42 43 if __name__ == "__main__": 44 import os 45 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 time.sleep(20) 31 if b"First" in data: 32 conn.send(b"First Received!") 33 elif b'Second' in data: 34 conn.send(b"Second Received!") 35 except OSError as e: 36 print("client has been closed") 37 38 except Exception as ex: 39 print(ex) 40 finally: 41 conn.close() 42 43 44 if __name__ == '__main__': 45 server(8001)
運行起來會發現,當客戶端數據發送完成之后,只能等待20s,收到服務端的答復后才能返回開始下一次數據發送,這是因為recv默認是個阻塞方法,當沒有收到數據時會一直阻塞在此,影響下一次任務的開始,此時我們要解決阻塞的問題,可以使用recv的block=False,但官方文檔在Testing non-HTTP systems 章節提到,協議庫要用可以猴子補丁的。
所以我們選擇使用gevent的方法來進行處理。
四、gevent介紹
gevent是基於協程的Python網絡庫。特點:
- 基於libev的快速事件循環(Linux上epoll,FreeBSD上kqueue)。
- 基於greenlet的輕量級執行單元。
- API的概念和Python標准庫一致(如事件,隊列)。
- 可以配合socket,ssl模塊使用。
- 能夠使用標准庫和第三方模塊創建標准的阻塞套接字(gevent.monkey)。
- 默認通過線程池進行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環境變量開啟)。
- TCP/UDP/HTTP服務器
- 子進程支持(通過gevent.subprocess)
- 線程池
我們主要是用到上面標注紅色的特點,該特點是結合猴子補丁一起使用的,猴子補丁的作用是將一些阻塞的方法/模塊動態替換成非阻塞的,比如 socket、time等。
五、非阻塞代碼

1 # locustfile.py 2 # coding:utf-8 3 from gevent import socket, monkey;monkey.patch_all() 4 import gevent 5 import socket 6 from locust import User, TaskSet, task, constant 7 import time 8 9 10 class TcpSocketUser(User): 11 12 abstract = True 13 14 client: socket.socket = None 15 16 def __init__(self, *args, **kwargs): 17 super(TcpSocketUser, self).__init__(*args, **kwargs) 18 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 19 host, port = self.host.split(":") 20 self.client.connect((host, int(port))) 21 gevent.spawn(handle_recv, self.client) 22 23 24 class TestTaskSet(TaskSet): 25 26 @task(1) 27 def send_data(self): 28 send_data = b"This is First data!" 29 self.client.send(send_data) 30 print(send_data) 31 send_data = b"This is Second data!" 32 self.client.send(send_data) 33 print(send_data) 34 35 36 def handle_recv(conn): 37 while True: 38 data = conn.recv(1024) 39 if not data: 40 conn.close() 41 else: 42 print(data) 43 time.sleep(0.1) 44 45 46 class TcpTestUser(TcpSocketUser): 47 48 wait_time = constant(5) 49 50 tasks = [TestTaskSet] 51 host = "172.16.100.139:8001" 52 53 54 if __name__ == "__main__": 55 import os 56 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 time.sleep(20) 31 for i in data.split(b'!'): 32 if b"First" in i: 33 conn.send(b"First Received!") 34 elif b'Second' in i: 35 conn.send(b"Second Received!") 36 except OSError as e: 37 print("client has been closed") 38 39 except Exception as ex: 40 print(ex) 41 finally: 42 conn.close() 43 44 45 if __name__ == '__main__': 46 server(8001)
以上代碼可實現上述第二個需求,即 客戶端每隔5s同時向服務器發送兩條數據 “This is First data!” 和 “This is Second data!”,但是服務端因為要處理的原因,只能20s后才能返回結果。
至此,已經可以實現並發了。
六、並發結果注冊到測試報告
我們繼續回到HttpUser源碼中查看。
HttpSession實例化時傳入三個參數,self.host,self.environment.events.request,self,跟報告相關的主要是第二個參數,我們到HttpSession中進行查看。
發現兩個跟報告相關的地方。

1 request_meta = { 2 "request_type": method, 3 "response_time": response_time, 4 "name": name or url_after_redirect, 5 "context": context, 6 "response": response, 7 "exception": None, 8 "start_time": start_time, 9 "url": request_after_redirect.url, 10 }

1 if catch_response: 2 return ResponseContextManager(response, request_event=self.request_event, request_meta=request_meta)
request_meta中的各個key與報告中的每一列是一一對應的。
因此我們認為 ResponseContextManager 類中就含有將結果注冊到測試報告的相關代碼,於是我們繼續到 ResponseContextManager 源碼中查看,果然發現了相關的代碼。

1 def _report_request(self, exc=None): 2 self._request_event.fire(**self.request_meta)
self._request_event指向的就是self.environment.events.request,所以fire方法就可以將request_meta的數據注冊到報告中。
此時我們做一個小調整,新建一個TcpSocketClient類繼承自socket類,重寫connect、recv、send等方法,使其可以注冊到報告中。
request_meta中大部分參數的含義都比較清晰,這里不做一一解釋(本文未用到response和context),但問題在於在注冊到報告中,如何確認是成功或失敗呢?
查看locust官方文檔,可以看到有這么一段。
Parameters request_type – Request type method used name – Path to the URL that was called (or override name if it was used in the call to the client) response_time – Time in milliseconds until exception was thrown response_length – Content-length of the response response – Response object (e.g. a requests.Response) context – User/request context exception – Exception instance that was thrown. None if request was successful.
其中exception用法寫到,該參數傳入的是方法執行錯誤產生時所拋出的錯誤實例,如果方法執行成功則填為None,至此,大部分代碼都已完成,我們看下代碼和結果。

1 # locustfile.py 2 # coding:utf-8 3 from gevent import socket, monkey, event;monkey.patch_all() 4 import gevent 5 import socket 6 from locust import User, TaskSet, task, constant 7 import time 8 9 10 class TcpSocketClient(socket.socket): 11 12 def __init__(self, af_inet, socket_type, request_event, user): 13 super(TcpSocketClient, self).__init__(af_inet, socket_type) 14 self.connect_event = event.Event() 15 self.request_event = request_event 16 self.user = user 17 18 def connect(self, addr): 19 start_time = time.time() 20 try: 21 super(TcpSocketClient, self).connect(addr) 22 except Exception as e: 23 total_time = int((time.time() - start_time) * 1000) 24 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 25 response=None, context=None, exception=e) 26 else: 27 self.connect_event.set() 28 total_time = int((time.time() - start_time) * 1000) 29 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 30 response=None, context=None, exception=None) 31 32 def send(self, data, flag=False): 33 start_time = time.time() 34 try: 35 self.connect_event.wait() 36 super(TcpSocketClient, self).send(data) 37 total_time = int((time.time() - start_time) * 1000) 38 print(data) 39 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 40 response_length=len(data), response=None, context=None, exception=None) 41 except OSError as e: 42 total_time = int((time.time() - start_time) * 1000) 43 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 44 response_length=0, response=None, context=None, exception=e) 45 46 def recv(self, bufsize, flag=False): 47 start_time = time.time() 48 try: 49 while True: 50 start_time = time.time() 51 self.connect_event.wait() 52 data = super(TcpSocketClient, self).recv(bufsize) 53 total_time = int((time.time() - start_time) * 1000) 54 print(data) 55 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, 56 response_length=len(data), response=None, context=None, exception=None) 57 time.sleep(0.1) 58 except OSError as e: 59 total_time = int((time.time() - start_time) * 1000) 60 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, response_length=0, 61 response=None, context=None, exception=e) 62 63 64 class TcpSocketUser(User): 65 66 abstract = True 67 68 client: socket.socket = None 69 70 def __init__(self, *args, **kwargs): 71 super(TcpSocketUser, self).__init__(*args, **kwargs) 72 self.client = TcpSocketClient(socket.AF_INET, socket.SOCK_STREAM, self.environment.events.request, self) 73 host, port = self.host.split(":") 74 self.client.connect((host, int(port))) 75 gevent.spawn(self.client.recv, 1024) 76 77 78 class TestTaskSet(TaskSet): 79 80 @task(1) 81 def send_data(self): 82 send_data = b"This is First data!" 83 self.client.send(send_data) 84 send_data = b"This is Second data!" 85 self.client.send(send_data) 86 87 88 class TcpTestUser(TcpSocketUser): 89 90 wait_time = constant(5) 91 92 tasks = [TestTaskSet] 93 host = "172.16.100.139:8001" 94 95 96 if __name__ == "__main__": 97 import os 98 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 # time.sleep(5) 31 for i in data.split(b'!'): 32 if b"First" in i: 33 conn.send(b"First Received!") 34 elif b'Second' in i: 35 conn.send(b"Second Received!") 36 except OSError as e: 37 print("client has been closed") 38 39 except Exception as ex: 40 print(ex) 41 finally: 42 conn.close() 43 44 45 if __name__ == '__main__': 46 server(8001)
七、業務融合
這里又出現一個問題,注冊到報告中的信息,更多是想關注業務的成功與失敗,而不是底層的connect、send、recv這些,那該怎么辦呢?
因為我們要模擬的是終端客戶向平台發送數據,所以我們新建一個Device類,該類繼承自TcpSocketClient,並且會擴展實際的業務。
現在假設我們有這么一個需求,客戶端連接平台后要先發送一條“Register!”指令向平台注冊,如果平台回復“Success”,則客戶端開始以5s的間隔持續向平台發送“Heart!”,若平台超過5s沒有響應,則客戶端注冊失敗,不再繼續發送信息。
此時我們更多的關注在於業務的響應時間,比如register和heart的回復時間。
實現代碼如下:

1 # locustfile.py 2 # coding:utf-8 3 from gevent import socket, monkey, event;monkey.patch_all() 4 import gevent 5 import socket 6 from locust import User, TaskSet, task, constant 7 import time 8 import queue 9 10 11 class TcpSocketClient(socket.socket): 12 13 def __init__(self, af_inet, socket_type, request_event, user): 14 super(TcpSocketClient, self).__init__(af_inet, socket_type) 15 self.connect_event = event.Event() 16 self.request_event = request_event 17 self.user = user 18 self.recv_queue = queue.Queue() 19 20 def connect(self, addr): 21 start_time = time.time() 22 try: 23 super(TcpSocketClient, self).connect(addr) 24 except Exception as e: 25 total_time = int((time.time() - start_time) * 1000) 26 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 27 response=None, context=None, exception=e) 28 else: 29 self.connect_event.set() 30 total_time = int((time.time() - start_time) * 1000) 31 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 32 response=None, context=None, exception=None) 33 34 def send(self, data, flag=False): 35 start_time = time.time() 36 try: 37 self.connect_event.wait() 38 super(TcpSocketClient, self).send(data) 39 total_time = int((time.time() - start_time) * 1000) 40 print(self.getsockname(), data) 41 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 42 response_length=len(data), response=None, context=None, exception=None) 43 except OSError as e: 44 total_time = int((time.time() - start_time) * 1000) 45 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 46 response_length=0, response=None, context=None, exception=e) 47 48 def recv(self, bufsize, flag=False): 49 start_time = time.time() 50 try: 51 while True: 52 start_time = time.time() 53 self.connect_event.wait() 54 data = super(TcpSocketClient, self).recv(bufsize) 55 total_time = int((time.time() - start_time) * 1000) 56 self.recv_queue.put_nowait(data) 57 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, 58 response_length=len(data), response=None, context=None, exception=None) 59 time.sleep(0.1) 60 except OSError as e: 61 total_time = int((time.time() - start_time) * 1000) 62 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, response_length=0, 63 response=None, context=None, exception=e) 64 65 66 class Device(TcpSocketClient): 67 def __init__(self, af_inet, socket_type, request_event, user): 68 super(Device, self).__init__(af_inet, socket_type, request_event, user) 69 self.is_registered = False 70 self.heart_dict = {} 71 72 def parse_data(self): 73 while True: 74 if not self.recv_queue.empty(): 75 data = self.recv_queue.get_nowait() 76 print(self.getsockname(), data) 77 if b"Success" in data: 78 self.is_registered = True 79 elif b"Heart Reply" in data: 80 start_time = self.heart_dict.get("Heart") 81 total_time = int((time.time() - start_time) * 1000) 82 self.request_event.fire(request_type="tcp", name="Heart", response_time=total_time, 83 response_length=0, response=None, context=None, exception=None) 84 time.sleep(0.1) 85 86 def register(self): 87 self.send(b"Register!") 88 89 def heart(self): 90 self.send(b"Heart!") 91 92 93 class TcpSocketUser(User): 94 95 abstract = True 96 97 client: socket.socket = None 98 99 def __init__(self, *args, **kwargs): 100 super(TcpSocketUser, self).__init__(*args, **kwargs) 101 self.client = Device(socket.AF_INET, socket.SOCK_STREAM, self.environment.events.request, self) 102 host, port = self.host.split(":") 103 self.client.connect((host, int(port))) 104 gevent.spawn(self.client.recv, 1024) 105 gevent.spawn(self.client.parse_data) 106 107 108 class TestTaskSet(TaskSet): 109 110 def on_start(self): 111 start_time = time.time() 112 try: 113 self.client.register() 114 count = 0 115 while not self.client.is_registered: 116 time.sleep(0.1) 117 count += 1 118 if count == 100: 119 raise TimeoutError("注冊失敗") 120 except Exception as e: 121 total_time = int((time.time() - start_time) * 1000) 122 self.client.request_event.fire(request_type="tcp", name="register", response_time=total_time, 123 response_length=0, response=None, context=None, exception=e) 124 else: 125 total_time = int((time.time() - start_time) * 1000) 126 self.client.request_event.fire(request_type="tcp", name="register", response_time=total_time, 127 response_length=0, response=None, context=None, exception=None) 128 129 @task(1) 130 def heart(self): 131 if self.client.is_registered: 132 self.client.heart() 133 self.client.heart_dict["Heart"] = time.time() 134 135 136 class TcpTestUser(TcpSocketUser): 137 138 wait_time = constant(5) 139 140 tasks = [TestTaskSet] 141 host = "172.16.100.139:8001" 142 143 144 if __name__ == "__main__": 145 import os 146 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 # time.sleep(5) 31 for i in data.split(b'!'): 32 if b"Register" in i: 33 conn.send(b"Success!") 34 pass 35 if b"Heart" in i: 36 conn.send(b"Heart Reply!") 37 38 except OSError as e: 39 print("client has been closed") 40 41 except Exception as ex: 42 print(ex) 43 finally: 44 conn.close() 45 46 47 if __name__ == '__main__': 48 server(8001)
至此,一個模擬客戶端並發連接平台的測試腳本案例完成。