Python/selectors模塊及隊列
selectors模塊是可以實現IO多路復用機制:
它具有根據平台選出最佳的IO多路機制,比如在win的系統上他默認的是select模式而在linux上它默認的epoll。
常用共分為三種: select、poll、epoll
select的缺點:
1、每次調用都要將所有的文件描述符(fd)拷貝的內核空間,導致效率下降
2、遍歷所有的文件描述符(fd)查看是否有數據訪問
3、最大鏈接數限額(1024)
poll:
它就是select和epoll的過渡階段,它沒有最大鏈接數的限額
epoll:
1、第一個函數是創建一個epoll句柄,將所有的描述符(fd)拷貝到內核空間,但只拷貝一次。
2、回調函數,某一個函數或某一個動作成功完成之后會觸發的函數為所有的描述符(fd)綁定一個回調函數,一旦有數據訪問就是觸發該回調函數,回調函數將(fd)放到鏈表中
3、函數判斷鏈表是否為空
4、最大啟動項沒有限額
selsect實例:
1 服務端 2 import selectors #基於select模塊實現的IO多路復用,建議大家使用 3 import socket 4 sock=socket.socket() 5 sock.bind(('127.0.0.1',8800)) 6 sock.listen(5) 7 sock.setblocking(False) 8 sel=selectors.DefaultSelector() #根據平台選擇最佳的IO多路機制,比如linux就會選擇epoll 9 10 def read(conn,mask): 11 try: 12 data=conn.recv(1024) 13 print(data.decode('utf8')) 14 data2=input('>>>>') 15 conn.send(data2.encode('utf8')) 16 except Exception: 17 sel.unregister(conn) 18 19 def accept(sock,mask): 20 conn,addr=sock.accept() 21 print('-------',conn) 22 sel.register(conn,selectors.EVENT_READ,read) 23 sel.register(sock, selectors.EVENT_READ, accept) #注冊功能 24 while True: 25 print('wating....') 26 events=sel.select() #[(sock),(),()] 監聽 27 28 for key,mask in events: 29 # print(key.data) #accept 找出有活動的綁定函數 30 # print(key.fileobj) #sock 找出有活動的文件描述符 31 32 func=key.data 33 obj=key.fileobj 34 35 func(obj,mask) #1 accept(sock,mask) 2read(conn,mask) 36 ------------------------------------------------------------------------------ 37 客戶端 38 import socket 39 tin=socket.socket() 40 tin.connect(('127.0.0.1',8800)) 41 while True: 42 inp=input('>>>>') 43 tin.send(inp.encode('utf8')) 44 data=tin.recv(1024) 45 print(data.decode('utf8'))
隊列:
隊列分為(先進先出、后進先出) 隊列是一個數據類型,可以進行數據儲存功能 隊列可以實現耦合的效果,比如有個人A把包子放到鍋里,在有個人B把包子從鍋里拿出來。現在的鍋就是隊列的效果。
實例如下:
1 import queue 2 q=queue.Queue() #默認的先進先出 3 q.put(111) #往管道里放一個值 4 q.put(222) #往管道里放一個值 5 q.put(333) #往管道里放一個值 6 7 print(q.get()) #從管道里拿一個值 8 print(q.get()) #從管道里拿一個值 9 print(q.get()) #從管道里拿一個值 10 ----------------------------------------------- 11 運行結果 12 111 13 222 14 333
join和tast_done
1 import queue 2 q=queue.Queue() #默認的先進先出 3 q.put(111) #往管道里放一個值 4 q.task_done() #解除阻塞 5 q.put(222) #往管道里放一個值 6 q.task_done() #解除阻塞 7 q.put(333) #往管道里放一個值 8 q.task_done() #解除阻塞 9 q.join() #隊列阻塞功能 10 print(q.get()) #從管道里拿一個值 11 print(q.get()) #從管道里拿一個值 12 print(q.get()) #從管道里拿一個值
join和tast_done 功能
如果有倆個put功能,在get的前邊有join功能的話,在倆個put的后邊要進行一個task_done才能執行。
生產者消費者模型:
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前台,而客戶去飯菜也不需要不找廚師,直接去前台領取即可,這也是一個結耦的過程。
1 import time,random 2 import queue,threading 3 4 q = queue.Queue() 5 6 def Producer(name): 7 count = 0 8 while count <10: 9 print("making........") 10 time.sleep(random.randrange(3)) 11 q.put(count) 12 print('Producer %s has produced %s baozi..' %(name, count)) 13 count +=1 14 #q.task_done() 15 #q.join() 16 print("ok......") 17 def Consumer(name): 18 count = 0 19 while count <10: 20 time.sleep(random.randrange(4)) 21 if not q.empty(): 22 data = q.get() 23 #q.task_done() 24 #q.join() 25 print(data) 26 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 27 else: 28 print("-----no baozi anymore----") 29 count +=1 30 31 p1 = threading.Thread(target=Producer, args=('A',)) 32 c1 = threading.Thread(target=Consumer, args=('B',)) 33 # c2 = threading.Thread(target=Consumer, args=('C',)) 34 # c3 = threading.Thread(target=Consumer, args=('D',)) 35 p1.start() 36 c1.start() 37 # c2.start() 38 # c3.start()
這個生產消費者就是通過列表的形式把輸入和獲取進行耦合操作!
