Python自動化開發從淺入深-進階(Twisted、Reactor)


    Twisted 是用Python實現的基於事件驅動的網絡引擎框架。

  Twisted 誕生於2000年初,在當時的網絡游戲開發者看來,無論他們使用哪種語言,手中都鮮有可兼顧擴展性及跨平台的網絡庫。Twisted的作者試圖在當時現有的環境下開發游戲,這一步走的非常艱難,他們迫切地需要一個可擴展性高、基於事件驅動、跨平台的網絡開發框架,為此他們決定自己實現一個,並從那些之前的游戲和網絡應用程序的開發者中學習,汲取他們的經驗教訓。

    Twisted 支持許多常見的傳輸及應用層協議,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。就像Python一樣,Twisted也具有“內置電池”(batteries-included)的特點。Twisted對於其支持的所有協議都帶有客戶端和服務器實現,同時附帶有基於命令行的工具,使得配置和部署產品級的Twisted應用變得非常方便。

一. Twisted基礎

    --異步編程模型

       事件驅動編程是一種編程范式,程序的執行流由外部事件來決定。

    特點:包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。(另外兩種常見的編程范式是(單線程)同步以及多線程編程)

        在單線程下,異步模型任務是交錯完成的。

        異步編程模型與多線程模型之間的區別:

  1. 多線程:對線程的操縱權不在編程者手中,而在操作系統那里,因此,程序員在編寫程序過程中必須要假設在任何時候一個線程都有可能被停止而啟動另外一個線程。
  2. 異步模型:所有事件是以異步的方式到達,然后CPU同樣以異步的方式從Cache隊列中取出事件進行處理,一個任務要想運行必須顯式放棄當前運行的任務的控制權。這也是相比多線程模型來說,最簡潔的地方 

  --異步編程優點

  1. 在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。 
  2. 在多線程版本中,任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙的BUG。
    與同步模型相比,異步模型的優勢在如下情況下會得到發揮
 1. 有大量的任務,以至於可以認為在一個時刻至少有一個任務要運行。
  2. 任務執行大量的I/O操作,這樣同步模型就會在因為任務阻塞而浪費大量的時間。
  3. 任務之間相互獨立,以至於任務內部的交互很少。  

二. 異步編程模式與Reactor

  1. 異步模式客戶端一次性與全部服務器完成連接,而不像同步模式那樣一次只連接一個,連接完成后等待新事件的到來。
  2. 用來進行通信的Socket方法是非阻塞模式的,這是通過調用setblocking(0)來實現的。 
  3. select模塊中的select方法是用來識別其監視的socket是否有完成數據接收的,如果沒有它就處於阻塞狀態。
  4. 當從服務器中讀取數據時,會盡量多地從Socket讀取數據直到它阻塞為止,然后讀下一個Socket接收的數據(如果有數據接收的話)。這意味着我們需要跟蹤記錄從不同服務器傳送過來數據的接收情況。 

  以上過程可以被設計成為一個模式: reactor模式

              

 

      這個循環就是個"reactor"(反應堆),因為它等待事件的發生然后對其作出相應的反應。正因為如此,它也被稱作事件循環。由於交互式系統都要進行I/O操作,因此這種循環也有時被稱作select loop,這是由於select調用被用來等待I/O操作。因此,在程序的select循環中,一個事件的發生意味着一個socket端處有數據來到,如下:  

1. 監視一系列sockets(文件描述符)
2. 並阻塞程序
3. 直到至少有一個准備好的I/O操作
 
        

       一個真正reactor模式的實現是需要實現循環獨立抽象出來並具有如下的功能

1. 監視一系列與I/O操作相關的文件描述符(description)
2. 不停地匯報那些准備好的I/O操作的文件描述符
3. 處理所有不同系統會出現的I/O事件
4. 提供優雅的抽象來幫助在使用reactor時少花些心思去考慮它的存在
5. 提供可以在抽象層外使用的公共協議實現 

      Twisted實現了設計模式中的反應堆(reactor)模式,這種模式在單線程環境中調度多個事件源產生的事件到它們各自的事件處理例程中去。
      Twisted的核心就是reactor事件循環。Reactor可以感知網絡、文件系統以及定時器事件。它等待然后處理這些事件,從特定於平台的行為中抽象出來,並提供統一的接口,使得在網絡協議棧的任何位置對事件做出響應都變得簡單。 
     基本上reactor完成的任務就是

while True:
    timeout = time_until_next_timed_event()
    events = wait_for_events(timeout)
    events += timed_events_until(now())
    foreventin events:
        event.process()

 twisted_EchoServer.py

#!/usr/bin/env python
#__*__coding:utf-8__*__

from twisted.internet import protocol
from twisted.internet import reactor


# Transports
#
# Transports : 代表網絡中兩個通信結點之間的連接。
# Transports 負責描述連接的細節,比如連接是面向流式的還是面向數據報的,流控以及可靠性。TCP、UDP和Unix套接字可作為transports的例子。
# 它們被設計為“滿足最小功能單元,同時具有最大程度的可復用性”,而且從協議實現中分離出來,這讓許多協議可以采用相同類型的傳輸。
# Transports實現了ITransports接口,它包含如下的方法:
#
# write                   以非阻塞的方式按順序依次將數據寫到物理連接上
# writeSequence           將一個字符串列表寫到物理連接上
# loseConnection          將所有掛起的數據寫入,然后關閉連接
# getPeer                 取得連接中對端的地址信息
# getHost                 取得連接中本端的地址信息

# 將transports從協議中分離出來也使得對這兩個層次的測試變得更加簡單。可以通過簡單地寫入一個字符串來模擬傳輸,用這種方式來檢查。

# Protocols
#
# Protocols描述了如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含如下的方法:
# makeConnection               在transport對象和服務器之間建立一條連接
# connectionMade               連接建立起來后調用
# dataReceived                 接收數據時調用
# connectionLost               關閉連接時調用

#定義類(繼承Protocol)
class Echo(protocol.Protocol):
    def dataReceived(self, data):#接收數據時調用
        self.transport.write(data)#以非阻塞的方式按順序依次將數據寫到物理連接上


# reactor是twisted事件循環的核心,它提供了一些服務的基本接口,像網絡通信、線程和事件的分發。
#
# 一個真正reactor模式的實現是需要實現循環獨立抽象出來並具有如下的功能
#    1.監視一系列與I / O操作相關的文件描述符(description)
#    2.不停地匯報那些准備好的I / O操作的文件描述符
#    3.處理所有不同系統會出現的I / O事件
#    4.提供優雅的抽象來幫助在使用reactor時少花些心思去考慮它的存在
#    5.提供可以在抽象層外使用的公共協議實現

#Twisted的核心就是reactor事件循環。Reactor可以感知網絡、文件系統以及定時器事件。它等待然后處理這些事件,
# 從特定於平台的行為中抽象出來,並提供統一的接口,使得在網絡協議棧的任何位置對事件做出響應都變得簡單

#dir(reactor)
#============
#['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__implemented__', '__init__', '__module__', '__name__', '__new__',
#  '__providedBy__', '__provides__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_cancelCallLater',
#  '_cancellations', '_checkProcessArgs', '_childWaker', '_disconnectSelectable', '_doReadOrWrite', '_eventTriggers', '_handleSignals', '_initThreadPool', '_initThreads',
# '_insertNewDelayedCalls', '_installSignalHandlers', '_internalReaders', '_justStopped', '_lock', '_makeHelperReactor', '_moveCallLaterSooner', '_newTimedCalls',
#  '_pendingTimedCalls', '_preenDescriptors', '_reactor', '_reactorThread', '_reads', '_reallyStartRunning', '_registerAsIOThread', '_removeAll', '_started', '_startedBefore',
#  '_stopThreadPool', '_stopped', '_threadpoolStartupID', '_uninstallHandler', '_unmakeHelperReactor', '_wakerFactory', '_writes', 'addEvent', 'addReader', 'addSystemEventTrigger',
#  'addWriter', 'adoptDatagramPort', 'adoptStreamConnection', 'adoptStreamPort', 'callFromThread', 'callInThread', 'callLater', 'callWhenRunning', 'connectSSL', 'connectTCP',
# 'connectUNIX', 'connectUNIXDatagram', 'crash', 'disconnectAll', 'doIteration', 'doSelect', 'fireSystemEvent', 'getDelayedCalls', 'getReaders', 'getThreadPool', 'getWriters',
#  'installResolver', 'installWaker', 'installed', 'iterate', 'listenMulticast', 'listenSSL', 'listenTCP', 'listenUDP', 'listenUNIX', 'listenUNIXDatagram', 'mainLoop', 'removeAll',
# 'removeEvent', 'removeReader', 'removeSystemEventTrigger', 'removeWriter', 'resolve', 'resolver', 'run', 'runUntilCurrent', 'running', 'seconds', 'sigBreak', 'sigInt', 'sigTerm',
#  'spawnProcess', 'startRunning', 'stop', 'suggestThreadPoolSize', 'threadCallQueue', 'threadpool', 'threadpoolShutdownID', 'timeout', 'usingThreads', 'wakeUp', 'waker']


def main():
    # 子類this指明你的協議,Factory僅僅在server上可用。Subclass this to indicate that your protocol.Factory is only usable for server
    factory = protocol.ServerFactory()

    # 將類Echo賦給protocol
    factory.protocol = Echo

    reactor.listenTCP(1234,factory)#開始監聽端口,將factory作為參數傳進去
    reactor.run()                  #開始運行

if __name__ == '__main__':
    main()

twisted_EchoClient.py

#!/usr/bin/env python
#__*__coding:utf-8__*__

from twisted.internet import reactor, protocol

# Protocols
#
# Protocols描述了如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含如下的方法:
# makeConnection               在transport對象和服務器之間建立一條連接
# connectionMade               連接建立起來后調用
# dataReceived                 接收數據時調用
# connectionLost               關閉連接時調用

# a client protocol
#繼承protocol,處理reactor事件循環中事件到來的處理
class EchoClient(protocol.Protocol):
    """Once connected, send a message, then print the result.
    客戶端對事件的處理:包括:
      1、連接建立起來的事件處理
      2、數據到來時的事件處理
      3、沒有連接時的事件處理
    """

    def connectionMade(self):#連接建立起來后調用
        self.transport.write("hello alex!")

    def dataReceived(self, data):#數據到來,接收數據時調用
        "As soon as any data is received, write it back."
        print "Server said:", data
        self.transport.loseConnection()

    def connectionLost(self, reason):#沒有連接時的調用
        print "connection lost"

#客戶端工廠,處理連接
class EchoFactory(protocol.ClientFactory):
    protocol = EchoClient  # 將EchoClient協議實例化為protocol
    #客戶端連接失敗時的處理
    def clientConnectionFailed(self, connector, reason):
        print "Connection failed - goodbye!"
        reactor.stop()
    #丟失客戶端連接時的處理
    def clientConnectionLost(self, connector, reason):
        print "Connection lost - goodbye!"
        reactor.stop()


# this connects the protocol to a server running on port 8000
def main():
    f = EchoFactory() #對客戶端工廠實例化
    reactor.connectTCP("localhost", 1234, f)#為reactor事件循環指定通訊參數
    reactor.run()#reactor事件循環開始運行

# this only runs if the module was *not* imported
if __name__ == '__main__':
    main()

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM