python并发编程之IO阻塞


IO模型

  解决IO问题的方式方法
  问题是:IO操作阻塞程序执行
  解决的也仅仅是网络IO操作
 
  一般数据传输经历的两个阶段,如图:

 

  IO阻塞模型分类:

  1. 阻塞IO
  2. 非阻塞IO
  3. 多路复用IO
  4. 异步IO(爬虫阶段)
  5. 信号驱动IO(了解)

1、阻塞IO模型

  socket模块默认是阻塞的,一个读操作流程如下:

  问题:

    同一时间只能服务一个客户端

  解决办法:

    1. 多线程

      优点:如果并发量不高,效率是较高的,因为每个客户端都有单独线程来处理

      缺点:不可能无限的开启线程,线程也需要占用资源

    2. 多进程

      优点:可以多个CPU并行处理

      弊端:占用资源非常大,一旦客户端稍微多一点,立马就慢了

    3.线程池

      优点:保证了服务器正常运行,还帮你负责创建和销毁线程,以及任务分配

      缺点:一旦并发量超出最大线程量,就只能等签名的运行完毕。

    4. 协程

      优点:不需要创建一段线程,也不需要在线程间做切换,没有数量限制

      缺点:不能利用多核优势

    结果:真正倒是效率低的是阻塞问题,但上述办法并没有真正的解决阻塞问题。

2、非阻塞IO模型

  遇到IO操作也不阻塞,会继续执行。意味着即使遇到IO操作CPU执行权也不会被剥夺

  方法:设置socket使其变为non-blocking,即server.setblocking(False),具体流程如下:

  从图中看出,非阻塞的recv系统调用之后,进程没有被阻塞,操作系统立马把结果返回给进程,如果数据还没准备好,则抛出异常,进程可以去做其他的事,然后在发起recv系统调用,重复上述过程(这个过程通常被称为轮询),一直到数据准备好,再拷贝数据到进程进行数据处理。需要注意,拷贝数据的整个过程,进程仍然是属于阻塞状态。

  缺点: 占用CPU太多,原因是需要无限的循环去向操作系统拿数据。

import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
# 所有客户端的socket
conns = []
# 所有需要返回数据的客户端
send_cs = []

while True:
    try:
        conn, client_addr = server.accept()
        print(client_addr,'已连接')
        conns.append(conn)
    except BlockingIOError:
        # 接收数据
        for conn in conns[:]:# 和conns.copy()一样,原因:迭代过程不能删除元素
            try:
                data = conn.recv(1024)
                print(data)
                send_cs.append((data, conn))
                # send也是IO操作,在一些极端情况下,如系统缓存满了,肯定也会抛出异常
                # 所以,send要单拿出来处理
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                conns.remove(conn)
        # 发送数据
        for item in send_cs[:]:
            data, conn = item
            try:
                conn.send(data.upper())
                # 如果发送成功就把数据从列表中删除
                send_cs.remove(item)
            except BlockingIOError: # 如果缓冲区满了 就下次再发
                continue
            except ConnectionResetError:
                conn.close()
                send_cs.remove(item)
                conns.remove(conn)
服务端代码

3、多路复用IO

  用一个线程来处理并发所有的客户端。

  需要使用select模块,select原理:把所有的socket交给select,select会不断轮询所负责的所有socket,当某个socket有数据到达,就通知进程继续执行后面代码。

  流程:程序发起一个select调用,select使整个进程阻塞,直到有socket准备就绪,select就返回,这个时候进程在调用read操作,直接从缓冲中把数据拷贝到进程。流程图如下:

import socket
import select

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8888))
server.listen()

r_list = [server] # 监测是否收到数据的客户端
w_list = []       # 监测是否需要发送数据的客户端
x_list = []

# 用来发送数据
data_dic = {}

while True:
    readables, writeables, _ = select.select(r_list, w_list, x_list)
    # 接收数据以及建立连接
    for conn in readables:
        if conn == server:
            new_conn, _ = conn.accept()
            r_list.append(new_conn)
        else:
            try:
                data = conn.recv(1024)
                if not data:
                    conn.close()
                    r_list.remove(conn)
                    continue
                print(data)
                # 发送数据
                w_list.append(conn)
                data_dic[conn] = data
            except ConnectionResetError:
                conn.close()
                r_list.remove(conn)
    # 发送数据
    for conn in writeables:
        try:
            conn.send(data_dic[conn].upper())
        except ConnectionResetError:
            conn.close()
        finally:
            data_dic.pop(conn)
            w_list.remove(conn)
服务端代码
import socket
import threading
from  threading import Thread

def communication():
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 8888))
    while True:
        msg = "%s say hello for you"%threading.current_thread()
        if not msg:
            continue
        client.send(msg.encode("utf-8"))
        data = client.recv(1024)
        print(data.decode("utf-8"))


for i in range(100):
    Thread(target=communication).start()
客户端代码

  强调:select的优势在于可以处理多个连接,并不适用于单个连接

  优点:占用资源少,不消耗太多CPU,同时能够为多个客户端提供服务。(适用于简单的事件驱动服务器)

  缺点:需要消耗大量时间区轮询各个socket,更好的选择时epoll,其次把事件探测和响应夹杂在一起,耦合性增加

 

 

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM