2010-01-07 javaeye http://angeloce.iteye.com/admin/blogs/565333
============================
>>> from twisted.internet import reactor
>>> reactor
<twisted.internet.selectreactor.SelectReactor object at 0x01C5BFD0>
reactor本來是一個模塊,怎么變成對象了?
查看 reactor.py, 看到里面只調用了一個模塊函數 selectreactor.install()
查看install方法:
def install():
    """Configure the twisted mainloop to be run using the select() reactor.
    """
    # Build the select()-based reactor instance that is about to be installed.
    reactor = SelectReactor()
    # installReactor is imported here, inside the function body.
    from twisted.internet.main import installReactor
    installReactor(reactor)
這里生成了一個SelectReactor的對象,似乎就是我們要找的reactor.
再查看main.py
def installReactor(reactor):
    # this stuff should be common to all reactors.
    import twisted.internet
    import sys
    # Refuse to install a second reactor: the module slot must still be free.
    assert not sys.modules.has_key('twisted.internet.reactor'), \
           "reactor already installed"
    # Expose the instance as an attribute of the twisted.internet package ...
    twisted.internet.reactor = reactor
    # ... and register it in sys.modules under the module's name, which is
    # why "from twisted.internet import reactor" yields an object.
    sys.modules['twisted.internet.reactor'] = reactor
哦, reactor已經被偷梁換柱了.
回到selectreactor.py, 看看 SelectReactor 類是個什么東西.
SelectReactor 繼承父類posixbase.PosixReactorBase, 本身增加了一些方法, 似乎看不出什么.那我們就去posixbase.py看看他爸爸是干什么的.
PosixReactorBase繼承兩個父類_SignalReactorMixin 和 ReactorBase. 先不管其他, 尋根溯源,這兩個父類都來自於internet.base模塊.
好吧, 找到這里算是到頭了. ReactorBase 作為 "Reactor" 的基類, 提供了reactor大部分極其重要的方法, 另一些重要的方法由_SignalReactorMixin來擴展. 下面做下詳細的分析.
# 一般來說, 建立一個服務器基本遵循以下幾個步驟
# 以建立一個最基本的TCP服務器為例
#1
reactor.listenTCP(PORT, Factory())
#2
reactor.run()
對於#1比較好理解, 在posixbase.PosixReactorBase中
def listenTCP(self, port, factory, backlog=50, interface=''):
    """@see: twisted.internet.interfaces.IReactorTCP.listenTCP
    """
    # The reactor passes itself as the last argument to the new Port.
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p
# This includes creating the socket, binding it, and the rest of that setup.
# The details are covered later.
對於#2, 在base._SignalReactorMixin中
def run(self, installSignalHandlers=True):
    # Perform start-up, then enter the loop below.
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()

def mainLoop(self):
    # The outer loop re-enters the inner loop after an exception has been
    # logged, for as long as self._started stays true.
    while self._started:
        try:
            while self._started:
                # Advance simulation time in delayed event
                # processors.
                self.runUntilCurrent()
                t2 = self.timeout()
                # t is t2 while self.running is truthy; otherwise it is the
                # falsy value of self.running itself.
                t = self.running and t2
                self.doIteration(t)
        except:
            log.msg("Unexpected error in main loop.")
            log.err()
        else:
            # Reached only when the inner loop ended without an exception.
            log.msg('Main loop terminated.')
reactor一直孜孜不倦地執行兩個方法:self.runUntilCurrent和 self.doIteration. 看看這兩個函數都是干什么的:
# 在ReactorBase中, runUntilCurrent方法主要做了兩件事, # 把self.threadCallQueue和self.pendingTimedCalls 里的對象執行一遍 def runUntilCurrent(self): if self.threadCallQueue: # Keep track of how many calls we actually make, as we're # making them, in case another call is added to the queue # while we're in this loop. count = 0 total = len(self.threadCallQueue) for (f, a, kw) in self.threadCallQueue: try: f(*a, **kw) except: log.err() count += 1 if count == total: break del self.threadCallQueue[:count] if self.threadCallQueue: if self.waker: self.waker.wakeUp() # insert new delayed calls now self._insertNewDelayedCalls() now = self.seconds() while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now): call = heappop(self._pendingTimedCalls) if call.cancelled: self._cancellations-=1 continue if call.delayed_time > 0: call.activate_delay() heappush(self._pendingTimedCalls, call) continue try: call.called = 1 call.func(*call.args, **call.kw) except: log.deferr() if hasattr(call, "creator"): e = "\n" e += " C: previous exception occurred in " + \ "a DelayedCall created here:\n" e += " C:" e += "".join(call.creator).rstrip().replace("\n","\n C:") e += "\n" log.msg(e) if (self._cancellations > 50 and self._cancellations > len(self._pendingTimedCalls) >> 1): self._cancellations = 0 self._pendingTimedCalls = [x for x in self._pendingTimedCalls if not x.cancelled] heapify(self._pendingTimedCalls) if self._justStopped: self._justStopped = False self.fireSystemEvent("shutdown")
# 回到SelectReactor中,查看 doSelect(doIteration)方法 # _select既是select.select函數 # self._reads和self._writes內存儲的應該都是類文件操作符,比如socket.. # 再看下self._doReadOrWrite方法,會發現所有的reader/writer都執行自身 # 的 doRead/doWrite方法. def doSelect(self, timeout): """ Run one iteration of the I/O monitor loop. This will run all selectables who had input or output readiness waiting for them. """ while 1: try: r, w, ignored = _select(self._reads.keys(), self._writes.keys(), [], timeout) break except ValueError, ve: # Possibly a file descriptor has gone negative? log.err() self._preenDescriptors() except TypeError, te: # Something *totally* invalid (object w/o fileno, non-integral # result) was passed log.err() self._preenDescriptors() except (select.error, IOError), se: # select(2) encountered an error if se.args[0] in (0, 2): # windows does this if it got an empty list if (not self._reads) and (not self._writes): return else: raise elif se.args[0] == EINTR: return elif se.args[0] == EBADF: self._preenDescriptors() else: # OK, I really don't know what's going on. Blow up. raise _drdw = self._doReadOrWrite _logrun = log.callWithLogger for selectables, method, fdset in ((r, "doRead", self._reads), (w,"doWrite", self._writes)): for selectable in selectables: # if this was disconnected in another thread, kill it. # ^^^^ --- what the !@#*? serious! -exarkun if selectable not in fdset: continue # This for pausing input when we're not ready for more. _logrun(selectable, _drdw, selectable, method, dict)
好吧,從上面基本可以看出, reactor在run循環里做了兩件事, 執行線程隊列和延遲對象隊列,操作類文件對象.
對於線程隊列和延遲對象隊列, 還比較好理解.
對於類文件對象的隊列, reactor 是什么時候把它們加進的呢?
# After callLater runs, the reactor has stored the DelayedCall object in the
# _newTimedCalls queue.
# When ReactorBase.runUntilCurrent executes,
# the reactor calls the _insertNewDelayedCalls method,
# which moves the entries of _newTimedCalls into the _pendingTimedCalls queue.
def callLater(self, _seconds, _f, *args, **kw):
    # The first argument is the current self.seconds() value plus the
    # requested delay.
    tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
                       self._cancelCallLater,
                       self._moveCallLaterSooner,
                       seconds=self.seconds)
    # Only queued here; nothing is run by callLater itself.
    self._newTimedCalls.append(tple)
    return tple
# Likewise for threads:
# callFromThread also just stores the call in threadCallQueue,
# where it stays until runUntilCurrent executes it.
def callFromThread(self, f, *args, **kw):
    # Queue the callable together with its positional and keyword arguments.
    self.threadCallQueue.append((f, args, kw))
觀看上面的代碼, reactor似乎沒有主動加入過 reader/writer, reactor如何操作socket的呢?
重新想象reactor在run之前還做過什么?
對了, 連接/建立連接!
就如reactor.listenTCP
def listenTCP(self, port, factory, backlog=50, interface=''):
    # The reactor passes itself as the last argument to the new Port.
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p
看看tcp.Port的設計
tcp.Port繼承於base.BasePort 和 tcp._SocketCloser,
而base.BasePort 繼承於abstract.FileDescriptor, 一個抽象的文件操作符類
tcp.Port實例化時沒有做太多動作, 我們聚焦在方法 startListening 上
# tcp.Port.startListening 生成並綁定了一個socket # 也沒有做什么過多的動作, 直接看看最下面的startReading def startListening(self): try: skt = self.createInternetSocket() skt.bind((self.interface, self.port)) except socket.error, le: raise CannotListenError, (self.interface, self.port, le) # Make sure that if we listened on port 0, we update that to # reflect what the OS actually assigned us. self._realPortNumber = skt.getsockname()[1] log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber)) # The order of the next 6 lines is kind of bizarre. If no one # can explain it, perhaps we should re-arrange them. self.factory.doStart() skt.listen(self.backlog) self.connected = True self.socket = skt self.fileno = self.socket.fileno self.numberAccepts = 100 self.startReading()
# 一直找到abstract.FileDescriptor.startReading # 執行了reactor.addReader def startReading(self): """Start waiting for read availability. """ self.reactor.addReader(self) # selectreactor.SelectReactor.addReader指明了 # 一個tcp.Port對象被作為reader加入到了reactor的reads隊列中 def addReader(self, reader): """ Add a FileDescriptor for notification of data available to read. """ self._reads[reader] = 1
原來在這里, 在reactor.listenTCP時候就被加入到了reader隊列中.
趕緊回頭看看, 在 selectreactor.SelectReactor.doSelect中,如果一個類文件操作符狀態改變了,會執行其doRead/doWrite方法.那去看看作為reader的tcp.Port的doRead方法.
# tcp.Port的socket接受了一個連接, # 並執行了self.factory.buildProtocol方法生成一個portocol # 通過self.transport生成了一個tcp.Server對象 def doRead(self): try: if platformType == "posix": numAccepts = self.numberAccepts else: # win32 event loop breaks if we do more than one accept() # in an iteration of the event loop. numAccepts = 1 for i in range(numAccepts): # we need this so we can deal with a factory's buildProtocol # calling our loseConnection if self.disconnecting: return try: skt, addr = self.socket.accept() except socket.error, e: if e.args[0] in (EWOULDBLOCK, EAGAIN): self.numberAccepts = i break elif e.args[0] == EPERM: # Netfilter on Linux may have rejected the # connection, but we get told to try to accept() # anyway. continue elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED): log.msg("Could not accept new connection (%s)" % ( errorcode[e.args[0]],)) break raise protocol = self.factory.buildProtocol(self._buildAddr(addr)) if protocol is None: skt.close() continue s = self.sessionno self.sessionno = s+1 transport = self.transport(skt, protocol, addr, self, s, self.reactor) transport = self._preMakeConnection(transport) protocol.makeConnection(transport) else: self.numberAccepts = self.numberAccepts+20 except: log.deferr()
雖然還有點迷糊, 不過知道了protocol對象產生於此處.那這個產生的transport實例具體作用是什么呢?
先看下 protocol.makeConnection
# protocol.BaseProtocol def makeConnection(self, transport): self.connected = 1 self.transport = transport self.connectionMade()
看到了一個熟悉的方法connectionMade!
protocol的三個事件方法 connectionMade, dataReceived, connectionLost是protocol最重要的三個方法了.
其一出現了, 剩下的兩個是在何處被觸發的呢?
先不急, 先看看transport 是怎么回事:
tcp.Server 來自於 父類 tcp.Connection. 而Connection繼承於abstract.FileDescriptor,又是一個類文件符.
tcp.Server實例化時還是做了點小動作的
# tcp.Server def __init__(self, sock, protocol, client, server, sessionno, reactor): Connection.__init__(self, sock, protocol, reactor) self.server = server self.client = client self.sessionno = sessionno self.hostname = client[0] self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__, sessionno, self.hostname) self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__, self.sessionno, self.server._realPortNumber) self.startReading() self.connected = 1
self.startReading從 abstract.FileDescriptor上知曉是把 該實例作為reader加入到reactor隊列中的.
那我們就看看tcp.Server的doRead方法
# tcp.Connection def doRead(self): """Calls self.protocol.dataReceived with all available data. This reads up to self.bufferSize bytes of data from its socket, then calls self.dataReceived(data) to process it. If the connection is not lost through an error in the physical recv(), this function will return the result of the dataReceived call. """ try: data = self.socket.recv(self.bufferSize) except socket.error, se: if se.args[0] == EWOULDBLOCK: return else: return main.CONNECTION_LOST if not data: return main.CONNECTION_DONE return self.protocol.dataReceived(data)
眼前一亮, dataReceived方法!