Contents
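1. Twisted Fundamentals
2. Asynchronous Programming Patterns and the Reactor
3. Network Programming with Twisted
4. Process Management with the Reactor
5. Concurrent Connections in Twisted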
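1. Twisted Fundamentals
0x1: The Asynchronous Programming Model
Event-driven programming is a programming paradigm in which the program's flow of execution is determined by external events. It is built around an event loop that uses callbacks to trigger the appropriate handler when an external event occurs. The two other common paradigms are (single-threaded) synchronous programming and multi-threaded programming.
In this model, tasks are interleaved with one another, and notably this all happens under the control of a single thread. This is much simpler than the multi-threaded model, because the programmer can always assume that only one task is executing while the others are paused.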
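There is one more difference between the asynchronous model and the multi-threaded model:
1. In a multi-threaded program, the decision to suspend one thread and run another is made by the operating system, not by the programmer. The programmer must therefore assume that, at any moment, a thread may be suspended and another one started.
2. In the asynchronous model, by contrast, events arrive asynchronously and are queued, and the event loop takes them from the queue and handles them one at a time. A task keeps running until it explicitly gives up control, so another task can run only when the current one yields. This is what makes the model simpler than the multi-threaded one.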
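To make "explicitly giving up control" concrete, here is a minimal sketch that uses Python generators as tasks. This is a swapped-in illustration, not how Twisted is implemented: each `yield` is the point where a task hands control back to a single-threaded round-robin scheduler, and nothing can preempt a task between yields. The names `task` and `run_round_robin` are hypothetical.

```python
from collections import deque


def task(name, steps, trace):
    """A hypothetical task that does `steps` units of work."""
    for i in range(steps):
        trace.append(f"{name}{i}")
        yield  # explicitly hand control back to the scheduler


def run_round_robin(tasks):
    """Run generator-based tasks in one thread, switching only at yields."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)  # run the task until its next yield
        except StopIteration:
            continue  # the task finished; drop it
        ready.append(current)  # it yielded; put it at the back of the queue


trace = []
run_round_robin([task("A", 3, trace), task("B", 2, trace)])
print(trace)  # ['A0', 'B0', 'A1', 'B1', 'A2']
```

The interleaving is fully deterministic: the order depends only on where the tasks yield, which is exactly the property that makes this model easier to reason about than preemptive threads.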
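0x2: Advantages of Asynchronous Programming
1. In the single-threaded synchronous model, tasks run one after another. If one task blocks on I/O, all the other tasks must wait until it finishes before they can run in turn. This explicit ordering and serialized behavior is easy to reason about, but if the tasks do not depend on each other and still have to wait for each other, the program is slowed down unnecessarily.
2. In the multi-threaded version, each task runs in its own thread. The threads are managed by the operating system and may run in parallel on a multiprocessor system or be interleaved on a single processor. This lets other threads keep running while one thread is blocked on a resource. It is more efficient than an equivalent synchronous program, but the programmer must write code to protect shared resources from simultaneous access by multiple threads. Multi-threaded programs are harder to reason about, because they have to handle thread safety through synchronization mechanisms such as locks, reentrant functions or thread-local storage, and getting this wrong leads to subtle bugs.
Compared with the synchronous model, the asynchronous model performs best when:
1. There are a large number of tasks, so there is likely always at least one task that can make progress.
2. The tasks perform a lot of I/O, so a synchronous program would waste a lot of time blocked.
3. The tasks are largely independent of one another, so there is little need for inter-task communication.
//These conditions are typical of busy network servers in a client-server architecture (e.g. web servers)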
Relevant Link:
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p01.html
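2. Asynchronous Programming Patterns and the Reactor
1. The asynchronous client connects to all the servers at once, instead of one at a time as the synchronous client does, and then waits for new events.
2. The sockets used for communication are put into non-blocking mode by calling setblocking(0).
3. The select function from the select module blocks until at least one of the sockets it monitors is ready, i.e. has data waiting to be read.
4. When reading from a server, the client reads as much data as it can from the socket until the read would block, then moves on to the next socket that has received data (if any). This means we have to keep track of the data received from each server separately.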
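The four points above can be sketched with the standard library alone. This is an illustrative assumption, not the original client: `read_all_ready` is a hypothetical helper, and `socket.socketpair()` stands in for real server connections so the example runs locally.

```python
import select
import socket


def read_all_ready(socks, buffers, timeout=1.0):
    """One pass of a select() loop (hypothetical helper).

    Blocks until at least one socket in `socks` is readable, then drains
    each readable socket until recv() would block. Returns the sockets
    whose peer closed the connection.
    """
    readable, _, _ = select.select(socks, [], [], timeout)
    closed = []
    for sock in readable:
        while True:
            try:
                chunk = sock.recv(4096)
            except BlockingIOError:
                break  # nothing more right now: move on to the next socket
            if not chunk:
                closed.append(sock)  # an empty read means the peer closed
                break
            buffers[sock] += chunk  # track received data per connection
    return closed


# Three local socket pairs stand in for three servers.
pairs = [socket.socketpair() for _ in range(3)]
readers = [r for r, _ in pairs]
for r in readers:
    r.setblocking(False)  # same effect as setblocking(0)
buffers = {r: b"" for r in readers}

pairs[0][1].sendall(b"poem from server 0")
pairs[2][1].sendall(b"poem from server 2")
first_closed = read_all_ready(readers, buffers)

pairs[1][1].close()  # server 1 hangs up without sending anything
second_closed = read_all_ready(readers, buffers)

for r, w in pairs:
    r.close()
    w.close()
```

Because every socket is non-blocking, one slow or silent server never stalls the others; the price is the bookkeeping in `buffers`.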
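This process can be generalized into a pattern: the reactor pattern.
The loop is called a "reactor" because it waits for events to happen and then reacts to them. For the same reason it is also called an event loop. And because interactive systems all perform I/O, the loop is sometimes called a select loop, after the select call used to wait for I/O. So in the select loop of this program, an event means that data has arrived on one of the sockets.
Note that select is not the only function for waiting on I/O; it is simply one of the oldest. Newer APIs that do the same job with better performance (for example poll, epoll and kqueue) are available on various systems. Performance aside, they all do the same thing:
1. monitor a set of sockets (file descriptors)
2. block the program
3. until at least one of them is ready for I/O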
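Python's standard `selectors` module wraps these APIs behind one interface, which makes the three steps easy to see. A minimal sketch, assuming a local `socketpair()` in place of a real network connection; the `data` label is arbitrary.

```python
import selectors
import socket

# DefaultSelector picks the most efficient mechanism the platform offers
# (e.g. epoll on Linux, kqueue on BSD/macOS, otherwise poll or select).
sel = selectors.DefaultSelector()
print("using", type(sel).__name__)

a, b = socket.socketpair()
b.setblocking(False)
sel.register(b, selectors.EVENT_READ, data="connection-b")  # 1. monitor an fd

a.sendall(b"ping")
events = sel.select(timeout=1.0)  # 2. block until 3. at least one fd is ready
ready = [(key.data, key.fileobj.recv(16)) for key, mask in events
         if mask & selectors.EVENT_READ]
print(ready)  # [('connection-b', b'ping')]

sel.unregister(b)
sel.close()
a.close()
b.close()
```

The calling code is identical whichever backend is chosen, which is the same kind of abstraction Twisted's reactors provide.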
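A real implementation of the reactor pattern factors the loop out into its own abstraction, with the following capabilities:
1. monitor a set of file descriptors associated with I/O
2. repeatedly report the file descriptors that are ready for I/O
3. handle all the kinds of I/O events that can occur on different systems
4. provide clean abstractions so that code using the reactor does not have to think much about it
5. provide implementations of common protocols that can be used outside the abstraction layer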
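0x1: The Asynchronous Event Model in Twisted
Twisted implements the reactor design pattern, which demultiplexes events from multiple sources and dispatches them to their respective handlers in a single-threaded environment.
The core of Twisted is the reactor event loop. The reactor is aware of network, file-system and timer events. It waits for these events and then handles them, abstracting away platform-specific behavior and providing a uniform interface, which makes it easy to respond to events anywhere in the network protocol stack.
Essentially, the reactor does the following: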
while True:
    timeout = time_until_next_timed_event()
    events = wait_for_events(timeout)
    events += timed_events_until(now())
    for event in events:
        event.process()
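The pseudocode above can be turned into a runnable toy with the standard library. This is a hedged sketch, not Twisted's reactor: the `TinyReactor` class and its `call_later`/`add_reader`/`stop` methods are hypothetical names loosely modelled on Twisted's `callLater` and `stop`. Here `selectors` plays the role of `wait_for_events`, and a heap of deadlines provides `time_until_next_timed_event` and `timed_events_until`.

```python
import heapq
import itertools
import selectors
import socket
import time


class TinyReactor:
    """Toy single-threaded reactor (not Twisted's code) following the loop above."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._timers = []              # heap of (deadline, seq, callback, args)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared
        self._running = False

    def call_later(self, delay, callback, *args):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timers, (deadline, next(self._seq), callback, args))

    def add_reader(self, sock, callback):
        self._selector.register(sock, selectors.EVENT_READ, callback)

    def remove_reader(self, sock):
        self._selector.unregister(sock)

    def stop(self):
        self._running = False

    def run(self):
        self._running = True
        while self._running:
            # timeout = time_until_next_timed_event()
            timeout = None  # None means "block until I/O happens"
            if self._timers:
                timeout = max(0.0, self._timers[0][0] - time.monotonic())
            # events = wait_for_events(timeout)
            for key, _mask in self._selector.select(timeout):
                key.data(key.fileobj)  # dispatch to the registered handler
            # events += timed_events_until(now())
            now = time.monotonic()
            while self._timers and self._timers[0][0] <= now:
                _deadline, _seq, callback, args = heapq.heappop(self._timers)
                callback(*args)


log = []
a, b = socket.socketpair()
reactor = TinyReactor()
reactor.add_reader(b, lambda sock: log.append(("read", sock.recv(1024))))
reactor.call_later(0.05, a.sendall, b"hello")  # a timed event that produces I/O
reactor.call_later(0.10, reactor.stop)
reactor.run()
print(log)  # [('read', b'hello')]

reactor.remove_reader(b)
a.close()
b.close()
```

Note that the loop only ever blocks inside `select()`, and for no longer than the time to the next timer; a handler that blocks would freeze every other event source.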
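Historically, Twisted's default reactor on all platforms was based on the poll API; newer releases default to the epoll reactor on Linux (see section 5). In addition, Twisted supports several platform-specific, high-volume multiplexing APIs. These reactors include the KQueue reactor, based on FreeBSD's kqueue mechanism; the epoll reactor, for systems that support the epoll interface (Linux 2.6 and later); and the IOCP reactor, based on Windows I/O Completion Ports.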
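When implementing polling, Twisted has to take into account:
1. network and file-system limitations
2. buffering behavior
3. how to detect a lost connection
4. the values returned when errors occur
Twisted's reactor implementations also take care to use the underlying non-blocking APIs correctly and to handle obscure edge cases correctly. Because Python does not expose the IOCP API, Twisted maintains its own implementation.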
0x2: Deferreds
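A Deferred object expresses, in abstract form, the idea of a result that does not exist yet. It also helps manage the chain of callbacks needed to process that result. When returned from a function, a Deferred is a promise that the function will produce a result at some point. The returned Deferred holds references to all the callbacks registered for the event, so only this one object needs to be passed between functions, which is much simpler to keep track of than managing each callback separately.
A Deferred contains a pair of callback chains:
1. one for when the operation succeeds (the callbacks)
2. one for when the operation fails (the errbacks)
Initially both chains of a Deferred are empty. As the event is processed, each stage adds a callback for success and an errback for failure. When the asynchronous result arrives, the Deferred is "fired", and the callbacks or errbacks are invoked, as appropriate, in the order in which they were added.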
0x3: Transports
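A transport represents the connection between two endpoints communicating over a network. Transports describe the details of the connection, such as whether it is stream-oriented or datagram-oriented, flow control, and reliability. TCP, UDP and Unix sockets are examples of transports. They are designed to be "minimal functional units that are maximally reusable", and they are decoupled from protocol implementations, which allows many protocols to use the same type of transport. Transports implement the ITransport interface, which has the following methods:
1. write: write data to the physical connection, in order, without blocking
2. writeSequence: write a list of strings to the physical connection
3. loseConnection: write all pending data, then close the connection
4. getPeer: get the address of the remote end of the connection
5. getHost: get the address of the local end of the connection
Separating transports from protocols also makes both layers easier to test. A transport can be simulated simply by writing to a string, and the protocol's output can be checked that way.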
0x4: Protocols
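Protocols describe how to process network events asynchronously. HTTP, DNS and IMAP are examples of application-layer protocols. Protocols implement the IProtocol interface, which has the following methods:
1. makeConnection: attach the protocol to a transport (sets self.transport, then calls connectionMade)
2. connectionMade: called once the connection has been established
3. dataReceived: called whenever data is received
4. connectionLost: called when the connection is closed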
Relevant Link:
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p02.html
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p04.html
http://blog.csdn.net/hanhuili/article/details/9389433
http://blog.sina.com.cn/s/blog_704b6af70100py9n.html
3. Network Programming with Twisted
0x1: Writing Servers
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor


class Chat(LineReceiver):
    # Python 3: lines are bytes, so names and messages are bytes too

    def __init__(self, users):
        self.users = users  # shared dict: user name -> Chat instance
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine(b"What's your name?")

    def connectionLost(self, reason):
        # Remove this user from the shared registry when they disconnect
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        # A tiny state machine: the first line is the user name, the rest is chat
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine(b"Name taken, please choose another.")
            return
        self.sendLine(b"Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = b"<%s> %s" % (self.name, message)
        # Broadcast to every other connected user
        for name, protocol in self.users.items():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):

    def __init__(self):
        self.users = {}  # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()
0x2: Writing Clients
Twisted is a framework designed to be very flexible and to let you write powerful clients. The cost of this flexibility is a few layers in the way to writing your client.
1. single-use clients
In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol. In those cases twisted.internet.endpoints provides the appropriate API, and in particular connectProtocol which takes a protocol instance rather than a factory.
from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol


class Greeter(Protocol):
    def sendMessage(self, msg):
        # Transports carry bytes, so encode the text before writing
        self.transport.write(("MESSAGE %s\n" % msg).encode("utf-8"))


def gotProtocol(p):
    # Called with the connected Greeter once the TCP connection is up
    p.sendMessage("Hello")
    reactor.callLater(1, p.sendMessage, "This is sent in a second")
    reactor.callLater(2, p.transport.loseConnection)


point = TCP4ClientEndpoint(reactor, "localhost", 1234)
d = connectProtocol(point, Greeter())  # returns a Deferred
d.addCallback(gotProtocol)
reactor.run()
2. ClientFactory
Still, there’s plenty of code out there that uses lower-level APIs, and a few features (such as automatic reconnection) have not been re-implemented with endpoints yet, so in some cases they may be more convenient to use.
To use the lower-level connection APIs, you will need to call one of the reactor.connect* methods directly. For these cases, you need a ClientFactory. The ClientFactory is in charge of creating the Protocol and also receives events relating to the connection state. This allows it to do things like reconnect in the event of a connection error.
from sys import stdout

from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ClientFactory


class Echo(Protocol):
    def dataReceived(self, data):
        # data is bytes; decode it before writing to the text stream
        stdout.write(data.decode("utf-8", "replace"))


class EchoClientFactory(ClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection. Reason:', reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)


host, port = "localhost", 8000  # placeholder address of the server to connect to
reactor.connectTCP(host, port, EchoClientFactory())
reactor.run()
3. A Higher-Level Example: ircLogBot
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
An example IRC log bot - logs a channel's events to a file.

If someone says the bot's name in the channel followed by a ':',
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python ircLogBot.py test test.log

will log channel #test to the file 'test.log'.

To run the script:

    $ python ircLogBot.py <channel> <file>
"""

# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write('%s %s\n' % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""

    nickname = "twistedbot"

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" %
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" %
                        time.asctime(time.localtime(time.time())))
        self.logger.close()

    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split('!', 1)[0]
        self.logger.log("<%s> %s" % (user, msg))

        # Check to see if they're sending me a private message
        if channel == self.nickname:
            msg = "It isn't nice to whisper! Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split('!', 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))

    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + '^'


class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == '__main__':
    # initialize logging
    log.startLogging(sys.stdout)

    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port
    reactor.connectTCP("irc.freenode.net", 6667, f)

    # run bot
    reactor.run()
4. Persistent Data in the Factory
When the protocol is created, it gets a reference to the factory as self.factory. It can then access attributes of the factory in its logic.
Factories have a default implementation of buildProtocol. It does the same thing the example above does, using the protocol attribute of the factory to create the protocol instance. In the example above, the factory could be rewritten to look like this:
class LogBotFactory(protocol.ClientFactory):
    protocol = LogBot

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename
Relevant Link:
http://twisted.readthedocs.org/en/latest/core/howto/clients.html
4. Process Management with the Reactor
Along with connection to servers across the internet, Twisted also connects to local processes with much the same API.
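It is worth understanding that the reactor is a design pattern, and Twisted is a network programming framework built on this asynchronous, event-driven model. The same reactor model can also be used to manage process events.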
0x1: Example
#!/usr/bin/env python
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet import protocol
from twisted.internet import reactor
import re


class MyPP(protocol.ProcessProtocol):
    def __init__(self, verses):
        self.verses = verses
        self.data = b""

    def connectionMade(self):
        print("connectionMade!")
        for i in range(self.verses):
            self.transport.write(b"Aleph-null bottles of beer on the wall,\n" +
                                 b"Aleph-null bottles of beer,\n" +
                                 b"Take one down and pass it around,\n" +
                                 b"Aleph-null bottles of beer on the wall.\n")
        self.transport.closeStdin()  # tell them we're done

    def outReceived(self, data):
        print("outReceived! with %d bytes!" % len(data))
        self.data = self.data + data

    def errReceived(self, data):
        print("errReceived! with %d bytes!" % len(data))

    def inConnectionLost(self):
        print("inConnectionLost! stdin is closed! (we probably did it)")

    def outConnectionLost(self):
        print("outConnectionLost! The child closed their stdout!")
        # now is the time to examine what they wrote
        # print("I saw them write:", self.data)
        # wc prints "lines words chars" padded with whitespace, so splitting
        # yields empty strings at both ends ("dummy" and "file")
        (dummy, lines, words, chars, file) = re.split(rb'\s+', self.data)
        print("I saw %s lines" % lines.decode())

    def errConnectionLost(self):
        print("errConnectionLost! The child closed their stderr.")

    def processExited(self, reason):
        print("processExited, status %d" % (reason.value.exitCode,))

    def processEnded(self, reason):
        print("processEnded, status %d" % (reason.value.exitCode,))
        print("quitting")
        reactor.stop()


pp = MyPP(10)
reactor.spawnProcess(pp, "wc", ["wc"], {})
reactor.run()
0x2: Example
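from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ProcessProtocol


class GPGProtocol(ProcessProtocol):
    def __init__(self, crypttext, passphrase):
        self.crypttext = crypttext
        self.passphrase = passphrase  # was used but never set in the original
        self.plaintext = b""
        self.status = b""
        self.deferred = Deferred()

    def connectionMade(self):
        # fd 3 carries the passphrase (--passphrase-fd 3), fd 0 the ciphertext
        self.transport.writeToChild(3, self.passphrase)
        self.transport.closeChildFD(3)
        self.transport.writeToChild(0, self.crypttext)
        self.transport.closeChildFD(0)

    def childDataReceived(self, childFD, data):
        if childFD == 1:  # decrypted output on stdout
            self.plaintext += data
        if childFD == 4:  # machine-readable status lines (--status-fd 4)
            self.status += data

    def processEnded(self, status):
        rc = status.value.exitCode
        if rc == 0:
            self.deferred.callback(self)
        else:
            # errback with an exception rather than a bare integer
            self.deferred.errback(RuntimeError("gpg exited with status %r" % (rc,)))


def decrypt(crypttext, passphrase):
    gp = GPGProtocol(crypttext, passphrase)
    # Note: GnuPG 2.1+ may also need "--pinentry-mode loopback" for --passphrase-fd
    cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4", "--batch"]
    reactor.spawnProcess(gp, cmd[0], cmd, env=None,
                         childFDs={0: "w", 1: "r", 2: 2, 3: "w", 4: "r"})
    return gp.deferred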
Relevant Link:
http://twistedmatrix.com/documents/12.2.0/core/howto/process.html
5. Concurrent Connections in Twisted
Some time back I had to write a network server which needed to support ~50K concurrent clients on a single box. Server-client communication used a proprietary protocol on top of TCP, with RawBinaryData Struct as the messaging format. Clients exchanged periodic keep-alives, which the server used to check their health. As most of the operations were I/O-bound (socket/DB), we decided to use Python/Twisted to implement the server.
On performing load tests, we found that the server could handle only 1024 clients, after which connections failed. We increased the per-process max open files limit from 1024 to 100000 (ulimit -n 100000), and the connections still failed at 1024.
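For reference, the `ulimit -n` step can also be done from inside the process with the standard `resource` module. A hedged sketch assuming Linux (`resource` is POSIX-only); `raise_nofile_limit` is a hypothetical helper. As the load test showed, raising this limit is necessary but not sufficient: it does nothing about select()'s own descriptor limit.

```python
import resource  # POSIX only


def raise_nofile_limit(wanted):
    """Hypothetical helper: raise the soft RLIMIT_NOFILE (what `ulimit -n`
    reports) toward `wanted`, never above the hard limit. Raising the hard
    limit itself requires extra privileges (e.g. root)."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return soft, hard  # already unlimited
    target = wanted if hard == resource.RLIM_INFINITY else min(wanted, hard)
    if target > soft:
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)


before = resource.getrlimit(resource.RLIMIT_NOFILE)
after = raise_nofile_limit(100000)
print(before, "->", after)
```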
0x1: select limitation
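select fails beyond 1024 fds because descriptor numbers must stay below FD_SETSIZE, which is 1024 (Python's select.select() raises ValueError for larger descriptors). Twisted's default reactor at the time seemed to be based on select. As a natural progression, poll was tried next to overcome the max-open-fd issue.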
0x2: poll limitation
poll solves the max-fd issue. But as the number of concurrent clients kept increasing, performance dropped drastically: every poll() call hands the entire descriptor list to the kernel, which scans all of it, so each call costs O(n) and gets slower as the number of fds grows.
0x3: epoll
The epoll reactor solved both problems and gave awesome performance. libevent is another library built on top of epoll.
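The difference from poll is visible in the API itself: descriptors are registered once and the kernel keeps the interest list, so each wait returns only the ready descriptors instead of rescanning all of them. A minimal sketch with Python's `select.epoll`, assuming Linux and a local `socketpair()` in place of client connections.

```python
import select
import socket

# Linux only: select.epoll exists only where the kernel provides epoll.
a, b = socket.socketpair()
fd = b.fileno()

ep = select.epoll()
ep.register(fd, select.EPOLLIN)  # registered once; the kernel keeps the interest list

a.sendall(b"ping")
events = ep.poll(1.0)  # returns only ready descriptors as (fd, eventmask) pairs
ready_fds = [f for f, mask in events if mask & select.EPOLLIN]
print(ready_fds == [fd])  # True

ep.unregister(fd)
ep.close()
a.close()
b.close()
```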
0x4: Async frameworks
Do not waste time with select/poll-based approaches if the expected number of concurrent connections is above 1K. The following are some of the event-loop-based frameworks where this applies:
1. Eventlet (Python)
2. Gevent (Python), similar to Eventlet; it uses libev (versions before 1.0 used libevent, which is built on top of epoll on Linux)
3. ACE (C++)
4. Netty (Java)
5. EventMachine (Ruby)
0x5: Choosing a Reactor and GUI Toolkit Integration (newer Twisted)
Twisted provides a variety of implementations of the twisted.internet.reactor. The specialized implementations are suited for different purposes and are designed to integrate better with particular platforms.
The epoll()-based reactor is Twisted's default on Linux. Other platforms use poll(), or the most cross-platform reactor, select().
Platform-specific reactor implementations exist for:
Poll for Linux
Epoll for Linux 2.6
WaitForMultipleObjects (WFMO) for Win32
Input/Output Completion Port (IOCP) for Win32
KQueue for FreeBSD and Mac OS X
CoreFoundation for Mac OS X
1. Select()-based Reactor
The select reactor is the default on platforms that don't provide a better alternative that covers all use cases. If the select reactor is desired, it may be installed via:
from twisted.internet import selectreactor
selectreactor.install()

from twisted.internet import reactor
2. Poll-based Reactor
The PollReactor will work on any platform that provides select.poll. With larger numbers of connected sockets, it may provide for better performance than the SelectReactor.
from twisted.internet import pollreactor
pollreactor.install()

from twisted.internet import reactor
3. KQueue
The KQueue Reactor allows Twisted to use FreeBSD's kqueue mechanism for event scheduling.
from twisted.internet import kqreactor
kqreactor.install()

from twisted.internet import reactor
4. WaitForMultipleObjects (WFMO) for Win32
from twisted.internet import win32eventreactor
win32eventreactor.install()

from twisted.internet import reactor
5. Input/Output Completion Port (IOCP) for Win32
Windows provides a fast, scalable event notification system known as IO Completion Ports, or IOCP for short. Twisted includes a reactor based on IOCP which is nearly complete.
from twisted.internet import iocpreactor
iocpreactor.install()

from twisted.internet import reactor
6. Epoll-based Reactor
The EPollReactor will work on any platform that provides epoll, today only Linux 2.6 and over. The implementation of the epoll reactor currently uses the Level Triggered interface, which is basically like poll() but scales much better.
from twisted.internet import epollreactor
epollreactor.install()

from twisted.internet import reactor
Relevant Link:
https://moythreads.com/wordpress/2009/12/22/select-system-call-limitation/
http://pipeit.blogspot.com/2011/07/select-poll-and-epoll-twisted-story.html
http://twistedmatrix.com/documents/13.2.0/core/howto/choosing-reactor.html#auto2
Copyright (c) 2016 LittleHann All rights reserved