Twisted框架學習


  Twisted是用Python實現的基於事件驅動的網絡引擎框架,是python中一個強大的異步IO庫。理解twisted的一個前提是弄清楚twisted中幾個核心的概念: reactor, Protocl, ProtocolFactory, Deffered

 1 reactor

twisted.internet.reactor

https://twistedmatrix.com/documents/current/core/howto/reactor-basics.html

reactor是twisted異步框架中的核心組件,是一個基於select,poll或epoll的事件循環,其監聽socket的狀態,當socket狀態有變化時(有新的連接請求,或接受到數據等)時,調用相應的組件來進行處理。如下圖的reactor loop一樣,不斷的循環掃描socket列表中監聽對象的狀態,當有特定事件發生時(socket狀態變化)調用回調函數callback,來處理事件,這時候執行權限交給回調函數,當我們的代碼處理完事件后,將執行權返回給reactor,繼續進行循環監聽。

2,Factory和Protocol

 

twisted.internet.protocol.Factory

 

twisted.internet.protocol.Protocol

參考文章:

 

https://twistedmatrix.com/documents/current/core/howto/servers.html

https://twistedmatrix.com/documents/current/core/howto/clients.html

https://zhuanlan.zhihu.com/p/28763807?utm_source=wechat_session&utm_medium=social

  Factory和Protocol都是用來處理一些配置和協議相關的底層業務,如socket之間連接,數據的發送格式等。Factory設置持久的,多個socket可共享的通用配置,Protocol為設置單個socket的特定配置。當有一個socket連接請求時,Factory創建一個Protocol實例,並將該實例的factory屬性指向自己,請求斷開時,protocol即被銷毀。如下第一幅圖中,基於twisted的一個異步服務器,其中endpiont為綁定的ip和端口,reactor監聽其socket的狀態,當有連接請求時,reactor調用Factory來設置相關配置,其隨后會創建(buildProtocol)Protocol實例,Protocol實例的Transport屬性會處理客戶端socket的請求,並執行相應的回調函數。在第二幅圖中可以看到,reactor共監聽四個socket,一個是服務端listening socket(其綁定的ip和port),和三個客戶端socket,而每個客戶端socket都有自己的Protocol來處理相應的數據交互請求,這些Protocol都由Factory創建。(也可以有多個Factory,每個Factory創建多個Protocol)

Factory: 主要用來創建protocol,也可以定義其他操作

twisted.internet.protocol.Factory

  Factory類的源碼如下,其有屬性protocol和方法buildProtocol()較為重要,其中protocol指向需要創建的Protocol類,從buildProtocol()方法可以看到其創建了Protocol實例,並且將該實例的factory屬性指向了Factory 實例。startFactory()和stopFactory()相當於鈎子函數,在factory和端口連接和斷開時調用。在實際應用時,一般選擇繼承Factory的子類,並實現相應的方法,如ClientFactory,SeverFactory。

@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext)
class Factory:
    """
    This is a factory which produces protocols.

    By default, buildProtocol will create a protocol of the class given in
    self.protocol.
    """

    # put a subclass of Protocol here:
    protocol = None

    numPorts = 0
    noisy = True

    @classmethod
    def forProtocol(cls, protocol, *args, **kwargs):
        """
        Create a factory for the given protocol.

        It sets the C{protocol} attribute and returns the constructed factory
        instance.

        @param protocol: A L{Protocol} subclass

        @param args: Positional arguments for the factory.

        @param kwargs: Keyword arguments for the factory.

        @return: A L{Factory} instance wired up to C{protocol}.
        """
        factory = cls(*args, **kwargs)
        factory.protocol = protocol
        return factory


    def logPrefix(self):
        """
        Describe this factory for log messages.
        """
        return self.__class__.__name__


    def doStart(self):
        """Make sure startFactory is called.

        Users should not call this function themselves!
        """
        if not self.numPorts:
            if self.noisy:
                _loggerFor(self).info("Starting factory {factory!r}",
                                      factory=self)
            self.startFactory()
        self.numPorts = self.numPorts + 1

    def doStop(self):
        """Make sure stopFactory is called.

        Users should not call this function themselves!
        """
        if self.numPorts == 0:
            # this shouldn't happen, but does sometimes and this is better
            # than blowing up in assert as we did previously.
            return
        self.numPorts = self.numPorts - 1
        if not self.numPorts:
            if self.noisy:
                _loggerFor(self).info("Stopping factory {factory!r}",
                                      factory=self)
            self.stopFactory()

    def startFactory(self):
        """This will be called before I begin listening on a Port or Connector.

        It will only be called once, even if the factory is connected
        to multiple ports.

        This can be used to perform 'unserialization' tasks that
        are best put off until things are actually running, such
        as connecting to a database, opening files, etcetera.
        """

    def stopFactory(self):
        """This will be called before I stop listening on all Ports/Connectors.

        This can be overridden to perform 'shutdown' tasks such as disconnecting
        database connections, closing files, etc.

        It will be called, for example, before an application shuts down,
        if it was connected to a port. User code should not call this function
        directly.
        """


    def buildProtocol(self, addr):
        """
        Create an instance of a subclass of Protocol.

        The returned instance will handle input on an incoming server
        connection, and an attribute "factory" pointing to the creating
        factory.

        Alternatively, L{None} may be returned to immediately close the
        new connection.

        Override this method to alter how Protocol instances get created.

        @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
        """
        p = self.protocol()
        p.factory = self
        return p
Factory

  繼承Factory類,創建protocol實例,有兩種方式,一是設置其屬性protocol,不覆蓋buildProtocol()方法;二是不設置屬性protocol,覆蓋buildProtocol()方法,在方法內部創建protocol實例,並返回。代碼如下:

from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory's default buildProtocol:
        self.transport.write(self.factory.quote + '\r\n')
        self.transport.loseConnection()


class QOTDFactory(Factory):

    # This will be used by the default buildProtocol to create new protocols:
    protocol = QOTD

    def __init__(self, quote=None):
        self.quote = quote or 'An apple a day keeps the doctor away'

endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory("configurable quote"))
reactor.run()
設置protocol屬性
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory's default buildProtocol:
        self.transport.write(self.factory.quote + '\r\n')
        self.transport.loseConnection()

class QOTDFactory(Factory):
    def buildProtocol(self, addr):
        return QOTD()

# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()
實現buildProtocol方法

startFactory()和stopFactory()示例

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

#LineReceiver為一種protocol
class LoggingProtocol(LineReceiver):

    def lineReceived(self, line):
        self.factory.fp.write(line + '\n')


class LogfileFactory(Factory):

    protocol = LoggingProtocol

    def __init__(self, fileName):
        self.file = fileName

    def startFactory(self):
        self.fp = open(self.file, 'a')

    def stopFactory(self):
        self.fp.close()
View Code

 

Protocol:主要用來處理連接建立和斷開時的操作,以及數據的接受和發送操作

twisted.internet.protocol.Protocol

Protocol繼承了BaseProtocol, BaseProtocol和Protocol的源碼如下:

class BaseProtocol:
    """
    This is the abstract superclass of all protocols.

    Some methods have helpful default implementations here so that they can
    easily be shared, but otherwise the direct subclasses of this class are more
    interesting, L{Protocol} and L{ProcessProtocol}.
    """
    connected = 0
    transport = None

    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the 'transport' attribute of this Protocol, and calls the
        connectionMade() callback.
        """
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """Called when a connection is made.

        This may be considered the initializer of the protocol, because
        it is called when the connection is completed.  For clients,
        this is called once the connection to the server has been
        established; for servers, this is called after an accept() call
        stops blocking and a socket has been received.  If you need to
        send any greeting or initial message, do it here.
        """
BaseProtocol
@implementer(interfaces.IProtocol, interfaces.ILoggingContext)
class Protocol(BaseProtocol):
    """
    This is the base class for streaming connection-oriented protocols.

    If you are going to write a new connection-oriented protocol for Twisted,
    start here.  Any protocol implementation, either client or server, should
    be a subclass of this class.

    The API is quite simple.  Implement L{dataReceived} to handle both
    event-based and synchronous input; output can be sent through the
    'transport' attribute, which is to be an instance that implements
    L{twisted.internet.interfaces.ITransport}.  Override C{connectionLost} to be
    notified when the connection ends.

    Some subclasses exist already to help you write common types of protocols:
    see the L{twisted.protocols.basic} module for a few of them.
    """

    def logPrefix(self):
        """
        Return a prefix matching the class name, to identify log messages
        related to this protocol instance.
        """
        return self.__class__.__name__


    def dataReceived(self, data):
        """Called whenever data is received.

        Use this method to translate to a higher-level message.  Usually, some
        callback will be made upon the receipt of each complete protocol
        message.

        @param data: a string of indeterminate length.  Please keep in mind
            that you will probably need to buffer some data, as partial
            (or multiple) protocol messages may be received!  I recommend
            that unit tests for protocols call through to this method with
            differing chunk sizes, down to one byte at a time.
        """

    def connectionLost(self, reason=connectionDone):
        """Called when the connection is shut down.

        Clear any circular references here, and any external references
        to this Protocol.  The connection has been closed.

        @type reason: L{twisted.python.failure.Failure}
        """
Protocol

  BaseProtocol有兩個屬性(connected和transport),從其makeConnection()中可以看到,每創建一個連接,conencted值加一,為transport賦值,並調用鈎子函數connectionMade().

  Protocol有兩個鈎子函數dataReceived()和connectionLost(), 分別在接受到客戶端數據和斷開連接時調用,自定義相應的代碼來處理數據和斷開連接時的清理工作。

 

twisted.protocols.basic

twisted.words.protocols

  除了繼承Protocol來定義操作外,還可以繼承其他的協議,如twisted.protocols.basic中的LineReceiver, LineReceiver等,twisted.words.protocols.irc中的IRCClient等

可以參見:https://twistedmatrix.com/documents/18.9.0/api/twisted.protocols.basic.html

https://twistedmatrix.com/documents/current/api/twisted.words.protocols.irc.html

繼承LineReciver

from twisted.protocols.basic import LineReceiver

class Answer(LineReceiver):

    answers = {'How are you?': 'Fine', None: "I don't know what you mean"}

    def lineReceived(self, line):
        if line in self.answers:
            self.sendLine(self.answers[line])
        else:
            self.sendLine(self.answers[None])
LineReciver

基於twisted的服務端

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class Chat(LineReceiver):

    def __init__(self, users):
        self.users = users
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine("What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine("Name taken, please choose another.")
            return
        self.sendLine("Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = "<%s> %s" % (self.name, message)
        for name, protocol in self.users.iteritems():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):

    def __init__(self):
        self.users = {} # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()
View Code

繼承IRCClient

from twisted.words.protocols import irc
from twisted.internet import protocol

class LogBot(irc.IRCClient):

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" %
                        time.asctime(time.localtime(time.time())))

    def signedOn(self):
        self.join(self.factory.channel)


class LogBotFactory(protocol.ClientFactory):

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p
IRCCClient

基於twisted的客戶端

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
An example IRC log bot - logs a channel's events to a file.

If someone says the bot's name in the channel followed by a ':',
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python ircLogBot.py test test.log

will log channel #test to the file 'test.log'.

To run the script:

    $ python ircLogBot.py <channel> <file>
"""


from __future__ import print_function

# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write('%s %s\n' % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""
    
    nickname = "twistedbot"
    
    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" % 
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" % 
                        time.asctime(time.localtime(time.time())))
        self.logger.close()


    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split('!', 1)[0]
        self.logger.log("<%s> %s" % (user, msg))
        
        # Check to see if they're sending me a private message
        if channel == self.nickname:
            msg = "It isn't nice to whisper!  Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split('!', 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))


    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + '^'



class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == '__main__':
    # initialize logging
    log.startLogging(sys.stdout)
    
    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port
    reactor.connectTCP("irc.freenode.net", 6667, f)

    # run bot
    reactor.run()
View Code

 

 3,Deferred 和 DeferredList

參考文章:

https://twistedmatrix.com/documents/current/core/howto/defer.html

https://twistedmatrix.com/documents/current/core/howto/gendefer.html

Deferred

twisted.internet.defer.Deferred

http://krondo.com/a-second-interlude-deferred/

https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html

  Deferred用來管理回調函數,可以實現回調函數的鏈式執行。Deferred需要同時加入一對回調函數,一個calllback,正常執行時調用,一個errback,執行異常時調用。可以按順序,依次加入多個函數對,執行時根據加入順序依次鏈式執行。當只加入一個函數時,Deferred會默認加入一個另一個函數(但這個函數什么事情也不做,相當於pass),簡單示例如下:

from twisted.internet import reactor
from twisted.internet.protocol import Protocol,Factory
from twisted.internet.defer import Deferred

def down_page(url):
    print '去下載url網頁'
def down_failed():
    print '若下載網頁失敗時執行down_failed()'
def save_page():
    print '保存網頁'
def close_task():
    reactor.stop()
d = Deferred()
d.addCallbacks(down_page,down_failed)  #相當於d.addCallback(down_page)和d.addErrback(down_failed)
d.addCallback(save_page)  # Deferred默認加入d.addErrback(save_failed),但save_failed什么也不做
d.addBoth(close_task)   #相當於d.addCallbacks(close_task,close_task)
reactor.callWhenRunning(d.callback,'www.baidu.com')
reactor.run()

  Deferred的執行流如下左圖所示:其有兩條鏈,一條callbacks鏈,一條errbacks鏈,正常情況下,其會按照callback鏈依次執行加入的每一個callback函數,但若發生異常時,則會跳過該callback,而執行errback。下面的每一個箭頭都代表一條可能的執行路徑。如右圖中的一條路徑,依次執行第一個,第二個callback,但由於第二個callback拋出異常,執行第三對回調函數的errback,該errback函數捕獲了異常,繼續執行第四對回調函數的callback。(若該errback未能處理異常,而繼續傳遞異常,將執行第四對回調函數的errback)

  Deferred在執行回調函數時,其內部也會再返回一個Deferred對象,此時外部Deferred會暫停執行,將執行權交給reactor,當reactor執行內部Deferred的回調函數時,內部Deferred最后會調用外部Deferred的回調函數而切換到外部Deferred繼續執行。其執行流示意圖如下:

Reactor和Deferred間的執行權限切換如下:

 

 

 

 DeferredList

twisted.internet.defer.DeferredList

參考文章

http://krondo.com/deferreds-en-masse/

https://twistedmatrix.com/documents/current/api/twisted.internet.defer.DeferredList.html

  deferredList用來管理多個deferred對象,監聽其狀態,得到所有deferred對象的回調函數最后的返回值或異常。deferredList也可以像defer一樣添加回調函數,當其監聽的所有deferred對象執行完成后,調用deferredList的回調函數。

 

 4,inclineCallbacks

twisted.internet.defer.inlineCallbacks

參考文章:

http://krondo.com/just-another-way-to-spell-callback/

  inclineCallbacks可以理解為innerdeferred的回調函數

  理解inclineCallbacks,先來看看generator,下面的generator代碼的執行結果如下,執行流程中可以看到,每次yield時,就從generator函數內部跳轉到函數外部執行,函數外部執行后將結果send到generator內部,或者將異常throw到generator內部,generator內部函數繼續執行,此處我們可以想象為generator函數執行到yield時,內部函數掛起,調用外部函數,外部函數執行完成后將結果返回給generator處理,然后內部函數繼續執行。

#coding:utf-8

class Malfunction(Exception):
    pass

def my_generator():
    print 'starting up'

    val = yield 1
    print 'got:', val

    val = yield 2
    print 'got:', val

    try:
        yield 3
    except Malfunction:
        print 'malfunction!'

    yield 4

    print 'done'

gen = my_generator()  #gen通過send,throw向my_generator函數內部發送值,賦值給val;my_generator函數內部通過yield返回值給gen

print  gen.next(), 'from yield'  # start the generator
print  gen.send(10), 'from yield'   # send the value 10
print gen.send(20), 'from yield'   # send the value 20
print gen.throw(Malfunction()), 'from yield'  # raise an exception inside the generator

try:
    gen.next()
except StopIteration:
    pass
generator

  因此我們可以寫出如下的代碼,實現內部函數調用。

#coding:utf-8
def func1():
    print '執行回調函數1'
    return 'result1'

def func2():
    print '執行回調函數2'
    return 'result2'

def my_generator():
    print '內部函數開始執行'

    val1 = yield 1

    print '回調函數1執行完成,返回結果:',val1

    val2 = yield 2

    print '回調函數2執行完成,返回結果:',val2

    yield 3

    print '內部函數結束執行'
gen = my_generator()

gen.next()
t1 = func1()
gen.send(t1)

t2 = func2()
gen.send(t2)

try:
    gen.next()
except StopIteration as e:
    pass
View Code

  裝飾器@inclineCallbacks必須和yield搭配使用,其作用相當於gen.next, send , throw, 當generator函數內部yield時,其負責拿到外部函數結果並返回給generator。若yield一個單獨的值時,inclineCallbacks立即返回該值,繼續執行generator函數,若yield 一個deferred對象時,內部函數掛起,等deferred的回調函數執行完畢后,將回調函數的結果或異常返回,generator才繼續執行(若時異常時不捕獲,yield會拋出該異常)。具體流程如下面代碼:

from twisted.internet.defer import inlineCallbacks, Deferred

@inlineCallbacks
def my_callbacks():
    from twisted.internet import reactor

    print 'first callback'
    result = yield 1 # yielded values that aren't deferred come right back

    print 'second callback got', result
    d = Deferred()
    reactor.callLater(5, d.callback, 2)   #d的回調函數在5秒鍾后才執行,yield d 會使generator等待d執行完畢
    result = yield d # yielded deferreds will pause the generator

    print 'third callback got', result # the result of the deferred

    d = Deferred()
    reactor.callLater(5, d.errback, Exception(3))

    try:
        yield d
    except Exception, e:
        result = e

    print 'fourth callback got', repr(result) # the exception from the deferred

    reactor.stop()

from twisted.internet import reactor
reactor.callWhenRunning(my_callbacks)
reactor.run()
View Code

  另外,對於裝飾器@inclineCallbacks裝飾的generator的返回值也是一個deferred對象:

from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet import reactor

@inlineCallbacks
def my_generator():
    yield 1

    d = Deferred()
    reactor.callLater(5, d.callback, 2)
    yield d

    yield 2
d = my_generator()
print d, type(d)

reactor.run()
View Code

5,框架使用

https://twistedmatrix.com/documents/current/core/howto/application.html

 

參考:

學習教程:http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/

官方文檔:https://twistedmatrix.com/trac/wiki/Documentation

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM