我們今天要做一個聊天系統,這樣可以和我們之前flask api那系列文章結合起來;其次,聊天系統最能代表tcpserver,以后可以套用各種模型,比如我們公司做的物聯網,其實就是把聊天系統簡化一下。
twisted官方網站已經為我們提供了一個非常好的例子,我們研究一下,然后在此基礎上進行修改即可(這方面確實要比tornado做得好,不過tornado在閱讀源碼方面又有很大優勢,以后我們做一個tornado版的)
from twisted.internet.protocol import Factory from twisted.protocols.basic import LineReceiver from twisted.internet import reactor class Chat(LineReceiver): def __init__(self, users): self.users = users self.name = None self.state = "GETNAME" def connectionMade(self): self.sendLine("What's your name?") def connectionLost(self, reason): if self.name in self.users: del self.users[self.name] def lineReceived(self, line): if self.state == "GETNAME": self.handle_GETNAME(line) else: self.handle_CHAT(line) def handle_GETNAME(self, name): if name in self.users: self.sendLine("Name taken, please choose another.") return self.sendLine("Welcome, %s!" % (name,)) self.name = name self.users[name] = self self.state = "CHAT" def handle_CHAT(self, message): message = "<%s> %s" % (self.name, message) for name, protocol in self.users.iteritems(): if protocol != self: protocol.sendLine(message) class ChatFactory(Factory): def __init__(self): self.users = {} # maps user names to Chat instances def buildProtocol(self, addr): return Chat(self.users) reactor.listenTCP(8123, ChatFactory()) reactor.run()
代碼非常簡單,每個用戶連接上來的時候,都新建一個Chat對象,Chat類中,包含各種對單個連接的操作方法,其實看名字都可以看出來他們的作用,
構造函數__init__中定義了3個變量,users是一個字典,包含所有當前連接的對象,key是它的name,value是Chat對象本身,代表自己這個連接;name標識這個連接名稱,一定要明了,唯一,我們以后會用客戶的電話號碼作為它的name;state有點意思,它代表一個狀態,當這個連接沒有通過驗證的時候,是一個狀態,驗證過以后,又是一個狀態。其實state以后還會繼續擴展,比如說,在很多時候,會有很多垃圾連接進來,通常一個連接上來,在一定時間內還沒有通過驗證,就可以abort掉。
connectionMade看名字也知道,連接創建好以后,觸發的函數。
connectionLost看名字意思,連接丟失以后,觸發的函數,這個函數以后可以擴展到redis記錄連接狀態。
lineReceived這個是一個連接用的最多的函數,就是數據接受到以后,觸發的函數,下面2個函數就是在此基礎上構建而成的。
handle_GETNAME和handle_CHAT的運用跟連接的state有關,當state在未驗證狀態時,調用handle_GETNAME函數;當已經驗證過時,調用handle_CHAT。
再看看factory類,其中users就不用說了,記錄每個連接的變量。
buildProtocol,新建一個連接以后,觸發的函數,它調用了Chat的構造函數,新建一個Chat對象。
其實Chat繼承LineReceive,而LineReceive繼承Protocol的。真實的連接是transport,所以我們這個例子中沒有展示出來transport,只有sendLine這樣的函數,我下面自己寫例子的時候,會加上去;Protocol其實就是整個連接連上來以后,加上一些這個連接當前的狀態,再加上一些基本操作方法組成的;Factory就是所有Protocol組成的一個工廠類,每新加入或者減少一個Protocol對象時,都能在Factory里面表現出來。
整個代碼分析完畢,官方例子就可以直接運行了,看看運行結果吧。
用telnet模擬一個客戶端,就可以很好的操作了。
以上全是官方的例子,我們要引入自己的項目。
首先,數據模型,官方例子很簡單,直接把str格式的數據發送出去,在測試的時候沒問題,但正式項目中絕對不可能。通常每個數據,都會由2部分組成,一個header作為頭,一個content作為內容。其實就是模擬http。header中,通常有數據長度、版本號、數據類型id等,這個都不是必須的,要根據你實際項目來。content作為真實數據內容,一般都用json數據格式,當然,如果你追求效率,也可以用google protor buf或者facebook的數據模式,都可以(很多公司都用的google protor buf模式,解析速度比較快,我們這為了簡單,就用json格式)。
上面是我們數據格式,綠色段就是header,藍色段就是content。我上面就說了,這只是隨便寫的一個項目,在真實項目中,要根據你的需求來選擇,很可能要保留字段。這邊稍微解釋一下command_id,其實這個就類似於http中的url,http根據url表明它的作用;我們這同樣根據command_id標示它的作用,因為在整個過程中,不但有聊天,還有驗證過程,以后還可能有廣播,組播等各種功能。我們就根據command_id來判斷這個數據的作用(其實寫到這,大家完全可以看出來,我們基本就是跟http學的,現實過程中也這樣,幾乎都在模仿http),而響應之類的,就是服務器主動推送給客戶端的command_id,這也是跟http不同的地方,很多時候,我們都是主動推送給客戶端。
好了,既然已經這樣規定,我們再詳細規定一下command_id吧,就像http的url一樣。
我們先比較簡單的設定一下,以后要是有改動,再改變。
我們重寫tcpserver,代碼如下:
# coding:utf-8 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor import struct import json from twisted.python import log import sys log.startLogging(sys.stdout) class Chat(Protocol): def __init__(self, users): self.users = users self.phone_number = None self.state = "VERIFY" self.version = 0 self.command_func_dict = { 1: self.handle_verify, 2: self.handle_single_chat, 3: self.handle_group_chat, 4: self.handle_broadcast_chat } def connectionMade(self): log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason): if self.phone_number in self.users: del self.users[self.phone_number] def dataReceived(self, data): """ 接受到數據以后的操作 """ length, self.version, command_id = struct.unpack('!3I', data[:12]) content = data[12:length] if command_id not in [1, 2, 3, 4]: return if self.state == "VERIFY" and command_id == 1: self.handle_verify(content) else: self.handle_data(command_id, content) def handle_verify(self, content): """ 驗證函數 """ content = json.loads(content) phone_number = content.get('phone_number') if phone_number in self.users: log.msg("電話號碼<%s>存在老的連接." % phone_number.encode('utf-8')) self.users[phone_number].connectionLost("") log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),)) self.phone_number = phone_number self.users[phone_number] = self self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content): """ 根據command_id來分配函數 """ self.command_func_dict[command_id](content) def handle_single_chat(self, content): """ 單播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content): """ 組播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content): """ 廣播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.users.keys() self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.users.keys(): self.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8')) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self.users) reactor.listenTCP(8124, ChatFactory()) reactor.run()
代碼修改的比較多,
首先,直接從Protocol繼承了,這樣比從LineReceive繼承更直觀一點;command_func_dict代表command_id和其處理函數的一一對應字典;
其次,dataReceived是主要的接受函數,接受到數據以后,先解析header,根據header里面的length截取數據,再根據command_id來把數據送個它的處理函數。如果command_id為1,就進入驗證函數;如果為其他,就進入其他數據處理函數,不過要先驗證通過,才能用其他函數處理。這就跟http一樣。(這邊以后要重寫的,大家想象一下,如果我一個客戶端連接,同時發送2個數據,按照上面代碼,只能處理一個數據,另外一個就丟棄了。)
最后,send_content為總的發送函數,先把header頭組建好,然后加上數據,就發送了。這邊可能遇到發送的客戶端不在線,要先檢測一下(以后還會遇到各種意外斷線情況,服務器端沒法及時檢測到,這個以后再講。)
服務器端是不是很簡單?再寫一個客戶端代碼,客戶端如果用GUI方式寫的話,篇幅太長了,我們這就用最簡單的方式,模擬客戶端操作。下面是客戶端代碼。
# coding:utf-8 from twisted.internet import reactor, task from twisted.internet.protocol import Protocol, ClientFactory import struct from twisted.python import log import sys import json log.startLogging(sys.stdout) class EchoClient(Protocol): def __init__(self): self.command_func_dict = { 101: self.handle_verify_s, 102: self.handle_single_chat_s, 103: self.handle_group_chat_s, 104: self.handle_broadcast_chat_s } self.version = 0 self.state = "VERIFY" self.phone_number = "" def connectionMade(self): log.msg("New connection", self.transport.getPeer()) def dataReceived(self, data): length, self.version, command_id = struct.unpack('!3I', data[:12]) content = data[12:length] if self.state == "VERIFY" and command_id == 101: self.handle_verify_s(content) else: self.handle_data(command_id, content) def handle_data(self, command_id, pack_data): self.command_func_dict[command_id](pack_data) def connectionLost(self, reason): log.msg("connection lost") def handle_verify_s(self, pack_data): """ 接受驗證結果 """ content = json.loads(pack_data) code = content.get('code') if code == 1: log.msg('驗證通過') self.state = "Data" def handle_single_chat_s(self, pack_data): """ 接受單聊 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[單聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_group_chat_s(self, pack_data): """ 接受組聊 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[組聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_broadcast_chat_s(self, pack_data): """ 接受廣播 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def send_verify(self, phone_number): """ 發送驗證 """ content = json.dumps(dict(phone_number=phone_number)) self.send_data(content, 1) def send_single_chat(self, chat_from, chat_to, chat_content): """ 發送單聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) self.send_data(content, 2) def send_group_chat(self, chat_from, chat_to, chat_content): """ 發送組聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) self.send_data(content, 3) def send_broadcast_chat(self, chat_from, chat_content): """ 發送群聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_data(content, 4) def send_data(self, send_content, command_id): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack + send_content) class EchoClientFactory(ClientFactory): def __init__(self): self.p = EchoClient() def startedConnecting(self, connector): log.msg("Started to connect") def buildProtocol(self, addr): log.msg("Connected.") return self.p def clientConnectionFailed(self, connector, reason): log.msg("Lost connection. Reason:", reason) def clientConnectionLost(self, connector, reason): log.msg("Connection failed. Reason:", reason) if __name__ == '__main__': cf = EchoClientFactory() chat_from = sys.argv[1] all_phone_numbers = ['000001', '000002', '000003', '000004'] all_phone_numbers.remove(chat_from) import random reactor.callLater(3, cf.p.send_verify, chat_from) reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊') reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,這是組聊') reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,這是群聊') reactor.connectTCP('127.0.0.1', 8124, cf) reactor.run()
客戶端比較簡單,主要是幾個發送函數,基本都是以send_開頭,就是主動發送消息以及驗證的;接受從服務器的處理函數,基本以handle_開頭。跟服務器端一樣,接受到數據以后,先解析header,根據header里面的length截取數據,再根據command_id來把數據送個它的處理函數。
這邊弄了個定時任務,第3秒開始驗證;第10秒隨機發送一個單聊;第11秒隨機發送一個組聊;第12秒發送一個群聊。
我們開3個客戶端,看看結果吧。
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000001 2016-06-21 17:33:17+0800 [-] Log opened. 2016-06-21 17:33:17+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fa325b41680> 2016-06-21 17:33:17+0800 [-] Started to connect 2016-06-21 17:33:17+0800 [Uninitialized] Connected. 2016-06-21 17:33:17+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124) 2016-06-21 17:33:20+0800 [EchoClient,client] 驗證通過 2016-06-21 17:33:29+0800 [EchoClient,client] [群聊][000001]:你好,這是群聊 2016-06-21 17:33:29+0800 [EchoClient,client] [單聊][000002]:你好,這是單聊 2016-06-21 17:33:30+0800 [EchoClient,client] [組聊][000002]:你好,這是組聊 2016-06-21 17:33:31+0800 [EchoClient,client] [群聊][000002]:你好,這是群聊 2016-06-21 17:33:38+0800 [EchoClient,client] [單聊][000003]:你好,這是單聊 2016-06-21 17:33:39+0800 [EchoClient,client] [組聊][000003]:你好,這是組聊 2016-06-21 17:33:40+0800 [EchoClient,client] [群聊][000003]:你好,這是群聊
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000002 2016-06-21 17:33:19+0800 [-] Log opened. 2016-06-21 17:33:19+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f23f9a48680> 2016-06-21 17:33:19+0800 [-] Started to connect 2016-06-21 17:33:19+0800 [Uninitialized] Connected. 2016-06-21 17:33:19+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124) 2016-06-21 17:33:22+0800 [EchoClient,client] 驗證通過 2016-06-21 17:33:27+0800 [EchoClient,client] [單聊][000001]:你好,這是單聊 2016-06-21 17:33:29+0800 [EchoClient,client] [群聊][000001]:你好,這是群聊 2016-06-21 17:33:31+0800 [EchoClient,client] [群聊][000002]:你好,這是群聊 2016-06-21 17:33:40+0800 [EchoClient,client] [群聊][000003]:你好,這是群聊
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000003 2016-06-21 17:33:28+0800 [-] Log opened. 2016-06-21 17:33:28+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7ff3067dc680> 2016-06-21 17:33:28+0800 [-] Started to connect 2016-06-21 17:33:28+0800 [Uninitialized] Connected. 2016-06-21 17:33:28+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124) 2016-06-21 17:33:31+0800 [EchoClient,client] 驗證通過 2016-06-21 17:33:40+0800 [EchoClient,client] [群聊][000003]:你好,這是群聊
這就是3個客戶端的結果,是不是你期望的值?
再看看服務器端的調試結果。
/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py 2016-06-21 17:23:01+0800 [-] Log opened. 2016-06-21 17:23:01+0800 [-] ChatFactory starting on 8124 2016-06-21 17:23:01+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f08b0ec8638> 2016-06-21 17:23:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59802) 2016-06-21 17:23:29+0800 [Chat,0,127.0.0.1] 歡迎, 000001! 2016-06-21 17:23:36+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天. 2016-06-21 17:23:37+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天. 2016-06-21 17:23:37+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在線,不能聊天. 2016-06-21 17:33:17+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59926) 2016-06-21 17:33:19+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59928) 2016-06-21 17:33:20+0800 [Chat,1,127.0.0.1] 歡迎, 000001! 2016-06-21 17:33:22+0800 [Chat,2,127.0.0.1] 歡迎, 000002! 2016-06-21 17:33:28+0800 [Chat,1,127.0.0.1] Phone_number:000004 不在線,不能聊天. 2016-06-21 17:33:28+0800 [Chat,1,127.0.0.1] Phone_number:000004 不在線,不能聊天. 2016-06-21 17:33:28+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59930) 2016-06-21 17:33:30+0800 [Chat,2,127.0.0.1] Phone_number:000003 不在線,不能聊天. 2016-06-21 17:33:31+0800 [Chat,3,127.0.0.1] 歡迎, 000003!
不在線的時候,都打印出來了。
其實整個例子還是比較簡單的,但是很多地方還非常不完善,這個要在我們接下來的系列中,慢慢完善。
比如:如果一個客戶端同時發送2個數據,上面的代碼就只處理了一個,另外一個就丟棄掉了;還有,我們的程序考慮的是正常的上線、離線,如果客戶端因為網絡問題,突然斷線,沒有發生tcp結束的4次握手,服務器端是不知道的,這時候如何保證服務器端知道客戶端在線不在線?還有,twisted如何異步訪問數據庫、redis、rabbitmq等,這個我們以后都會慢慢講。