所用模塊
asyncore
英文捉雞點 這里
源碼中可以看到其實本質上就對 select 以及 socket 的進一步封裝
簡單說明
Python的asyncore模塊提供了以異步的方式寫入套接字服務的客戶端和服務器的基礎結構。
主要包括
- asyncore.loop(…) - 用於循環監聽網絡事件。loop()函數負責檢測一個字典,字典中保存dispatcher的實例。
- asyncore.dispatcher類 - 一個底層套接字對象的簡單封裝。這個類有少數由異步循環調用的,用來事件處理的函數。
- dispatcher類中的writable()和readable()在檢測到一個socket可以寫入或者數據到達的時候被調用,並返回一個bool值,決定是否調用handle_read或者handle_write。
- asyncore.dispatcher_with_send類 - 一個 dispatcher的子類,添加了簡單的緩沖輸出能力,對簡單的客戶端很有用。
可用方法
- handle_read():當socket有可讀的數據的時候執行這個方法,可讀的數據的判定條件就是看方法readable()返回為True還是False。即readable()返回為True的時候執行該方法。
- handle_write():當socket有可寫的數據的時候執行這個方法,可寫的數據的判定條件就是看方法writable()返回為True還是False。即writable()返回為True的時候執行該方法。
- handle_expt():當socket通信過程中出現OOB異常的時候執行該方法。
- handle_connect():當有客戶端連接的時候,執行該方法進行處理。
- handle_close():可連接關閉的時候執行該方法。
- handle_error():當通信過程中出現異常並且沒有在其他的地方進行處理的時候執行該方法。
- handle_accept():當作為server socket監聽的時候,有客戶端連接的時候,利用這個方法進行處理。
- readable():緩沖區是否有可讀數據。
- writable():緩沖區是否有可寫數據。
- create_socket(family, type):創建一個socket連接。
- connect(address):連接一個socket server。
- send(data):發送數據。
- recv(buffer_size):收取數據到內存緩沖中。
- listen(backlog):server socket開始監聽。
- bind(address):server socket綁定某個地址和端口。
- accept():當有客戶端連接的時候,執行該方法接受客戶端連接。
- close():關閉socket。
- asyncore.loop([timeout[, use_poll[, map[, count]]]])
-
- 進入輪詢循環直到所有打開的通道已被關閉或計數通過。
- 所有的參數都是可選的。
- count參數默認為None,只有當所有通道都被關閉時循環才會終止。
- timeout參數設置為select()或poll()調用設置超時,以秒為單位,默認為30秒。
- use_poll參數,如果為true ,則表示 poll()優先於select(),默認值為False。
- map是包含監控的channel的字典。channel關閉時會從map中刪除。不指定map時會使用全局map。
- Channel(asyncore.dispatcher , asynchat.async_chat和其子類的實例)可以自由地混合在map上)。
- asyncore.dispatcher_with_send
- dispatcher的子類,增加了簡單的緩沖輸出,對於簡單的客戶端有用。
- 詳細資料參考:asynchat.async_chat。
- class asyncore.file_dispatcher
- 封裝了文件描述符或文件對象及映射參數(可選)供poll()和loop()函數使用的文件分發器。
- 它提供了文件對象或其他具備fileno()方法的對象,調用fileno()並傳遞到file_wrapper構造函數。
- 可用於UNIX。
- class asyncore.file_wrapper
- 接收整數文件描述符並調用os.dup()復制句柄,這樣原句柄可以關閉,而文件包裝器不受影響。
- 該類封裝了大量方法以模擬socket給file_dispatcher類使用。
- 可用於UNIX。
asynchat
英捉雞 , 這里
簡單說明
該模塊建立在asyncore
基礎架構之上,簡化了異步客戶端和服務器,並且更容易處理元素被任意字符串終止或者長度可變的協議。
主要包括
- asynchat.async_chat類 - 這個類是asyncore.dispatcher的抽象子類。一般使用其collect_incoming_data()和found_terminator()方法。
- collect_incoming_data() - 接收數據。
- found_terminator() - 當輸入數據流符合由 set_terminator() 設置的終止條件時被調用。
- set_terminator() - 設置終止條件。
- push() - 向通道壓入數據以確保其傳輸。
聊天室開發
接口
本次項目開發所需要用到的模塊和接口
asyncore
- dispacher
- loop
- handle_read
- handle_write
asynchat
- collect_incoming_data
- set_terminator
- tound_terminator
- push
- handle_close
流程
- 用戶連接
- 登記用戶
- 建立會話
- 處理用戶消息
聊天室代碼剖析
服務端
ChatServer 類 - 套接字處理
用於創建 server_socket 套接字
整體操作類似於 socket 的使用
import asynchat import asyncore # 定義端口 PORT = 6666 # 定義結束異常類 class EndSession(Exception): pass class ChatServer(asyncore.dispatcher): """ 聊天服務器 """ def __init__(self, port): asyncore.dispatcher.__init__(self) # 創建socket self.create_socket() # 設置 socket 為可重用 self.set_reuse_addr() # 監聽端口 self.bind(('', port)) self.listen(5) self.users = {} self.main_room = ChatRoom(self) def handle_accept(self): conn, addr = self.accept() ChatSession(self, conn)
ChatSession 類 - 會話處理
用於維護聊天室
重寫了 collect_incoming_data 用於數據存放
以及 found_terminator 來進行結束標志
以及 handle_close 來進行結束操作
class ChatSession(asynchat.async_chat): """ 負責和客戶端通信 """ def __init__(self, server, sock): asynchat.async_chat.__init__(self, sock) self.server = server self.set_terminator(b'\n') self.data = [] self.name = None self.enter(LoginRoom(server)) def enter(self, room): # 從當前房間移除自身,然后添加到指定房間 try: cur = self.room except AttributeError: pass else: cur.remove(self) self.room = room room.add(self) def collect_incoming_data(self, data): # 接收客戶端的數據 self.data.append(data.decode("utf-8")) def found_terminator(self): # 當客戶端的一條數據結束時的處理 line = ''.join(self.data) self.data = [] try: self.room.handle(self, line.encode("utf-8")) # 退出聊天室的處理 except EndSession: self.handle_close() def handle_close(self): # 當 session 關閉時,將進入 LogoutRoom asynchat.async_chat.handle_close(self) self.enter(LogoutRoom(self.server))
CommandHandler 類 - 命令處理
用於自定義協議, 類似於開發 httpserver 的時候的 協議格式定制處理
我們預設了4種命令分別由 其同名函數進行分發處理
- do_login 登錄
- de_logout 登出
- do_say 發送消息
- do_look 查看在線用戶
class CommandHandler: """ 命令處理類 """ def unknown(self, session, cmd): # 響應未知命令 # 通過 asynchat.async_chat.push 方法發送消息 session.push(('Unknown command {} \n'.format(cmd)).encode("utf-8")) def handle(self, session, line): line = line.decode() # 命令處理 if not line.strip(): return parts = line.split(' ', 1) cmd = parts[0] try: line = parts[1].strip() except IndexError: line = '' # 通過協議代碼執行相應的方法 method = getattr(self, 'do_' + cmd, None) try: method(session, line) except TypeError: self.unknown(session, cmd)
Room 類 - 初始 聊天室基類
Room 類繼承了 CommandHandler 可以處理聊天室中的命令處理
主要用於維護一個存有所有用戶的 sessions 列表以及 廣播發送信息處理
class Room(CommandHandler): """ 包含多個用戶的環境,負責基本的命令處理和廣播 """ def __init__(self, server): self.server = server self.sessions = [] def add(self, session): # 一個用戶進入房間 self.sessions.append(session) def remove(self, session): # 一個用戶離開房間 self.sessions.remove(session) def broadcast(self, line): # 向所有的用戶發送指定消息 # 使用 asynchat.asyn_chat.push 方法發送數據 for session in self.sessions: session.push(line) def do_logout(self, session, line): # 退出房間 raise EndSession
LoginRoom 類 - 用戶登錄處理
用戶登錄后需要廣播一條信息 xxx 加入聊天室
class LoginRoom(Room): """ 處理登錄用戶 """ def add(self, session): # 用戶連接成功的回應 Room.add(self, session) # 使用 asynchat.asyn_chat.push 方法發送數據 session.push(b'Connect Success') def do_login(self, session, line): # 用戶登錄邏輯 name = line.strip() # 獲取用戶名稱 if not name: session.push(b'UserName Empty') # 檢查是否有同名用戶 elif name in self.server.users: session.push(b'UserName Exist') # 用戶名檢查成功后,進入主聊天室 else: session.name = name session.enter(self.server.main_room)
Loginout 類 - 退出聊天室處理
class LogoutRoom(Room): """ 處理退出用戶 """ def add(self, session): # 從服務器中移除 try: del self.server.users[session.name] except KeyError: pass
ChatRoom 類 - 聊天處理
class ChatRoom(Room): """ 聊天用的房間 """ def add(self, session): # 廣播新用戶進入 session.push(b'Login Success') self.broadcast((session.name + ' has entered the room.\n').encode("utf-8")) self.server.users[session.name] = session Room.add(self, session) def remove(self, session): # 廣播用戶離開 Room.remove(self, session) self.broadcast((session.name + ' has left the room.\n').encode("utf-8")) def do_say(self, session, line): # 客戶端發送消息 self.broadcast((session.name + ': ' + line + '\n').encode("utf-8")) def do_look(self, session, line): # 查看在線用戶 session.push(b'Online Users:\n') for other in self.sessions: session.push((other.name + '\n').encode("utf-8"))
mian - 主函數處理
if __name__ == '__main__': s = ChatServer(PORT) try: print("chat server run at '0.0.0.0:{0}'".format(PORT)) asyncore.loop() except KeyboardInterrupt: print("chat server exit")
客戶端
登錄窗口生成
import wx import telnetlib from time import sleep import _thread as thread class LoginFrame(wx.Frame): """ 登錄窗口 """ def __init__(self, parent, id, title, size): # 初始化,添加控件並綁定事件 wx.Frame.__init__(self, parent, id, title) self.SetSize(size) self.Center() self.serverAddressLabel = wx.StaticText(self, label="Server Address", pos=(10, 50), size=(120, 25)) self.userNameLabel = wx.StaticText(self, label="UserName", pos=(40, 100), size=(120, 25)) self.serverAddress = wx.TextCtrl(self, pos=(120, 47), size=(150, 25)) self.userName = wx.TextCtrl(self, pos=(120, 97), size=(150, 25)) self.loginButton = wx.Button(self, label='Login', pos=(80, 145), size=(130, 30)) # 綁定登錄方法 self.loginButton.Bind(wx.EVT_BUTTON, self.login) self.Show() def login(self, event): # 登錄處理 try: serverAddress = self.serverAddress.GetLineText(0).split(':') con.open(serverAddress[0], port=int(serverAddress[1]), timeout=10) response = con.read_some() if response != b'Connect Success': self.showDialog('Error', 'Connect Fail!', (200, 100)) return con.write(('login ' + str(self.userName.GetLineText(0)) + '\n').encode("utf-8")) response = con.read_some() if response == b'UserName Empty': self.showDialog('Error', 'UserName Empty!', (200, 100)) elif response == b'UserName Exist': self.showDialog('Error', 'UserName Exist!', (200, 100)) else: self.Close() ChatFrame(None, 2, title='ShiYanLou Chat Client', size=(500, 400)) except Exception: self.showDialog('Error', 'Connect Fail!', (95, 20)) def showDialog(self, title, content, size): # 顯示錯誤信息對話框 dialog = wx.Dialog(self, title=title, size=size) dialog.Center() wx.StaticText(dialog, label=content) dialog.ShowModal()
聊天窗口生成
class ChatFrame(wx.Frame): """ 聊天窗口 """ def __init__(self, parent, id, title, size): # 初始化,添加控件並綁定事件 wx.Frame.__init__(self, parent, id, title) self.SetSize(size) self.Center() self.chatFrame = wx.TextCtrl(self, pos=(5, 5), size=(490, 310), style=wx.TE_MULTILINE | wx.TE_READONLY) self.message = wx.TextCtrl(self, pos=(5, 320), size=(300, 25)) self.sendButton = wx.Button(self, label="Send", pos=(310, 320), size=(58, 25)) self.usersButton = wx.Button(self, label="Users", pos=(373, 320), size=(58, 25)) self.closeButton = wx.Button(self, label="Close", pos=(436, 320), size=(58, 25)) # 發送按鈕綁定發送消息方法 self.sendButton.Bind(wx.EVT_BUTTON, self.send) # Users按鈕綁定獲取在線用戶數量方法 self.usersButton.Bind(wx.EVT_BUTTON, self.lookUsers) # 關閉按鈕綁定關閉方法 self.closeButton.Bind(wx.EVT_BUTTON, self.close) thread.start_new_thread(self.receive, ()) self.Show() def send(self, event): # 發送消息 message = str(self.message.GetLineText(0)).strip() if message != '': con.write(('say ' + message + '\n').encode("utf-8")) self.message.Clear() def lookUsers(self, event): # 查看當前在線用戶 con.write(b'look\n') def close(self, event): # 關閉窗口 con.write(b'logout\n') con.close() self.Close() def receive(self): # 接受服務器的消息 while True: sleep(0.6) result = con.read_very_eager() if result != '': self.chatFrame.AppendText(result)
主函數
if __name__ == '__main__': app = wx.App() con = telnetlib.Telnet() LoginFrame(None, -1, title="Login", size=(320, 250)) app.MainLoop()
流程梳理
初始狀態
- socket 創建后 handle_accept 執行來調用了 ChatSession ,
- ChatSession 的 初始化方法中 執行了 enter 方法需要 LoginRoom 的實例化作為參數
- LoginRoom 繼承自 Room , 且 沒有自己定義 初始化方法因此, 利用 Room 進行初始化
- Room 初始化方法中創建了一個 sessions 列表, 此列表用於維護 用戶會話
- enter 方法中執行了一個 add 方法, LoginRoom 和 其基類中的 Room 中都有 add 方法
- 根據 python 面向對象的定義, 執行的是 LoginRoom 中的 add , 此 add 方法中又再次執行了一個 Room.add
- 最終還是執行到了 Room 中的 add 方法, 即往 sessions 列表中加入了這個會話. 以上設計的是初始化方法
用戶操作
- 初始的接口程序經由 found_terminator 進行發起 ( 官方解釋如下 )
To build a functioning async_chat subclass your input methods collect_incoming_data() and found_terminator() must handle the data that the channel receives asynchronously. The methods are described below
- 然后又此方法分發在 CommandHandler 類中進行字符串的分解以及反射分別到 do_ 開頭的4個方法進行分發執行
- do_login 相關的驗證后, 分流到 ChatRoom 中進行相關的 do_say / do_look 操作
- do_say 經由 廣播 ( Room.broadcast ) 進行想的操作
- do_logout 直接退出
- do_look 查看當前所有用戶