twisted(3)--再談twisted


  上一章,我們直接寫了一個小例子來從整體講述twisted運行的大致過程,今天我們首先深入一些概念,在逐漸明白這些概念以后,我們會修改昨天寫的例子。

  先看下面一張圖:

  這個系列的第一篇文章,我們已經為大家展示了一張twisted的原理圖,那張圖,因為我們沒有捕獲任何socket事件,所以只有一個圈。這張圖上面的大圈代表捕獲socket事件,這個也是twisted最主要的功能,它已經為我們做了。並且提供了2個函數,transport.write寫入事件,dataReceived讀取事件。下面的小圈子,也就是我們自己的代碼,比如我們昨天的驗證、單聊、組聊等功能。大家一定要時時刻刻記住這張圖,編寫twisted代碼的時候,腦子里印着這張圖,這就跟我們以前寫c代碼的時候,一定要記住內存模型一樣。

  回到這個大圈,transport.write和dataReceived其實是經過很多層封裝的函數,它們本質上還是操作select模型中的寫文件描述符(write_fd)、讀文件描述符(read_fd),對應twisted的基礎類就是IWriteDescriptor和IReadDescriptor,如果我們比較熟悉select模型,我們都知道,每次新來一個連接,都是建立write_fd、read_fd、error_fd,select不停的輪詢這些fd,當其中任何一個滿足條件時,觸發相應的事件,這些所有的東西,twisted都已經幫我們做好了,而且異步化了。我們接受到事件,只管處理就好了。

  再看下面一個圖,

  仔細看上面這個圖,再對比之前的圖,twisted在socket這塊全部為我們做好。

  下面我們再講一下transport這個對象,這個對象在每個Protocol里面都會產生一個,它代表一個連接,這個連接可以是socket,也可以是unix的pipe,twisted已經為我們封裝好,一般不會自己去新建它。通常我們會用它來發送數據(write)、獲取連接另一方的信息(getPeer)。

  再看一下dataReceived這個函數,就是每次接到數據以后觸發事件,上面說了,就是每次循環,select檢查這些fd,fd被寫入就觸發。這時候大家想想,如果循環被阻塞,在這個data里面會有很多數據,按照我們昨天的程序,只會處理第一個數據,其他的可能被丟棄掉了。

  我們昨天的例子,把客戶端運行代碼稍微修改一下,在第10秒的時候,同時發送2個數據(粘包),看看服務器運行情況。

if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊')
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊')
    # reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,這是組聊')
    # reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,這是群聊')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  客戶端代碼已經更改,運行一下,看看服務器結果。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 10:23:55+0800 [-] Log opened.
2016-06-22 10:23:55+0800 [-] ChatFactory starting on 8124
2016-06-22 10:23:55+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f382d908638>
2016-06-22 10:24:02+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 47834)
2016-06-22 10:24:05+0800 [Chat,0,127.0.0.1] 歡迎, 000001!
2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] 你好,這是單聊
2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在線,不能聊天.

  果然,只處理了一個數據,后面一個直接丟棄掉了。

  通常來說,我們都會為每個Protocol申請一段內存,每次接受到數據以后,先存放到這段內存中,然后再集中處理,這樣,即使循環被blocking住或者客戶端粘包,我們也能正確處理。新的代碼如下:

  

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
import struct
import json
from twisted.python import log
import sys
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據以后的操作
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                break

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)
            else:
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                break

    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("電話號碼<%s>存在老的連接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        log.msg(chat_content.encode('utf-8'))
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8124, ChatFactory())
reactor.run()

  我們在構造函數里面,加入了一個字段,這個字段就是self._data_buffer,在每次接受到數據以后,都循環處理這段內存。再看看運行結果,有什么不同。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 10:40:42+0800 [-] Log opened.
2016-06-22 10:40:42+0800 [-] ChatFactory starting on 8124
2016-06-22 10:40:42+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f96860e0680>
2016-06-22 10:40:57+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 48010)
2016-06-22 10:41:00+0800 [Chat,0,127.0.0.1] 歡迎, 000001!
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,這是單聊
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000004 不在線,不能聊天.
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,這是單聊
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天.

  是不是正確了?接受數據,我們先講到這,下面我們講開發tcpserver一定要處理的問題,異常斷線

異常斷線

  異常斷線的處理在tcpserver開發過程中必不可少,很多時候,尤其是無線、3G、4G網絡,信號不好的時候就斷線,由於是網絡問題,沒有經過tcp結束的4次握手,服務器不可能及時檢查到此事件,這時候就有可能出錯。通常我們會采取一種心跳包機制,即客戶端每隔一段時間就向服務器端發送一個心跳包,服務器端每隔一段時間就檢測一下,如果發現客戶端連續2次或者多次沒有發送心跳包,就認為客戶端已經掉線,再采取措施。

  好了,說了這么多,先要重新部署一下程序,我把一個客戶端發在我的另外一台筆記本上,先連接好,然后拔掉網線,再從服務器端發送一組數據過去,看看會發生什么。

  首先,我們把000002放在筆記本上,000001在服務器端,在10秒和20秒的時候,分別發送一個單聊給000002,看看服務器端和000002的情況。

000001的運行代碼修改如下:

if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,這是10秒的時候發送')
    reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,這是20秒的時候發送')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  10秒和20秒,分別發送數據到服務器端,而000002端,在10秒和20秒的中間,拔掉網線,我們看看發生了什么情況。

  首先,服務器端的運行結果如下:

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 11:40:02+0800 [-] Log opened.
2016-06-22 11:40:02+0800 [-] ChatFactory starting on 8124
2016-06-22 11:40:02+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f9c39f89638>
2016-06-22 11:41:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 57150)
2016-06-22 11:41:29+0800 [Chat,0,192.168.5.15] 歡迎, 000002!
2016-06-22 11:41:41+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 49526)
2016-06-22 11:41:44+0800 [Chat,1,127.0.0.1] 歡迎, 000001!
2016-06-22 11:41:51+0800 [Chat,1,127.0.0.1] 你好,這是10秒的時候發送
2016-06-22 11:42:01+0800 [Chat,1,127.0.0.1] 你好,這是20秒的時候發送

  它在000002中斷了以后,並沒有發現000002已經中斷,還是照樣write下去,其實本質上,它還是把數據發到了write_fd上,然后就是底層的事了。

  而000002客戶端的結果比較有意思。

2016-06-22 11:41:26+0800 [-] Log opened.
2016-06-22 11:41:26+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e75db7680>
2016-06-22 11:41:26+0800 [-] Started to connect
2016-06-22 11:41:26+0800 [Uninitialized] Connected.
2016-06-22 11:41:26+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 11:41:29+0800 [EchoClient,client] 驗證通過
2016-06-22 11:41:51+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送
2016-06-22 11:44:27+0800 [EchoClient,client] [單聊][000001]:你好,這是20秒的時候發送

  大家注意到沒有,居然還是收到了,但是看時間,時間和原來的是不對的。我后來把網線重新插上去,然后就接受到了。twisted把write_fd的數據重新發送給了客戶端,因為客戶端沒有任何改變,ip和端口都是原來的,網絡情況沒有改變,所以再次就連接上來。

  我們再試一下另外一種情況,也是移動端經常遇到的情況,就是切換網絡,比如從4G切換到無線網,看看會發生什么。

yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002
2016-06-22 13:09:34+0800 [-] Log opened.
2016-06-22 13:09:34+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd8a0408680>
2016-06-22 13:09:34+0800 [-] Started to connect
2016-06-22 13:09:34+0800 [Uninitialized] Connected.
2016-06-22 13:09:34+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 13:09:37+0800 [EchoClient,client] 驗證通過
2016-06-22 13:09:54+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送

  客戶端再也收不到了,這也是真實情況。通常來說,用戶切換網絡的時候,都會更改網絡信息,這時候移動客戶端再也收不到這個信息了,而且服務器端也不會報錯(以后要為我們做消息確認機制埋下伏筆。)

  既然收不到了,我們就解決這個問題,上面說了,增加心跳包機制,客戶端每隔一段時間發送一次心跳包,服務器端收到心跳包以后,記錄最近一次接受到的時間。每隔一段時間,服務器整體輪詢一次,如果發現某一個客戶端很長時間沒有接受到心跳包,就判定它為斷線,這時候主動切斷這個客戶端。

  心跳包的command_id也要加上,直接為5吧,內容為空。只是心跳包,沒有必要寫內容了。

  新代碼如下:
  frontTCP.py

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task
import struct
import json
from twisted.python import log
import sys
import time
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.last_heartbeat_time = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat,
            5: self.handle_heartbeat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        log.msg("[%s]:斷線" % self.phone_number.encode('utf-8'))
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據以后的操作
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                break

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4, 5]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)
            else:
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                break

    def handle_heartbeat(self, content):
        """
        處理心跳包
        """
        self.last_heartbeat_time = int(time.time())

    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("電話號碼<%s>存在老的連接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        log.msg(chat_content.encode('utf-8'))
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8'))
                value.transport.abortConnection()

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

reactor.listenTCP(8124, cf)
reactor.run()

  就像上面所說的,加了一個接受心跳包的檢測的函數,handle_heartbeat,每次來一個心跳包,就把它相應的last_heartbeat_time變換一下,這樣,整體輪詢檢測的時候,我只要判斷最后一次連接時間和當前連接時間之差,就可以判斷它是不是異常斷線了。

  這里看我異常斷線的處理,transport.abortConnection(),從字面意思上,直接丟棄這個連接,它會調用Protocol的connectionLost,而且它不管那個fd里面有沒有數據,全部丟棄。這個我們以后用netstat分析連接的時候,會進一步說明這個函數,現在只要記住,它會強行中斷這個連接,刪除任何緩存在里面的數據即可。

 

  frontClient.py

# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
log.startLogging(sys.stdout)


class EchoClient(Protocol):
    def __init__(self):
        self.command_func_dict = {
            101: self.handle_verify_s,
            102: self.handle_single_chat_s,
            103: self.handle_group_chat_s,
            104: self.handle_broadcast_chat_s
        }
        self.version = 0
        self.state = "VERIFY"
        self.phone_number = ""

    def connectionMade(self):
        log.msg("New connection", self.transport.getPeer())

    def dataReceived(self, data):
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]
        if self.state == "VERIFY" and command_id == 101:
            self.handle_verify_s(content)
        else:
            self.handle_data(command_id, content)

    def handle_data(self, command_id, pack_data):
        self.command_func_dict[command_id](pack_data)

    def connectionLost(self, reason):
        log.msg("connection lost")

    def handle_verify_s(self, pack_data):
        """
        接受驗證結果
        """
        content = json.loads(pack_data)
        code = content.get('code')
        if code == 1:
            log.msg('驗證通過')
        self.state = "Data"

    def handle_single_chat_s(self, pack_data):
        """
        接受單聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[單聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_group_chat_s(self, pack_data):
        """
        接受組聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[組聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_broadcast_chat_s(self, pack_data):
        """
        接受廣播
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def send_verify(self, phone_number):
        """
        發送驗證
        """
        content = json.dumps(dict(phone_number=phone_number))
        self.send_data(content, 1)

    def send_single_chat(self, chat_from, chat_to, chat_content):
        """
        發送單聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 2)

    def send_group_chat(self, chat_from, chat_to, chat_content):
        """
        發送組聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 3)

    def send_broadcast_chat(self, chat_from, chat_content):
        """
        發送群聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
        self.send_data(content, 4)

    def send_data(self, send_content, command_id):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack + send_content)

    def send_heartbeat(self):
        """
        發送心跳包
        """
        length = 12
        version = self.version
        command_id = 5
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack)


class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.p = EchoClient()

    def startedConnecting(self, connector):
        log.msg("Started to connect")

    def buildProtocol(self, addr):
        log.msg("Connected.")
        return self.p

    def clientConnectionFailed(self, connector, reason):
        log.msg("Lost connection. Reason:", reason)

    def clientConnectionLost(self, connector, reason):
        log.msg("Connection failed. Reason:", reason)


if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random

    task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat)
    task_send_heartbeat.start(2, now=False)

    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,這是10秒的時候發送')
    reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,這是20秒的時候發送')

    reactor.connectTCP('192.168.5.60', 8124, cf)

    reactor.run()

  這邊就添加了一個心跳包發送程序,每隔2秒發送一個心跳包。

  我在000002的客戶端在10秒和20秒之間,拔掉了網線,看看調試效果,

  先看服務器端的調試結果。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 15:15:23+0800 [-] Log opened.
2016-06-22 15:15:23+0800 [-] ChatFactory starting on 8124
2016-06-22 15:15:23+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7ff3c3615758>
2016-06-22 15:15:53+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 39774)
2016-06-22 15:15:54+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.60', 36084)
2016-06-22 15:15:56+0800 [Chat,0,192.168.5.15] 歡迎, 000002!
2016-06-22 15:15:57+0800 [Chat,1,192.168.5.60] 歡迎, 000001!
2016-06-22 15:16:04+0800 [Chat,1,192.168.5.60] 你好,這是10秒的時候發送
2016-06-22 15:16:11+0800 [-] [000002]沒有檢測到心跳包,主動切斷
2016-06-22 15:16:11+0800 [-] [000002]:斷線
2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] 你好,這是20秒的時候發送
2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] Phone_number:000002 不在線,不能聊天.

  看見沒有,已經能主動檢測到了。

  再看一下客戶端000002的調試結果

yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002
2016-06-22 15:15:53+0800 [-] Log opened.
2016-06-22 15:15:53+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>
2016-06-22 15:15:53+0800 [-] Started to connect
2016-06-22 15:15:53+0800 [Uninitialized] Connected.
2016-06-22 15:15:53+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 15:15:56+0800 [EchoClient,client] 驗證通過
2016-06-22 15:16:04+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送
2016-06-22 15:24:27+0800 [EchoClient,client] connection lost
2016-06-22 15:24:27+0800 [EchoClient,client] Connection failed. Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion.
    ]
2016-06-22 15:24:27+0800 [EchoClient,client] Stopping factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>

  比較有意思,15:16我中斷了連接,沒有接受到,這時候服務器主動切斷網絡,再連接上來的時候,它已經接受到消息,自己被中斷了,其實客戶端應該有個斷線重連機制,不過這是客戶端的事,主要看你的業務需求。

  

  到這,利用心跳包來檢測異常網絡情況就完成了,如果你有更好的方案,歡迎大家跟我討論,畢竟我不是專門做tcpserver的,很多東西可能沒有研究到。

  下一章,我們研究twisted連接redis,把一些很狀態轉移到redis中,這樣,其他模塊就能共享這個狀態了,這在物聯網中,用到尤其多,比如設備在線斷線狀態、報警狀態等,前端web可以直接拿來使用了;以后我們還會講rabbitmq在twisted中的應用。

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM