上一章,我們直接寫了一個小例子來從整體講述twisted運行的大致過程,今天我們首先深入一些概念,在逐漸明白這些概念以后,我們會修改昨天寫的例子。
先看下面一張圖:
這個系列的第一篇文章,我們已經為大家展示了一張twisted的原理圖,那張圖,因為我們沒有捕獲任何socket事件,所以只有一個圈。這張圖上面的大圈代表捕獲socket事件,這個也是twisted最主要的功能,它已經為我們做了。並且提供了2個函數,transport.write寫入事件,dataReceived讀取事件。下面的小圈子,也就是我們自己的代碼,比如我們昨天的驗證、單聊、組聊等功能。大家一定要時時刻刻記住這張圖,編寫twisted代碼的時候,腦子里印着這張圖,這就跟我們以前寫c代碼的時候,一定要記住內存模型一樣。
回到這個大圈,transport.write和dataReceived其實是經過很多層封裝的函數,它們本質上還是操作select模型中的寫文件描述符(write_fd)、讀文件描述符(read_fd),對應twisted的基礎類就是IWriteDescriptor和IReadDescriptor,如果我們比較熟悉select模型,我們都知道,每次新來一個連接,都是建立write_fd、read_fd、error_fd,select不停的輪詢這些fd,當其中任何一個滿足條件時,觸發相應的事件,這些所有的東西,twisted都已經幫我們做好了,而且異步化了。我們接受到事件,只管處理就好了。
再看下面一個圖,
仔細看上面這個圖,再對比之前的圖,twisted在socket這塊全部為我們做好。
下面我們再講一下transport這個對象,這個對象在每個Protocol里面都會產生一個,它代表一個連接,這個連接可以是socket,也可以是unix的pipe,twisted已經為我們封裝好,一般不會自己去新建它。通常我們會用它來發送數據(write)、獲取連接另一方的信息(getPeer)。
再看一下dataReceived這個函數,就是每次接到數據以后觸發事件,上面說了,就是每次循環,select檢查這些fd,fd被寫入就觸發。這時候大家想想,如果循環被阻塞,在這個data里面會有很多數據,按照我們昨天的程序,只會處理第一個數據,其他的可能被丟棄掉了。
我們昨天的例子,把客戶端運行代碼稍微修改一下,在第10秒的時候,同時發送2個數據(粘包),看看服務器運行情況。
if __name__ == '__main__': cf = EchoClientFactory() chat_from = sys.argv[1] all_phone_numbers = ['000001', '000002', '000003', '000004'] all_phone_numbers.remove(chat_from) import random reactor.callLater(3, cf.p.send_verify, chat_from) reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊') reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊') # reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,這是組聊') # reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,這是群聊') reactor.connectTCP('127.0.0.1', 8124, cf) reactor.run()
客戶端代碼已經更改,運行一下,看看服務器結果。
/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py 2016-06-22 10:23:55+0800 [-] Log opened. 2016-06-22 10:23:55+0800 [-] ChatFactory starting on 8124 2016-06-22 10:23:55+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f382d908638> 2016-06-22 10:24:02+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 47834) 2016-06-22 10:24:05+0800 [Chat,0,127.0.0.1] 歡迎, 000001! 2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] 你好,這是單聊 2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在線,不能聊天.
果然,只處理了一個數據,后面一個直接丟棄掉了。
通常來說,我們都會為每個Protocol申請一段內存,每次接受到數據以后,先存放到這段內存中,然后再集中處理,這樣,即使循環被blocking住或者客戶端粘包,我們也能正確處理。新的代碼如下:
# coding:utf-8 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor import struct import json from twisted.python import log import sys log.startLogging(sys.stdout) class Chat(Protocol): def __init__(self, users): self.users = users self.phone_number = None self.state = "VERIFY" self.version = 0 self.command_func_dict = { 1: self.handle_verify, 2: self.handle_single_chat, 3: self.handle_group_chat, 4: self.handle_broadcast_chat } self._data_buffer = bytes() def connectionMade(self): log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason): if self.phone_number in self.users: del self.users[self.phone_number] def dataReceived(self, data): """ 接受到數據以后的操作 """ self._data_buffer += data while True: length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12]) if length > len(self._data_buffer): break content = self._data_buffer[12:length] if command_id not in [1, 2, 3, 4]: return if self.state == "VERIFY" and command_id == 1: self.handle_verify(content) else: self.handle_data(command_id, content) self._data_buffer = self._data_buffer[length:] if len(self._data_buffer) < 12: break def handle_verify(self, content): """ 驗證函數 """ content = json.loads(content) phone_number = content.get('phone_number') if phone_number in self.users: log.msg("電話號碼<%s>存在老的連接." % phone_number.encode('utf-8')) self.users[phone_number].connectionLost("") log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),)) self.phone_number = phone_number self.users[phone_number] = self self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content): """ 根據command_id來分配函數 """ self.command_func_dict[command_id](content) def handle_single_chat(self, content): """ 單播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') log.msg(chat_content.encode('utf-8')) send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content): """ 組播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content): """ 廣播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.users.keys() self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.users.keys(): self.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8')) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self.users) reactor.listenTCP(8124, ChatFactory()) reactor.run()
我們在構造函數里面,加入了一個字段,這個字段就是self._data_buffer,在每次接受到數據以后,都循環處理這段內存。再看看運行結果,有什么不同。
/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py 2016-06-22 10:40:42+0800 [-] Log opened. 2016-06-22 10:40:42+0800 [-] ChatFactory starting on 8124 2016-06-22 10:40:42+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f96860e0680> 2016-06-22 10:40:57+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 48010) 2016-06-22 10:41:00+0800 [Chat,0,127.0.0.1] 歡迎, 000001! 2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,這是單聊 2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000004 不在線,不能聊天. 2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,這是單聊 2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天.
是不是正確了?接受數據,我們先講到這,下面我們講開發tcpserver一定要處理的問題,異常斷線
異常斷線
異常斷線的處理在tcpserver開發過程中必不可少,很多時候,尤其是無線、3G、4G網絡,信號不好的時候就斷線,由於是網絡問題,沒有經過tcp結束的4次握手,服務器不可能及時檢查到此事件,這時候就有可能出錯。通常我們會采取一種心跳包機制,即客戶端每隔一段時間就向服務器端發送一個心跳包,服務器端每隔一段時間就檢測一下,如果發現客戶端連續2次或者多次沒有發送心跳包,就認為客戶端已經掉線,再采取措施。
好了,說了這么多,先要重新部署一下程序,我把一個客戶端發在我的另外一台筆記本上,先連接好,然后拔掉網線,再從服務器端發送一組數據過去,看看會發生什么。
首先,我們把000002放在筆記本上,000001在服務器端,在10秒和20秒的時候,分別發送一個單聊給000002,看看服務器端和000002的情況。
000001的運行代碼修改如下:
if __name__ == '__main__': cf = EchoClientFactory() chat_from = sys.argv[1] all_phone_numbers = ['000001', '000002', '000003', '000004'] all_phone_numbers.remove(chat_from) import random reactor.callLater(3, cf.p.send_verify, chat_from) reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,這是10秒的時候發送') reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,這是20秒的時候發送') reactor.connectTCP('127.0.0.1', 8124, cf) reactor.run()
10秒和20秒,分別發送數據到服務器端,而000002端,在10秒和20秒的中間,拔掉網線,我們看看發生了什么情況。
首先,服務器端的運行結果如下:
/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py 2016-06-22 11:40:02+0800 [-] Log opened. 2016-06-22 11:40:02+0800 [-] ChatFactory starting on 8124 2016-06-22 11:40:02+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f9c39f89638> 2016-06-22 11:41:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 57150) 2016-06-22 11:41:29+0800 [Chat,0,192.168.5.15] 歡迎, 000002! 2016-06-22 11:41:41+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 49526) 2016-06-22 11:41:44+0800 [Chat,1,127.0.0.1] 歡迎, 000001! 2016-06-22 11:41:51+0800 [Chat,1,127.0.0.1] 你好,這是10秒的時候發送 2016-06-22 11:42:01+0800 [Chat,1,127.0.0.1] 你好,這是20秒的時候發送
它在000002中斷了以后,並沒有發現000002已經中斷,還是照樣write下去,其實本質上,它還是把數據發到了write_fd上,然后就是底層的事了。
而000002客戶端的結果比較有意思。
2016-06-22 11:41:26+0800 [-] Log opened. 2016-06-22 11:41:26+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e75db7680> 2016-06-22 11:41:26+0800 [-] Started to connect 2016-06-22 11:41:26+0800 [Uninitialized] Connected. 2016-06-22 11:41:26+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124) 2016-06-22 11:41:29+0800 [EchoClient,client] 驗證通過 2016-06-22 11:41:51+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送 2016-06-22 11:44:27+0800 [EchoClient,client] [單聊][000001]:你好,這是20秒的時候發送
大家注意到沒有,居然還是收到了,但是看時間,時間和原來的是不對的。我后來把網線重新插上去,然后就接受到了。twisted把write_fd的數據重新發送給了客戶端,因為客戶端沒有任何改變,ip和端口都是原來的,網絡情況沒有改變,所以再次就連接上來。
我們再試一下另外一種情況,也是移動端經常遇到的情況,就是切換網絡,比如從4G切換到無線網,看看會發生什么。
yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002 2016-06-22 13:09:34+0800 [-] Log opened. 2016-06-22 13:09:34+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd8a0408680> 2016-06-22 13:09:34+0800 [-] Started to connect 2016-06-22 13:09:34+0800 [Uninitialized] Connected. 2016-06-22 13:09:34+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124) 2016-06-22 13:09:37+0800 [EchoClient,client] 驗證通過 2016-06-22 13:09:54+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送
客戶端再也收不到了,這也是真實情況。通常來說,用戶切換網絡的時候,都會更改網絡信息,這時候移動客戶端再也收不到這個信息了,而且服務器端也不會報錯(以后要為我們做消息確認機制埋下伏筆。)
既然收不到了,我們就解決這個問題,上面說了,增加心跳包機制,客戶端每隔一段時間發送一次心跳包,服務器端收到心跳包以后,記錄最近一次接受到的時間。每隔一段時間,服務器整體輪詢一次,如果發現某一個客戶端很長時間沒有接受到心跳包,就判定它為斷線,這時候主動切斷這個客戶端。
心跳包的command_id也要加上,直接為5吧,內容為空。只是心跳包,沒有必要寫內容了。
新代碼如下:
frontTCP.py
# coding:utf-8 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor, task import struct import json from twisted.python import log import sys import time log.startLogging(sys.stdout) class Chat(Protocol): def __init__(self, users): self.users = users self.phone_number = None self.state = "VERIFY" self.version = 0 self.last_heartbeat_time = 0 self.command_func_dict = { 1: self.handle_verify, 2: self.handle_single_chat, 3: self.handle_group_chat, 4: self.handle_broadcast_chat, 5: self.handle_heartbeat } self._data_buffer = bytes() def connectionMade(self): log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason): log.msg("[%s]:斷線" % self.phone_number.encode('utf-8')) if self.phone_number in self.users: del self.users[self.phone_number] def dataReceived(self, data): """ 接受到數據以后的操作 """ self._data_buffer += data while True: length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12]) if length > len(self._data_buffer): break content = self._data_buffer[12:length] if command_id not in [1, 2, 3, 4, 5]: return if self.state == "VERIFY" and command_id == 1: self.handle_verify(content) else: self.handle_data(command_id, content) self._data_buffer = self._data_buffer[length:] if len(self._data_buffer) < 12: break def handle_heartbeat(self, content): """ 處理心跳包 """ self.last_heartbeat_time = int(time.time()) def handle_verify(self, content): """ 驗證函數 """ content = json.loads(content) phone_number = content.get('phone_number') if phone_number in self.users: log.msg("電話號碼<%s>存在老的連接." % phone_number.encode('utf-8')) self.users[phone_number].connectionLost("") log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),)) self.phone_number = phone_number self.users[phone_number] = self self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content): """ 根據command_id來分配函數 """ self.command_func_dict[command_id](content) def handle_single_chat(self, content): """ 單播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') log.msg(chat_content.encode('utf-8')) send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content): """ 組播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content): """ 廣播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.users.keys() self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.users.keys(): self.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8')) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self.users) def check_users_online(self): for key, value in self.users.items(): if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4: log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8')) value.transport.abortConnection() cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) reactor.listenTCP(8124, cf) reactor.run()
就像上面所說的,加了一個接受心跳包的檢測的函數,handle_heartbeat,每次來一個心跳包,就把它相應的last_heartbeat_time變換一下,這樣,整體輪詢檢測的時候,我只要判斷最后一次連接時間和當前連接時間之差,就可以判斷它是不是異常斷線了。
這里看我異常斷線的處理,transport.abortConnection(),從字面意思上,直接丟棄這個連接,它會調用Protocol的connectionLost,而且它不管那個fd里面有沒有數據,全部丟棄。這個我們以后用netstat分析連接的時候,會進一步說明這個函數,現在只要記住,它會強行中斷這個連接,刪除任何緩存在里面的數據即可。
frontClient.py
# coding:utf-8 from twisted.internet import reactor, task from twisted.internet.protocol import Protocol, ClientFactory import struct from twisted.python import log import sys import json log.startLogging(sys.stdout) class EchoClient(Protocol): def __init__(self): self.command_func_dict = { 101: self.handle_verify_s, 102: self.handle_single_chat_s, 103: self.handle_group_chat_s, 104: self.handle_broadcast_chat_s } self.version = 0 self.state = "VERIFY" self.phone_number = "" def connectionMade(self): log.msg("New connection", self.transport.getPeer()) def dataReceived(self, data): length, self.version, command_id = struct.unpack('!3I', data[:12]) content = data[12:length] if self.state == "VERIFY" and command_id == 101: self.handle_verify_s(content) else: self.handle_data(command_id, content) def handle_data(self, command_id, pack_data): self.command_func_dict[command_id](pack_data) def connectionLost(self, reason): log.msg("connection lost") def handle_verify_s(self, pack_data): """ 接受驗證結果 """ content = json.loads(pack_data) code = content.get('code') if code == 1: log.msg('驗證通過') self.state = "Data" def handle_single_chat_s(self, pack_data): """ 接受單聊 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[單聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_group_chat_s(self, pack_data): """ 接受組聊 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[組聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_broadcast_chat_s(self, pack_data): """ 接受廣播 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def send_verify(self, phone_number): """ 發送驗證 """ content = json.dumps(dict(phone_number=phone_number)) self.send_data(content, 1) def send_single_chat(self, chat_from, chat_to, chat_content): """ 發送單聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) self.send_data(content, 2) def send_group_chat(self, chat_from, chat_to, chat_content): """ 發送組聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) self.send_data(content, 3) def send_broadcast_chat(self, chat_from, chat_content): """ 發送群聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_data(content, 4) def send_data(self, send_content, command_id): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack + send_content) def send_heartbeat(self): """ 發送心跳包 """ length = 12 version = self.version command_id = 5 header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack) class EchoClientFactory(ClientFactory): def __init__(self): self.p = EchoClient() def startedConnecting(self, connector): log.msg("Started to connect") def buildProtocol(self, addr): log.msg("Connected.") return self.p def clientConnectionFailed(self, connector, reason): log.msg("Lost connection. Reason:", reason) def clientConnectionLost(self, connector, reason): log.msg("Connection failed. Reason:", reason) if __name__ == '__main__': cf = EchoClientFactory() chat_from = sys.argv[1] all_phone_numbers = ['000001', '000002', '000003', '000004'] all_phone_numbers.remove(chat_from) import random task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat) task_send_heartbeat.start(2, now=False) reactor.callLater(3, cf.p.send_verify, chat_from) reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,這是10秒的時候發送') reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,這是20秒的時候發送') reactor.connectTCP('192.168.5.60', 8124, cf) reactor.run()
這邊就添加了一個心跳包發送程序,每隔2秒發送一個心跳包。
我在000002的客戶端在10秒和20秒之間,拔掉了網線,看看調試效果,
先看服務器端的調試結果。
/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py 2016-06-22 15:15:23+0800 [-] Log opened. 2016-06-22 15:15:23+0800 [-] ChatFactory starting on 8124 2016-06-22 15:15:23+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7ff3c3615758> 2016-06-22 15:15:53+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 39774) 2016-06-22 15:15:54+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.60', 36084) 2016-06-22 15:15:56+0800 [Chat,0,192.168.5.15] 歡迎, 000002! 2016-06-22 15:15:57+0800 [Chat,1,192.168.5.60] 歡迎, 000001! 2016-06-22 15:16:04+0800 [Chat,1,192.168.5.60] 你好,這是10秒的時候發送 2016-06-22 15:16:11+0800 [-] [000002]沒有檢測到心跳包,主動切斷 2016-06-22 15:16:11+0800 [-] [000002]:斷線 2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] 你好,這是20秒的時候發送 2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] Phone_number:000002 不在線,不能聊天.
看見沒有,已經能主動檢測到了。
再看一下客戶端000002的調試結果
yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002 2016-06-22 15:15:53+0800 [-] Log opened. 2016-06-22 15:15:53+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8> 2016-06-22 15:15:53+0800 [-] Started to connect 2016-06-22 15:15:53+0800 [Uninitialized] Connected. 2016-06-22 15:15:53+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124) 2016-06-22 15:15:56+0800 [EchoClient,client] 驗證通過 2016-06-22 15:16:04+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送 2016-06-22 15:24:27+0800 [EchoClient,client] connection lost 2016-06-22 15:24:27+0800 [EchoClient,client] Connection failed. Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion. ] 2016-06-22 15:24:27+0800 [EchoClient,client] Stopping factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>
比較有意思,15:16我中斷了連接,沒有接受到,這時候服務器主動切斷網絡,再連接上來的時候,它已經接受到消息,自己被中斷了,其實客戶端應該有個斷線重連機制,不過這是客戶端的事,主要看你的業務需求。
到這,利用心跳包來檢測異常網絡情況就完成了,如果你有更好的方案,歡迎大家跟我討論,畢竟我不是專門做tcpserver的,很多東西可能沒有研究到。
下一章,我們研究twisted連接redis,把一些很狀態轉移到redis中,這樣,其他模塊就能共享這個狀態了,這在物聯網中,用到尤其多,比如設備在線斷線狀態、報警狀態等,前端web可以直接拿來使用了;以后我們還會講rabbitmq在twisted中的應用。