Python Select模型


IO多路復用

IO多路復用就是我們經常說的select epoll.select和epoll的好處是單個process就可以同時處理多個網絡IO。基本原理是select\epoll會不斷的輪詢所負責的所有socket,當有某個socket數據到達了,就通知用戶進程。
下面是流程圖:

  當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。

注意1:select函數返回結果中如果有文件可讀了,那么進程就可以通過調用accept()或recv()來讓kernel將位於內核中准備到的數據copy到用戶區。

注意2: select的優勢在於可以處理多個連接,不適用於單個連接

selectors

基於select模塊實現的IO多路復用

`

IO多路復用實現機制

在不同的平台上是不一樣的,win平台只有select,Linux平台有select poll epoll

  • win: select

  • linux : select poll epoll

通常是用戶空間創建fd,然后copy到內核空間

如果是開fd的數量多,select的的效率低

基於select模塊實現的IO多路復用

import selectors
import socket

sock = socket.socket()
sock.bind(("127.0.0.1", 8810))

sock.listen(5)  # 這里雖然設置了最大連接數,但是已經沒有限制了
sel = selectors.DefaultSelector()  # 實例化一個對象,會根據不同的平台自動設置優先級
# epoll|kqueue|devpoll > poll > select.  所以Linux系統會自動設置成epoll  win 自動設置成select

# 第二步
def read(conn, mask):
    # pass
    try:  # win 檢測異常  當有異常 如客戶端斷開的時候 
        data = conn.recv(1024)
        print(data)
        print(data.decode("utf-8"))
        data2 = input(">>>")
        conn.send(data2.encode("utf-8"))
    except Exception:
        sel.unregister(conn)  # 解除注冊


# 第一步
def accept(sock, mask):  # mask 是沒有用的
    conn, addr = sock.accept()
    # print(conn)
    sel.register(conn, selectors.EVENT_READ, read)  # 把conn 添加到列表中


# 首先要注冊 只是把sock和 accept綁定
sel.register(sock, selectors.EVENT_READ, accept)  # 注冊,但是沒有監聽accept函數

# 監聽
while 1:
    print("waiting...")
    # event 就是那個r
    events = sel.select()  # [(key,mask) ,(key,mask) ,(key,mask) ,)]  # 活活動的對象會自動添加到這里
    for key, mask in events:  # events 是個列表 需要遍歷
        print(key.data)  # 拿到accept函數
        print(key.fileobj)  # 當前的活動的對象 sock 文件句柄
        func = key.data  # 調用
        obj = key.fileobj  #
        func(obj, mask)  # 第一個參數是sock 對象
        # break

select缺點:

  • 每次調用slect都要將所有的fd拷貝到內核空間(每次都要拷貝),導致效率下降
  • 監聽的的實現是通過遍歷所有的fd,(遍歷消耗的時間消耗多)判斷是否有數據訪問
  • 最大連接數(input中放的文件描述符數量1024)

poll:

最大連接數沒有限制了,除此之外,和select一樣,所以基本不用

epoll:

內部通過3個函數實現(select是一個)

  • 第一個函數:
    創建epoll句柄,把所有的fd拷貝到內核空間,只需要拷貝一次

  • 第二個函數: 回掉函數
    某一個函數或者動作成功完成后,會自動觸發一個函數
    為所有的fd綁定一個回調函數,一旦有數據訪問,觸發改回調函數,回調函數把fd放到鏈表中。(只要有活動,把fd放到鏈表中,動態監聽)這樣就提高了效率
    例子:交試卷

  • 第三個函數,判斷鏈表是否為空

selectors.DefaultSelector()
selectors會根據自己的平台選擇最佳IO多路復用,自動選擇。win只有select
linux的中epoll中的優先級最高

隊列queue

和線程有關系的,在多線程的時候有用,保證信息安全的

隊列是一種數據類型

優點:
保證線程安全,不用自己加鎖

put get

先進先出

import queue

q = queue.Queue(3) # 默認是先進先出  FIFO  設置參數是最大的存放數量5

q.put(111)
q.put("hello")
q.put(222)

q.put(333,False)  # 默認blocking = True ,False 是當存滿的時候,自動報錯,解除阻塞的狀態

print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 第4次已經拿不到了 取不到 默認阻塞
q.get(False) # 解除阻塞狀態

先進后出

q = queue.LifoQueue()
q.put(111)
q.put(222)
print(q.get())
print(q.get())

優先級

q = queue.PriorityQueue()

q.put([4,"hello4"])
q.put([1,"hello1"])
q.put([2,"hello2"])

print(q.get())
print(q.get())
print(q.get())

join 與task_done方法

import queue
q = queue.Queue(5)

q.put(111)
q.put(222)

q.get()
q.task_done()  #

q.get()
q.task_done()  #

q.join()  # 等待task_done  和events是一個原理
print('endnig')

join 與task_done方法必須配合使用

其他的用法

q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)非阻塞 
q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列為空,再執行別的操作

生產者消費者模型

有生產數據的線程
有消費數據的線程

通過一個容器來解決生產者消費者強耦合的問題
這個容器是用來解耦的(類似吃飯的時候的服務員)
目錄結構也是一種解耦

下面是用隊列模擬實現

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM