IO多路復用
IO多路復用就是我們經常說的select epoll.select和epoll的好處是單個process就可以同時處理多個網絡IO。基本原理是select\epoll會不斷的輪詢所負責的所有socket,當有某個socket數據到達了,就通知用戶進程。
下面是流程圖:
當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
注意1:select函數返回結果中如果有文件可讀了,那么進程就可以通過調用accept()或recv()來讓kernel將位於內核中准備到的數據copy到用戶區。
注意2: select的優勢在於可以處理多個連接,不適用於單個連接
selectors
基於select模塊實現的IO多路復用
`
IO多路復用實現機制
在不同的平台上是不一樣的,win平台只有select,Linux平台有select poll epoll
-
win: select
-
linux : select poll epoll
通常是用戶空間創建fd,然后copy到內核空間
如果是開fd的數量多,select的的效率低
基於select模塊實現的IO多路復用
import selectors
import socket
sock = socket.socket()
sock.bind(("127.0.0.1", 8810))
sock.listen(5) # 這里雖然設置了最大連接數,但是已經沒有限制了
sel = selectors.DefaultSelector() # 實例化一個對象,會根據不同的平台自動設置優先級
# epoll|kqueue|devpoll > poll > select. 所以Linux系統會自動設置成epoll win 自動設置成select
# 第二步
def read(conn, mask):
# pass
try: # win 檢測異常 當有異常 如客戶端斷開的時候
data = conn.recv(1024)
print(data)
print(data.decode("utf-8"))
data2 = input(">>>")
conn.send(data2.encode("utf-8"))
except Exception:
sel.unregister(conn) # 解除注冊
# 第一步
def accept(sock, mask): # mask 是沒有用的
conn, addr = sock.accept()
# print(conn)
sel.register(conn, selectors.EVENT_READ, read) # 把conn 添加到列表中
# 首先要注冊 只是把sock和 accept綁定
sel.register(sock, selectors.EVENT_READ, accept) # 注冊,但是沒有監聽accept函數
# 監聽
while 1:
print("waiting...")
# event 就是那個r
events = sel.select() # [(key,mask) ,(key,mask) ,(key,mask) ,)] # 活活動的對象會自動添加到這里
for key, mask in events: # events 是個列表 需要遍歷
print(key.data) # 拿到accept函數
print(key.fileobj) # 當前的活動的對象 sock 文件句柄
func = key.data # 調用
obj = key.fileobj #
func(obj, mask) # 第一個參數是sock 對象
# break
select缺點:
- 每次調用slect都要將所有的fd拷貝到內核空間(每次都要拷貝),導致效率下降
- 監聽的的實現是通過遍歷所有的fd,(遍歷消耗的時間消耗多)判斷是否有數據訪問
- 最大連接數(input中放的文件描述符數量1024)
poll:
最大連接數沒有限制了,除此之外,和select一樣,所以基本不用
epoll:
內部通過3個函數實現(select是一個)
-
第一個函數:
創建epoll句柄,把所有的fd拷貝到內核空間,只需要拷貝一次 -
第二個函數: 回掉函數
某一個函數或者動作成功完成后,會自動觸發一個函數
為所有的fd綁定一個回調函數,一旦有數據訪問,觸發改回調函數,回調函數把fd放到鏈表中。(只要有活動,把fd放到鏈表中,動態監聽)這樣就提高了效率
例子:交試卷 -
第三個函數,判斷鏈表是否為空
selectors.DefaultSelector()
selectors會根據自己的平台選擇最佳IO多路復用,自動選擇。win只有select
linux的中epoll中的優先級最高
隊列queue
和線程有關系的,在多線程的時候有用,保證信息安全的
隊列是一種數據類型
優點:
保證線程安全,不用自己加鎖
put get
先進先出
import queue
q = queue.Queue(3) # 默認是先進先出 FIFO 設置參數是最大的存放數量5
q.put(111)
q.put("hello")
q.put(222)
q.put(333,False) # 默認blocking = True ,False 是當存滿的時候,自動報錯,解除阻塞的狀態
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 第4次已經拿不到了 取不到 默認阻塞
q.get(False) # 解除阻塞狀態
先進后出
q = queue.LifoQueue()
q.put(111)
q.put(222)
print(q.get())
print(q.get())
優先級
q = queue.PriorityQueue()
q.put([4,"hello4"])
q.put([1,"hello1"])
q.put([2,"hello2"])
print(q.get())
print(q.get())
print(q.get())
join 與task_done方法
import queue
q = queue.Queue(5)
q.put(111)
q.put(222)
q.get()
q.task_done() #
q.get()
q.task_done() #
q.join() # 等待task_done 和events是一個原理
print('endnig')
join 與task_done方法必須配合使用
其他的用法
q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列為空,再執行別的操作
生產者消費者模型
有生產數據的線程
有消費數據的線程
通過一個容器來解決生產者消費者強耦合的問題
這個容器是用來解耦的(類似吃飯的時候的服務員)
目錄結構也是一種解耦
下面是用隊列模擬實現
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <10:
print("making........")
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
#q.task_done()
#q.join()
print("ok......")
def Consumer(name):
count = 0
while count <10:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
#q.task_done()
#q.join()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()