一、背景
打算使用locust做并发,实现性能测试,网上找到的大部分都是http协议的测试,但被测平台是类似于IOT的平台,压力不在于http的访问,而在于终端设备的连接与数据发送,于是就想基于locust中http的使用方式,做一个tcp的并发脚本。
二、HttpUser
locust执行并发的脚本大致可分为两部分,TaskSet和User。而HttpUser是继承于User类,实现了http访问相关的一些方法,只要继承该类就可以很方便地进行并发测试,此时用户只需要关注业务的实现即可,业务中方法的调用也和requests模块的一致。
我们发现继承自TaskSet的类中,可以直接调用 self.client 方法进行http的访问,于是追踪 self.client 方法的源码。

1 @property 2 def client(self): 3 """ 4 Shortcut to the client :py:attr:`client <locust.User.client>` attribute of this TaskSet's :py:class:`User <locust.User>` 5 """ 6 return self.user.client
返回的是 self.user.client,继续追踪 self.user

1 @property 2 def user(self): 3 """:py:class:`User <locust.User>` instance that this TaskSet was created by""" 4 return self._user
继续追踪 self._user

1 def __init__(self, parent): 2 self._task_queue = [] 3 self._time_start = time() 4 5 if isinstance(parent, TaskSet): 6 self._user = parent.user 7 else: 8 self._user = parent 9 10 self._parent = parent
在继承自TaskSet的类中打印 self.parent 可以发现,他就是继承自 HttpUser的类,这时候关键点就来到了HttpUser,此时可以认为self.client就是HttpUser.client
点进HttpUser查看源码。

1 class HttpUser(User): 2 """ 3 Represents an HTTP "user" which is to be spawned and attack the system that is to be load tested. 4 5 The behaviour of this user is defined by its tasks. Tasks can be declared either directly on the 6 class by using the :py:func:`@task decorator <locust.task>` on methods, or by setting 7 the :py:attr:`tasks attribute <locust.User.tasks>`. 8 9 This class creates a *client* attribute on instantiation which is an HTTP client with support 10 for keeping a user session between requests. 11 """ 12 13 abstract = True 14 """If abstract is True, the class is meant to be subclassed, and users will not choose this locust during a test""" 15 16 client: HttpSession = None 17 """ 18 Instance of HttpSession that is created upon instantiation of Locust. 19 The client supports cookies, and therefore keeps the session between HTTP requests. 20 """ 21 22 def __init__(self, *args, **kwargs): 23 super().__init__(*args, **kwargs) 24 if self.host is None: 25 raise LocustError( 26 "You must specify the base host. Either in the host attribute in the User class, or on the command line using the --host option." 27 ) 28 29 session = HttpSession( 30 base_url=self.host, 31 request_event=self.environment.events.request, 32 user=self, 33 ) 34 session.trust_env = False 35 self.client = session
发现 HttpUser确实有个client属性,该属性是一个HttpSession类的实例化对象。
点进HttpSession类,可以发现,该类是继承自requests.Session,而request正是实现http请求的常用模块。
三、TcpSocketUser构造
至此,我们可以大胆猜测,是不是只要构造一个 TcpSocketUser继承自User,并且将一个实现了socket方法的实例化对象赋值给TcpSocketUser.client方法即可呢?

1 # locustfile.py 2 # coding:utf-8 3 import socket 4 from locust import User, TaskSet, task 5 import time 6 7 8 class TcpSocketUser(User): 9 10 abstract = True 11 12 client: socket.socket = None 13 14 def __init__(self, *args, **kwargs): 15 super(TcpSocketUser, self).__init__(*args, **kwargs) 16 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17 host, port = self.host.split(":") 18 self.client.connect((host, int(port))) 19 20 21 class TestTaskSet(TaskSet): 22 23 @task(1) 24 def send_data(self): 25 while True: 26 self.client.send(b"This is send data!") 27 data = self.client.recv(1024) 28 print(data) 29 time.sleep(3) 30 31 32 class TcpTestUser(TcpSocketUser): 33 34 tasks = [TestTaskSet] 35 host = "172.16.100.139:8001" 36 37 38 if __name__ == "__main__": 39 import os 40 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 6 monkey.patch_all() 7 8 9 def server(port): 10 try: 11 s = socket.socket() 12 s.bind(('0.0.0.0', port)) 13 s.listen(500) 14 while True: 15 cli, addr = s.accept() 16 gevent.spawn(handle_request, cli) 17 except KeyboardInterrupt as e: 18 print(e) 19 20 21 def handle_request(conn): 22 try: 23 while True: 24 data = conn.recv(1024) 25 if not data: 26 conn.close() 27 else: 28 print("recv:", data) 29 conn.send(b"Received!") 30 except OSError as e: 31 print("client has been closed") 32 33 except Exception as ex: 34 print(ex) 35 finally: 36 conn.close() 37 38 39 if __name__ == '__main__': 40 server(8001)
以上的代码实现了功能,客户端定时向服务端发送数据,服务端收到后进行回复,运行后发现可行,此时初步的并发框架已经完成。
此时想实现第二个需求,客户端每隔5s同时向服务器发送两条数据 “This is First data!” 和 “This is Second data!”,但是服务端因为要处理的原因,只能20s后才能返回结果,实现代码如下。

1 # locustfile.py 2 # coding:utf-8 3 import socket 4 from locust import User, TaskSet, task, constant 5 import time 6 7 8 class TcpSocketUser(User): 9 10 abstract = True 11 12 client: socket.socket = None 13 14 def __init__(self, *args, **kwargs): 15 super(TcpSocketUser, self).__init__(*args, **kwargs) 16 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17 host, port = self.host.split(":") 18 self.client.connect((host, int(port))) 19 20 21 class TestTaskSet(TaskSet): 22 23 @task(1) 24 def send_data(self): 25 send_data = b"This is First data!" 26 self.client.send(send_data) 27 print(send_data) 28 send_data = b"This is Second data!" 29 self.client.send(send_data) 30 print(send_data) 31 recv_data = self.client.recv(1024) 32 print(recv_data) 33 34 35 class TcpTestUser(TcpSocketUser): 36 37 wait_time = constant(5) 38 39 tasks = [TestTaskSet] 40 host = "172.16.100.139:8001" 41 42 43 if __name__ == "__main__": 44 import os 45 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 time.sleep(20) 31 if b"First" in data: 32 conn.send(b"First Received!") 33 elif b'Second' in data: 34 conn.send(b"Second Received!") 35 except OSError as e: 36 print("client has been closed") 37 38 except Exception as ex: 39 print(ex) 40 finally: 41 conn.close() 42 43 44 if __name__ == '__main__': 45 server(8001)
运行起来会发现,当客户端数据发送完成之后,只能等待20s,收到服务端的答复后才能返回开始下一次数据发送,这是因为recv默认是个阻塞方法,当没有收到数据时会一直阻塞在此,影响下一次任务的开始,此时我们要解决阻塞的问题,可以使用recv的block=False,但官方文档在Testing non-HTTP systems 章节提到,协议库要用可以猴子补丁的。
所以我们选择使用gevent的方法来进行处理。
四、gevent介绍
gevent是基于协程的Python网络库。特点:
- 基于libev的快速事件循环(Linux上epoll,FreeBSD上kqueue)。
- 基于greenlet的轻量级执行单元。
- API的概念和Python标准库一致(如事件,队列)。
- 可以配合socket,ssl模块使用。
- 能够使用标准库和第三方模块创建标准的阻塞套接字(gevent.monkey)。
- 默认通过线程池进行DNS查询,也可通过c-are(通过GEVENT_RESOLVER=ares环境变量开启)。
- TCP/UDP/HTTP服务器
- 子进程支持(通过gevent.subprocess)
- 线程池
我们主要是用到上面标注红色的特点,该特点是结合猴子补丁一起使用的,猴子补丁的作用是将一些阻塞的方法/模块动态替换成非阻塞的,比如 socket、time等。
五、非阻塞代码

1 # locustfile.py 2 # coding:utf-8 3 from gevent import socket, monkey;monkey.patch_all() 4 import gevent 5 import socket 6 from locust import User, TaskSet, task, constant 7 import time 8 9 10 class TcpSocketUser(User): 11 12 abstract = True 13 14 client: socket.socket = None 15 16 def __init__(self, *args, **kwargs): 17 super(TcpSocketUser, self).__init__(*args, **kwargs) 18 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 19 host, port = self.host.split(":") 20 self.client.connect((host, int(port))) 21 gevent.spawn(handle_recv, self.client) 22 23 24 class TestTaskSet(TaskSet): 25 26 @task(1) 27 def send_data(self): 28 send_data = b"This is First data!" 29 self.client.send(send_data) 30 print(send_data) 31 send_data = b"This is Second data!" 32 self.client.send(send_data) 33 print(send_data) 34 35 36 def handle_recv(conn): 37 while True: 38 data = conn.recv(1024) 39 if not data: 40 conn.close() 41 else: 42 print(data) 43 time.sleep(0.1) 44 45 46 class TcpTestUser(TcpSocketUser): 47 48 wait_time = constant(5) 49 50 tasks = [TestTaskSet] 51 host = "172.16.100.139:8001" 52 53 54 if __name__ == "__main__": 55 import os 56 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 time.sleep(20) 31 for i in data.split(b'!'): 32 if b"First" in i: 33 conn.send(b"First Received!") 34 elif b'Second' in i: 35 conn.send(b"Second Received!") 36 except OSError as e: 37 print("client has been closed") 38 39 except Exception as ex: 40 print(ex) 41 finally: 42 conn.close() 43 44 45 if __name__ == '__main__': 46 server(8001)
以上代码可实现上述第二个需求,即 客户端每隔5s同时向服务器发送两条数据 “This is First data!” 和 “This is Second data!”,但是服务端因为要处理的原因,只能20s后才能返回结果。
至此,已经可以实现并发了。
六、并发结果注册到测试报告
我们继续回到HttpUser源码中查看。
HttpSession实例化时传入三个参数,self.host,self.environment.events.request,self,跟报告相关的主要是第二个参数,我们到HttpSession中进行查看。
发现两个跟报告相关的地方。

1 request_meta = { 2 "request_type": method, 3 "response_time": response_time, 4 "name": name or url_after_redirect, 5 "context": context, 6 "response": response, 7 "exception": None, 8 "start_time": start_time, 9 "url": request_after_redirect.url, 10 }

1 if catch_response: 2 return ResponseContextManager(response, request_event=self.request_event, request_meta=request_meta)
request_meta中的各个key与报告中的每一列是一一对应的。
因此我们认为 ResponseContextManager 类中就含有将结果注册到测试报告的相关代码,于是我们继续到 ResponseContextManager 源码中查看,果然发现了相关的代码。

1 def _report_request(self, exc=None): 2 self._request_event.fire(**self.request_meta)
self._request_event指向的就是self.environment.events.request,所以fire方法就可以将request_meta的数据注册到报告中。
此时我们做一个小调整,新建一个TcpSocketClient类继承自socket类,重写connect、recv、send等方法,使其可以注册到报告中。
request_meta中大部分参数的含义都比较清晰,这里不做一一解释(本文未用到response和context),但问题在于在注册到报告中,如何确认是成功或失败呢?
查看locust官方文档,可以看到有这么一段。
Parameters request_type – Request type method used name – Path to the URL that was called (or override name if it was used in the call to the client) response_time – Time in milliseconds until exception was thrown response_length – Content-length of the response response – Response object (e.g. a requests.Response) context – User/request context exception – Exception instance that was thrown. None if request was successful.
其中exception用法写到,该参数传入的是方法执行错误产生时所抛出的错误实例,如果方法执行成功则填为None,至此,大部分代码都已完成,我们看下代码和结果。

1 # locustfile.py 2 # coding:utf-8 3 from gevent import socket, monkey, event;monkey.patch_all() 4 import gevent 5 import socket 6 from locust import User, TaskSet, task, constant 7 import time 8 9 10 class TcpSocketClient(socket.socket): 11 12 def __init__(self, af_inet, socket_type, request_event, user): 13 super(TcpSocketClient, self).__init__(af_inet, socket_type) 14 self.connect_event = event.Event() 15 self.request_event = request_event 16 self.user = user 17 18 def connect(self, addr): 19 start_time = time.time() 20 try: 21 super(TcpSocketClient, self).connect(addr) 22 except Exception as e: 23 total_time = int((time.time() - start_time) * 1000) 24 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 25 response=None, context=None, exception=e) 26 else: 27 self.connect_event.set() 28 total_time = int((time.time() - start_time) * 1000) 29 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 30 response=None, context=None, exception=None) 31 32 def send(self, data, flag=False): 33 start_time = time.time() 34 try: 35 self.connect_event.wait() 36 super(TcpSocketClient, self).send(data) 37 total_time = int((time.time() - start_time) * 1000) 38 print(data) 39 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 40 response_length=len(data), response=None, context=None, exception=None) 41 except OSError as e: 42 total_time = int((time.time() - start_time) * 1000) 43 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 44 response_length=0, response=None, context=None, exception=e) 45 46 def recv(self, bufsize, flag=False): 47 start_time = time.time() 48 try: 49 while True: 50 start_time = time.time() 51 self.connect_event.wait() 52 data = super(TcpSocketClient, self).recv(bufsize) 53 total_time = int((time.time() - start_time) * 1000) 54 print(data) 55 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, 56 response_length=len(data), response=None, context=None, exception=None) 57 time.sleep(0.1) 58 except OSError as e: 59 total_time = int((time.time() - start_time) * 1000) 60 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, response_length=0, 61 response=None, context=None, exception=e) 62 63 64 class TcpSocketUser(User): 65 66 abstract = True 67 68 client: socket.socket = None 69 70 def __init__(self, *args, **kwargs): 71 super(TcpSocketUser, self).__init__(*args, **kwargs) 72 self.client = TcpSocketClient(socket.AF_INET, socket.SOCK_STREAM, self.environment.events.request, self) 73 host, port = self.host.split(":") 74 self.client.connect((host, int(port))) 75 gevent.spawn(self.client.recv, 1024) 76 77 78 class TestTaskSet(TaskSet): 79 80 @task(1) 81 def send_data(self): 82 send_data = b"This is First data!" 83 self.client.send(send_data) 84 send_data = b"This is Second data!" 85 self.client.send(send_data) 86 87 88 class TcpTestUser(TcpSocketUser): 89 90 wait_time = constant(5) 91 92 tasks = [TestTaskSet] 93 host = "172.16.100.139:8001" 94 95 96 if __name__ == "__main__": 97 import os 98 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 # time.sleep(5) 31 for i in data.split(b'!'): 32 if b"First" in i: 33 conn.send(b"First Received!") 34 elif b'Second' in i: 35 conn.send(b"Second Received!") 36 except OSError as e: 37 print("client has been closed") 38 39 except Exception as ex: 40 print(ex) 41 finally: 42 conn.close() 43 44 45 if __name__ == '__main__': 46 server(8001)
七、业务融合
这里又出现一个问题,注册到报告中的信息,更多是想关注业务的成功与失败,而不是底层的connect、send、recv这些,那该怎么办呢?
因为我们要模拟的是终端客户向平台发送数据,所以我们新建一个Device类,该类继承自TcpSocketClient,并且会扩展实际的业务。
现在假设我们有这么一个需求,客户端连接平台后要先发送一条“Register!”指令向平台注册,如果平台回复“Success”,则客户端开始以5s的间隔持续向平台发送“Heart!”,若平台超过5s没有响应,则客户端注册失败,不再继续发送信息。
此时我们更多的关注在于业务的响应时间,比如register和heart的回复时间。
实现代码如下:

1 # locustfile.py 2 # coding:utf-8 3 from gevent import socket, monkey, event;monkey.patch_all() 4 import gevent 5 import socket 6 from locust import User, TaskSet, task, constant 7 import time 8 import queue 9 10 11 class TcpSocketClient(socket.socket): 12 13 def __init__(self, af_inet, socket_type, request_event, user): 14 super(TcpSocketClient, self).__init__(af_inet, socket_type) 15 self.connect_event = event.Event() 16 self.request_event = request_event 17 self.user = user 18 self.recv_queue = queue.Queue() 19 20 def connect(self, addr): 21 start_time = time.time() 22 try: 23 super(TcpSocketClient, self).connect(addr) 24 except Exception as e: 25 total_time = int((time.time() - start_time) * 1000) 26 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 27 response=None, context=None, exception=e) 28 else: 29 self.connect_event.set() 30 total_time = int((time.time() - start_time) * 1000) 31 self.request_event.fire(request_type="tcp", name="connect", response_time=total_time, response_length=0, 32 response=None, context=None, exception=None) 33 34 def send(self, data, flag=False): 35 start_time = time.time() 36 try: 37 self.connect_event.wait() 38 super(TcpSocketClient, self).send(data) 39 total_time = int((time.time() - start_time) * 1000) 40 print(self.getsockname(), data) 41 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 42 response_length=len(data), response=None, context=None, exception=None) 43 except OSError as e: 44 total_time = int((time.time() - start_time) * 1000) 45 self.request_event.fire(request_type="tcp", name="send", response_time=total_time, 46 response_length=0, response=None, context=None, exception=e) 47 48 def recv(self, bufsize, flag=False): 49 start_time = time.time() 50 try: 51 while True: 52 start_time = time.time() 53 self.connect_event.wait() 54 data = super(TcpSocketClient, self).recv(bufsize) 55 total_time = int((time.time() - start_time) * 1000) 56 self.recv_queue.put_nowait(data) 57 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, 58 response_length=len(data), response=None, context=None, exception=None) 59 time.sleep(0.1) 60 except OSError as e: 61 total_time = int((time.time() - start_time) * 1000) 62 self.request_event.fire(request_type="tcp", name="recv", response_time=total_time, response_length=0, 63 response=None, context=None, exception=e) 64 65 66 class Device(TcpSocketClient): 67 def __init__(self, af_inet, socket_type, request_event, user): 68 super(Device, self).__init__(af_inet, socket_type, request_event, user) 69 self.is_registered = False 70 self.heart_dict = {} 71 72 def parse_data(self): 73 while True: 74 if not self.recv_queue.empty(): 75 data = self.recv_queue.get_nowait() 76 print(self.getsockname(), data) 77 if b"Success" in data: 78 self.is_registered = True 79 elif b"Heart Reply" in data: 80 start_time = self.heart_dict.get("Heart") 81 total_time = int((time.time() - start_time) * 1000) 82 self.request_event.fire(request_type="tcp", name="Heart", response_time=total_time, 83 response_length=0, response=None, context=None, exception=None) 84 time.sleep(0.1) 85 86 def register(self): 87 self.send(b"Register!") 88 89 def heart(self): 90 self.send(b"Heart!") 91 92 93 class TcpSocketUser(User): 94 95 abstract = True 96 97 client: socket.socket = None 98 99 def __init__(self, *args, **kwargs): 100 super(TcpSocketUser, self).__init__(*args, **kwargs) 101 self.client = Device(socket.AF_INET, socket.SOCK_STREAM, self.environment.events.request, self) 102 host, port = self.host.split(":") 103 self.client.connect((host, int(port))) 104 gevent.spawn(self.client.recv, 1024) 105 gevent.spawn(self.client.parse_data) 106 107 108 class TestTaskSet(TaskSet): 109 110 def on_start(self): 111 start_time = time.time() 112 try: 113 self.client.register() 114 count = 0 115 while not self.client.is_registered: 116 time.sleep(0.1) 117 count += 1 118 if count == 100: 119 raise TimeoutError("注册失败") 120 except Exception as e: 121 total_time = int((time.time() - start_time) * 1000) 122 self.client.request_event.fire(request_type="tcp", name="register", response_time=total_time, 123 response_length=0, response=None, context=None, exception=e) 124 else: 125 total_time = int((time.time() - start_time) * 1000) 126 self.client.request_event.fire(request_type="tcp", name="register", response_time=total_time, 127 response_length=0, response=None, context=None, exception=None) 128 129 @task(1) 130 def heart(self): 131 if self.client.is_registered: 132 self.client.heart() 133 self.client.heart_dict["Heart"] = time.time() 134 135 136 class TcpTestUser(TcpSocketUser): 137 138 wait_time = constant(5) 139 140 tasks = [TestTaskSet] 141 host = "172.16.100.139:8001" 142 143 144 if __name__ == "__main__": 145 import os 146 os.system("locust")

1 # Server.py 2 # coding:utf-8 3 import gevent 4 from gevent import socket, monkey 5 import time 6 7 monkey.patch_all() 8 9 10 def server(port): 11 try: 12 s = socket.socket() 13 s.bind(('0.0.0.0', port)) 14 s.listen(500) 15 while True: 16 cli, addr = s.accept() 17 gevent.spawn(handle_request, cli) 18 except KeyboardInterrupt as e: 19 print(e) 20 21 22 def handle_request(conn): 23 try: 24 while True: 25 data = conn.recv(1024) 26 if not data: 27 conn.close() 28 else: 29 print("recv:", data) 30 # time.sleep(5) 31 for i in data.split(b'!'): 32 if b"Register" in i: 33 conn.send(b"Success!") 34 pass 35 if b"Heart" in i: 36 conn.send(b"Heart Reply!") 37 38 except OSError as e: 39 print("client has been closed") 40 41 except Exception as ex: 42 print(ex) 43 finally: 44 conn.close() 45 46 47 if __name__ == '__main__': 48 server(8001)
至此,一个模拟客户端并发连接平台的测试脚本案例完成。