IO模型

IO阻塞模型分类:
- 阻塞IO
- 非阻塞IO
- 多路复用IO
- 异步IO(爬虫阶段)
- 信号驱动IO(了解)
1、阻塞IO模型
socket模块默认是阻塞的,一个读操作流程如下:
问题:
同一时间只能服务一个客户端
解决办法:
1. 多线程
优点:如果并发量不高,效率是较高的,因为每个客户端都有单独线程来处理
缺点:不可能无限的开启线程,线程也需要占用资源
2. 多进程
优点:可以多个CPU并行处理
弊端:占用资源非常大,一旦客户端稍微多一点,立马就慢了
3.线程池
优点:保证了服务器正常运行,还帮你负责创建和销毁线程,以及任务分配
缺点:一旦并发量超出最大线程量,就只能等签名的运行完毕。
4. 协程
优点:不需要创建一段线程,也不需要在线程间做切换,没有数量限制
缺点:不能利用多核优势
结果:真正倒是效率低的是阻塞问题,但上述办法并没有真正的解决阻塞问题。
2、非阻塞IO模型
遇到IO操作也不阻塞,会继续执行。意味着即使遇到IO操作CPU执行权也不会被剥夺

从图中看出,非阻塞的recv系统调用之后,进程没有被阻塞,操作系统立马把结果返回给进程,如果数据还没准备好,则抛出异常,进程可以去做其他的事,然后在发起recv系统调用,重复上述过程(这个过程通常被称为轮询),一直到数据准备好,再拷贝数据到进程进行数据处理。需要注意,拷贝数据的整个过程,进程仍然是属于阻塞状态。
缺点: 占用CPU太多,原因是需要无限的循环去向操作系统拿数据。

import socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('127.0.0.1', 8080)) server.listen(5) server.setblocking(False) # 所有客户端的socket conns = [] # 所有需要返回数据的客户端 send_cs = [] while True: try: conn, client_addr = server.accept() print(client_addr,'已连接') conns.append(conn) except BlockingIOError: # 接收数据 for conn in conns[:]:# 和conns.copy()一样,原因:迭代过程不能删除元素 try: data = conn.recv(1024) print(data) send_cs.append((data, conn)) # send也是IO操作,在一些极端情况下,如系统缓存满了,肯定也会抛出异常 # 所以,send要单拿出来处理 except BlockingIOError: continue except ConnectionResetError: conn.close() conns.remove(conn) # 发送数据 for item in send_cs[:]: data, conn = item try: conn.send(data.upper()) # 如果发送成功就把数据从列表中删除 send_cs.remove(item) except BlockingIOError: # 如果缓冲区满了 就下次再发 continue except ConnectionResetError: conn.close() send_cs.remove(item) conns.remove(conn)
3、多路复用IO
用一个线程来处理并发所有的客户端。
需要使用select模块,select原理:把所有的socket交给select,select会不断轮询所负责的所有socket,当某个socket有数据到达,就通知进程继续执行后面代码。
流程:程序发起一个select调用,select使整个进程阻塞,直到有socket准备就绪,select就返回,这个时候进程在调用read操作,直接从缓冲中把数据拷贝到进程。流程图如下:

import socket import select server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(("127.0.0.1", 8888)) server.listen() r_list = [server] # 监测是否收到数据的客户端 w_list = [] # 监测是否需要发送数据的客户端 x_list = [] # 用来发送数据 data_dic = {} while True: readables, writeables, _ = select.select(r_list, w_list, x_list) # 接收数据以及建立连接 for conn in readables: if conn == server: new_conn, _ = conn.accept() r_list.append(new_conn) else: try: data = conn.recv(1024) if not data: conn.close() r_list.remove(conn) continue print(data) # 发送数据 w_list.append(conn) data_dic[conn] = data except ConnectionResetError: conn.close() r_list.remove(conn) # 发送数据 for conn in writeables: try: conn.send(data_dic[conn].upper()) except ConnectionResetError: conn.close() finally: data_dic.pop(conn) w_list.remove(conn)

import socket import threading from threading import Thread def communication(): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', 8888)) while True: msg = "%s say hello for you"%threading.current_thread() if not msg: continue client.send(msg.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) for i in range(100): Thread(target=communication).start()
强调:select的优势在于可以处理多个连接,并不适用于单个连接
优点:占用资源少,不消耗太多CPU,同时能够为多个客户端提供服务。(适用于简单的事件驱动服务器)
缺点:需要消耗大量时间区轮询各个socket,更好的选择时epoll,其次把事件探测和响应夹杂在一起,耦合性增加