Reposted from http://blog.chinaunix.net/xmlrpc.php?r=blog/article&uid=23504396&id=2929446
I first came across this material more than a year ago. Back then I barely knew Python and couldn't follow other people's code; recently, with some time on my hands, I dug it out again and decided to walk through the pyinotify source.
The usage example comes from http://blog.daviesliu.net/2008/04/24/sync/
The code there has two problems: base is defined twice, and the indentation of a few lines looks broken, so you have to fix the indentation yourself.
Let's read it line by line.
- flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW
flags here ends up being a plain int, which took me quite a while to understand at first.
It reads much more clearly written as
flags = pyinotify.IN_DELETE | pyinotify.IN_CREATE. Several integer event-type constants are OR'ed together into a single mask saying which kinds of changes to watch. For the concrete values, see FLAG_COLLECTIONS defined in class EventsCodes: inside pyinotify.py.
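As a quick illustration of how the mask is built, here is a minimal sketch (it assumes pyinotify is importable; the exact numbers printed depend on your pyinotify version):
- import pyinotify
-
- # Each event type is a distinct bit; OR-ing them builds one int mask.
- mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_CREATE | pyinotify.IN_Q_OVERFLOW
- print mask                                  # the combined int value
- print bool(mask & pyinotify.IN_CREATE)      # True: IN_CREATE is in the mask
- print bool(mask & pyinotify.IN_DELETE)      # False: IN_DELETE was not OR'ed in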
- dirs = {}
This defines an empty dictionary.
- base = '/log/lighttpd/cache/images/icon/u241'
This defines the directory to watch. Note that the code at the link above defines base twice; presumably the original author forgot to comment one of them out. We change it to /tmp for testing.
- class UpdateParentDir(ProcessEvent):
I couldn't follow this part at first, especially the process_IN_CLOSE_WRITE(self, event): below it; I had no idea where event came from, because coming from C I had never dealt with method overriding or class inheritance. It is really just subclassing and overriding methods.
Let's first look at the ProcessEvent class in pyinotify.py. It inherits from _ProcessEvent, so let's go look at _ProcessEvent first.
_ProcessEvent has no __init__ method, only a __call__ method. __call__ effectively overloads the () operator; we can verify that later by adding print "++++++++++++" inside __call__.
We will look at the result of that at the end; skip it for now.
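To see what __call__ does in general, here is a minimal standalone sketch (nothing to do with pyinotify itself) showing that defining __call__ makes an instance callable like a function:
- class Greeter:
-     def __call__(self, name):
-         # Calling the instance with () lands here.
-         print "hello,", name
-
- g = Greeter()
- g("world")        # prints: hello, world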
Now continue with the __init__ method of ProcessEvent:
- def __init__(self, pevent=None, **kargs):
- self.pevent = pevent
- self.my_init(**kargs)
This __init__ is simple too; nothing breaks even if you pass it nothing. self.my_init(**kargs) is a hook left for our own extensions and can be ignored, so there is nothing more to see here.
We can look directly at what the methods people override look like in the pyinotify.py source:
- def process_IN_Q_OVERFLOW(self, event):
- log.warning('Event queue overflowed.')
-
- def process_default(self, event):
- pass
Quite clear: before overriding, the original methods merely write the corresponding event to the log; after overriding, we can do whatever we want with the change, for example back up the modified file or run a sync.
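For instance, a minimal sketch of such an override (the /tmp/backup directory and the copy logic are just an illustration, not something from the original post):
- import os, shutil
- from pyinotify import ProcessEvent
-
- class BackupHandler(ProcessEvent):
-     def process_IN_CLOSE_WRITE(self, event):
-         # event.pathname is the full path of the file that was written.
-         dst = os.path.join('/tmp/backup', os.path.basename(event.pathname))
-         shutil.copy(event.pathname, dst)
-         print 'backed up', event.pathname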
Now the key question is that event argument. The __init__ docstring says type event: Event instance, but UpdateParentDir has not been invoked at this point, so let's set the Event class aside for now.
Look next at wm = WatchManager() below.
- class WatchManager:
- def __init__(self, exclude_filter=lambda path: False):
- self._fd = self._inotify_wrapper.inotify_init()
The main thing in __init__ is self._fd.
This fd is the file descriptor of the inotify instance; it comes from the C wrapper _inotify_wrapper, so this call presumably initializes the underlying inotify object.
The monitoring script passes no arguments to WatchManager either; looking further down, content is fed to this class through its add_watch method, so let's look at add_watch:
- def add_watch(self, path, mask, proc_fun=None, rec=False,
- auto_add=False, do_glob=False, quiet=True,
- exclude_filter=None):
This method mainly normalizes path (the directory given by base in our code) and passes it along, then returns a dictionary mapping each watched path to its watch descriptor. The actual monitoring still has not started.
- wd = ret_[rpath] = self.__add_watch(rpath, mask,
- proc_fun,
- auto_add,
- exclude_filter)
add_watch in turn calls __add_watch, which creates the Watch; this is mainly where the inotify watch is obtained from the C wrapper _inotify_wrapper.
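As a quick usage sketch (the directory name is just an example), add_watch returns a dict mapping each watched path to its watch descriptor:
- import pyinotify
-
- wm = pyinotify.WatchManager()
- mask = pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE
- # rec=True also watches existing subdirectories, auto_add=True watches new ones.
- wdd = wm.add_watch('/tmp', mask, rec=True, auto_add=True)
- print wdd          # e.g. {'/tmp': 1, '/tmp/somedir': 2, ...}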
Now we can look at the Notifier class, which ties WatchManager and ProcessEvent together:
- class Notifier:
- def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
- threshold=0, timeout=None):
- """
- @type watch_manager: WatchManager instance
- @param default_proc_fun: Default processing method. If None, a new
- instance of PrintAllEvents will be assigned.
- @type default_proc_fun: instance of ProcessEvent
- """
- # Watch Manager instance
- self._watch_manager = watch_manager
- # File descriptor
- self._fd = self._watch_manager.get_fd()
- # Poll object and registration
- self._pollobj = select.poll()
- self._pollobj.register(self._fd, select.POLLIN)
- # This pipe is correctely initialized and used by ThreadedNotifier
- self._pipe = (-1, -1)
- # Event queue
- self._eventq = deque()
- # System processing functor, common to all events
- self._sys_proc_fun = _SysProcessEvent(self._watch_manager, self)
- # Default processing method
- self._default_proc_fun = default_proc_fun
- if default_proc_fun is None:
- self._default_proc_fun = PrintAllEvents()
- # Loop parameters
- self._read_freq = read_freq
- self._threshold = threshold
- self._timeout = timeout
- # Coalesce events option
- self._coalesce = False
- # set of str(raw_event), only used when coalesce option is True
- self._eventset = set()
Notifier is given a WatchManager instance and a ProcessEvent instance; let's go through its __init__ ourselves.
- self._fd = self._watch_manager.get_fd()
This is the same self._fd we saw in the WatchManager class above.
- self._pollobj = select.poll()
Here is the key point: the poll model. Anyone who has written socket code will recognize it as I/O multiplexing, waiting for readiness instead of busy-looping, and from this line you can already see how events are going to be processed.
Compared with a shell loop built on while read, the efficiency gain from using poll here is obvious.
select.epoll only exists on Python 2.6 and later; newer pyinotify versions presumably use the epoll model, and if your Python is new enough you could modify this part of the code yourself to use epoll.
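A rough sketch of what swapping in epoll could look like (this just demonstrates the select.epoll API, not how pyinotify actually does it):
- import select, pyinotify
-
- wm = pyinotify.WatchManager()
- fd = wm.get_fd()                       # the inotify file descriptor
-
- epollobj = select.epoll()
- epollobj.register(fd, select.EPOLLIN)
-
- # wait up to 1 second for inotify data to become readable
- for filedesc, event_mask in epollobj.poll(1.0):
-     if event_mask & select.EPOLLIN:
-         print 'fd', filedesc, 'has events to read'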
- self._sys_proc_fun = _SysProcessEvent(self._watch_manager, self)
Then it instantiates _SysProcessEvent. Like ProcessEvent, this class derives from _ProcessEvent; what it is actually for only becomes clear further down.
- self._default_proc_fun = default_proc_fun
This is the ProcessEvent instance we passed in. When self._default_proc_fun is used versus self._sys_proc_fun only becomes apparent in the code below.
I'll skip the rest of __init__; it just sets up the event queue, timeouts and so on.
OK, so with Notifier fully initialized our monitoring still has not really started; we have only opened the entry point (i.e. self._fd = self._inotify_wrapper.inotify_init()).
As for dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True)), you can pretend dirs does not exist: wm.add_watch returns a dictionary describing the watched directory tree, and dirs is merely there to hold that return value. It would work just as well without it.
The real work starts at notifier.loop().
Let's look at the loop method of the Notifier class:
- def loop(self, callback=None, daemonize=False, **args):
- """
- Events are read only one time every min(read_freq, timeout)
- seconds at best and only if the size to read is >= threshold.
- After this method returns it must not be called again for the same
- instance.
-
- @param callback: Functor called after each event processing iteration.
- Expects to receive the notifier object (self) as first
- parameter. If this function returns True the loop is
- immediately terminated otherwise the loop method keeps
- looping.
- @type callback: callable object or function
- @param daemonize: This thread is daemonized if set to True.
- @type daemonize: boolean
- @param args: Optional and relevant only if daemonize is True. Remaining
- keyworded arguments are directly passed to daemonize see
- __daemonize() method. If pid_file=None or is set to a
- pathname the caller must ensure the file does not exist
- before this method is called otherwise an exception
- pyinotify.NotifierError will be raised. If pid_file=False
- it is still daemonized but the pid is not written in any
- file.
- @type args: various
- """
- if daemonize:
- self.__daemonize(**args)
-
- # Read and process events forever
- while 1:
- try:
- self.process_events()
- if (callback is not None) and (callback(self) is True):
- break
- ref_time = time.time()
- # check_events is blocking
- if self.check_events():
- self._sleep(ref_time)
- self.read_events()
- except KeyboardInterrupt:
- # Stop monitoring if sigint is caught (Control-C).
- log.debug('Pyinotify stops monitoring.')
- break
- # Close internals
- self.stop()
Let's go through the loop code line by line.
The daemonize parameter is handled by the __daemonize method, which turns the process into a daemon in the standard way:
nothing more than forking twice, calling setsid, letting the parent exit, and so on.
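For reference, the classic double-fork daemonization pattern looks roughly like this (a generic sketch, not pyinotify's exact __daemonize code):
- import os, sys
-
- def daemonize():
-     if os.fork() > 0:          # first fork: parent exits
-         sys.exit(0)
-     os.setsid()                # become session leader, detach from the terminal
-     if os.fork() > 0:          # second fork: we can never re-acquire a controlling tty
-         sys.exit(0)
-     os.chdir('/')
-     os.umask(0)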
callback does not look very useful; it seems meant for customization, so skip it.
Below that we finally reach while 1, and our monitoring begins.
Inside the loop it first tries process_events, so let's go look at process_events:
- def process_events(self):
- """
- Routine for processing events from queue by calling their
- associated proccessing method (an instance of ProcessEvent).
- It also does internal processings, to keep the system updated.
- """
- while self._eventq:
- raw_event = self._eventq.popleft() # pop next event
- watch_ = self._watch_manager.get_watch(raw_event.wd)
- if watch_ is None:
- # Not really sure how we ended up here, nor how we should
- # handle these types of events and if it is appropriate to
- # completly skip them (like we are doing here).
- log.warning("Unable to retrieve Watch object associated to %s",
- repr(raw_event))
- continue
- revent = self._sys_proc_fun(raw_event) # system processings
- if watch_ and watch_.proc_fun:
- watch_.proc_fun(revent) # user processings
- else:
- self._default_proc_fun(revent)
- self._sys_proc_fun.cleanup() # remove olds MOVED_* events records
- if self._coalesce:
- self._eventset.clear()
Since self._eventq is definitely empty the first time through, let's skip process_events for now and read the rest of loop.
- if (callback is not None) and (callback(self) is True):
- break
- ref_time = time.time()
These two lines are straightforward; skip them.
- if self.check_events():
Here we can go look at the check_events() method:
- def check_events(self, timeout=None):
- """
- Check for new events available to read, blocks up to timeout
- milliseconds.
-
- @param timeout: If specified it overrides the corresponding instance
- attribute _timeout.
- @type timeout: int
-
- @return: New events to read.
- @rtype: bool
- """
- while True:
- try:
- # blocks up to 'timeout' milliseconds
- if timeout is None:
- timeout = self._timeout
- ret = self._pollobj.poll(timeout)
- except select.error, err:
- if err[0] == errno.EINTR:
- continue # interrupted, retry
- else:
- raise
- else:
- break
-
- if not ret or (self._pipe[0] == ret[0][0]):
- return False
- # only one fd is polled
- return ret[0][1] & select.POLLIN
check_events is what drives poll. For the details of how poll is used, just google its usage; I have only ever used select so I am not that familiar with poll, but the principle is the same.
In fact, the while 1 in loop is the equivalent of the kind of select code I used to write:
- while True:
- GetList,SendList,ErrList = select.select([self.socket,],[],[],0)
- if len(GetList) > 0:
- try:
- curSock,userAddr = self.socket.accept()
- # curSock.settimeout(15)
- self.socket_pool.append(curSock)
- print "get new socket"
- except:
- print "error or time out"
-
- get_sock_pool,send_sock_pool,err_sock_pool = select.select(self.socket_pool,[],[],0)
Code like that just keeps scanning the socket buffers and accepts data whenever the returned list is non-empty.
loop does the same thing, except with the poll model plus a deque (a deque is much like a list, but more flexible: it can push and pop efficiently at both ends, whereas a list is only cheap to grow at the tail). A minimal sketch of this poll-plus-deque pattern follows below.
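Here is that sketch, standalone, with an ordinary pipe standing in for the inotify file descriptor:
- import os, select
- from collections import deque
-
- r, w = os.pipe()                       # a pipe stands in for the inotify fd
- os.write(w, 'hello')
-
- pollobj = select.poll()
- pollobj.register(r, select.POLLIN)
-
- eventq = deque()
- if pollobj.poll(1000):                 # wait up to 1000 ms for readable data
-     eventq.append(os.read(r, 4096))    # enqueue the raw data
- while eventq:
-     print eventq.popleft()             # dequeue and process in FIFO order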
After the check comes the read:
- def read_events(self):
- """
- Read events from device, build _RawEvents, and enqueue them.
- """
- buf_ = array.array('i', [0])
- # get event queue size
- if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
- return
- queue_size = buf_[0]
- if queue_size < self._threshold:
- log.debug('(fd: %d) %d bytes available to read but threshold is '
- 'fixed to %d bytes', self._fd, queue_size,
- self._threshold)
- return
-
- try:
- # Read content from file
- r = os.read(self._fd, queue_size)
- except Exception, msg:
- raise NotifierError(msg)
- log.debug('Event queue size: %d', queue_size)
- rsum = 0 # counter
- while rsum < queue_size:
- s_size = 16
- # Retrieve wd, mask, cookie and fname_len
- wd, mask, cookie, fname_len = struct.unpack('iIII',
- r[rsum:rsum+s_size])
- # Retrieve name
- fname, = struct.unpack('%ds' % fname_len,
- r[rsum + s_size:rsum + s_size + fname_len])
- rawevent = _RawEvent(wd, mask, cookie, fname)
- if self._coalesce:
- # Only enqueue new (unique) events.
- raweventstr = str(rawevent)
- if raweventstr not in self._eventset:
- self._eventset.add(raweventstr)
- self._eventq.append(rawevent)
- else:
- self._eventq.append(rawevent)
- rsum += s_size + fname_len
Both of these functions revolve around poll; it does not matter if you cannot follow every detail, the point is that poll is what ends up putting data into self._eventq (i.e. the changes are pushed into the queue).
Once read_events has run, process_events finally has something to work on.
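By the way, the magic number s_size = 16 in read_events is simply the size of the fixed inotify event header (wd, mask, cookie, name length); a small sketch to confirm the layout (the sample bytes here are made up):
- import struct
-
- print struct.calcsize('iIII')                  # 16: one int plus three unsigned ints
-
- # Fabricate one raw event: wd=1, mask=0x100 (IN_CREATE), cookie=0, name 'abc' padded to 16 bytes
- name = 'abc'.ljust(16, '\0')
- raw = struct.pack('iIII', 1, 0x100, 0, len(name)) + name
-
- wd, mask, cookie, fname_len = struct.unpack('iIII', raw[:16])
- fname, = struct.unpack('%ds' % fname_len, raw[16:16 + fname_len])
- print wd, hex(mask), fname.rstrip('\0')        # 1 0x100 abc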
Now look at how process_events behaves once there is data.
When self._eventq is non-empty:
- raw_event = self._eventq.popleft()
This pops an item off the queue; the raw_event here is a _RawEvent instance (the ones built in read_events).
- watch_ = self._watch_manager.get_watch(raw_event.wd)
Using the wd carried by the popped raw event, this fetches the corresponding Watch object back from the WatchManager.
- if watch_ is None:
This checks whether the path is still being watched; it is just a safety check, so treat it as if it were not there.
- revent = self._sys_proc_fun(raw_event)
Here revent is the result of calling the _SysProcessEvent instance (through its __call__) on raw_event; it is an Event object carrying the details of the file that changed.
- if watch_ and watch_.proc_fun:
- watch_.proc_fun(revent)
- else:
- self._default_proc_fun(revent)
This decides whether the event goes to a per-watch proc_fun or to _default_proc_fun.
When self._default_proc_fun(revent) runs, the methods we wrote in UpdateParentDir(ProcessEvent) get executed.
So what is _SysProcessEvent for? From our point of view, not much: it defines the default handling of each event mask (the library's internal bookkeeping), and that handling runs before our own class ever sees the event.
Now add the following line inside the process_IN_CREATE method of _SysProcessEvent:
- print "=============="
Here is the modified code we ran, followed by the output printed when a file is created (please just ignore the caonima in my debug strings... thanks):
- #!/usr/bin/python
-
- from pyinotify import *
- import os, os.path
-
- flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW
- dirs = {}
- base = '/log/lighttpd/cache/images/icon/u241'
- base = '/tmp'
-
- class UpdateParentDir(ProcessEvent):
- def process_IN_CLOSE_WRITE(self, event):
- print 'modify', event.pathname
- mtime = os.path.getmtime(event.pathname)
- p = event.path
- while p.startswith(base):
- m = os.path.getmtime(p)
- if m < mtime:
- print 'update', p
- os.utime(p, (mtime,mtime))
- elif m > mtime:
- mtime = m
- p = os.path.dirname(p)
-
- process_IN_MODIFY = process_IN_CLOSE_WRITE
-
- def process_IN_Q_OVERFLOW(self, event):
- print 'over flow'
- max_queued_events.value *= 2
-
- def process_default(self, event):
- pass
-
- wm = WatchManager()
- notifier = Notifier(wm, UpdateParentDir())
- dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))
-
- notifier.loop()
{'/tmp': 1, '/tmp/.font-unix': 4, '/tmp/.wapi': 5, '/tmp/hsperfdata_root': 2, '/tmp/.ICE-unix': 3}
+++++++++++call+++caonima++++++++++++++
============sys========caonimai============
+++++++++++call+++caonima++++++++++++++
+++++++++++call+++caonima++++++++++++++
+++++++++++call+++caonima++++++++++++++
modify /tmp/14
Analysing the output, we can see that __call__ in _ProcessEvent runs first each time an event is dispatched.
process_events contains revent = self._sys_proc_fun(raw_event), which is why the "========" line is printed when a file is created.
So even before self._default_proc_fun(revent) reaches our overridden handler, the process_IN_CREATE defined in _SysProcessEvent has already run; overriding process_IN_CREATE in our own class does not stop that internal one from being called.
As for how the program finds the process_IN_xxx methods, look at the __call__ method inside _ProcessEvent:
- meth = getattr(self, 'process_' + maskname, None)
- if meth is not None:
- return meth(event)
-
- meth = getattr(self, 'process_IN_' + maskname.split('_')[1], None)
getattr is simple: it returns the method named 'process_' + maskname, or None if no such method exists.
The second getattr then tries 'process_IN_' + maskname.split('_')[1], i.e. a handler named after just the event family (for IN_CLOSE_WRITE that would be process_IN_CLOSE), so both the exact handler name and the shorter family name work. A standalone sketch of this dispatch pattern follows below.
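The dispatch trick itself is easy to reproduce outside pyinotify (the class and mask names below are made up for illustration):
- class Dispatcher:
-     def process_IN_CREATE(self, event):
-         print 'exact handler for', event
-
-     def process_default(self, event):
-         print 'fallback handler for', event
-
-     def __call__(self, maskname, event):
-         # Try the exact method name first, then fall back to a default.
-         meth = getattr(self, 'process_' + maskname, None)
-         if meth is not None:
-             return meth(event)
-         return self.process_default(event)
-
- d = Dispatcher()
- d('IN_CREATE', '/tmp/foo')      # exact handler for /tmp/foo
- d('IN_DELETE', '/tmp/foo')      # fallback handler for /tmp/foo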
The most important pieces of pyinotify come down to these few calls:
- self._inotify_wrapper = INotifyWrapper.create()
- if self._inotify_wrapper is None:
- raise InotifyBindingNotFoundError()
-
- self._fd = self._inotify_wrapper.inotify_init() # file descriptor
-
-
- wd = self._inotify_wrapper.inotify_add_watch(self._fd, path, mask)
Let's try writing something pyinotify-like ourselves by calling INotifyWrapper directly.
A quick search shows that INotifyWrapper is also a class defined inside pyinotify, and eventually we end up at:
- try:
- libc_name = ctypes.util.find_library('c')
- except (OSError, IOError):
- pass # Will attemp to load it with None anyway.
-
- if sys.version_info >= (2, 6):
- self._libc = ctypes.CDLL(libc_name, use_errno=True)
- self._get_errno_func = ctypes.get_errno
- else:
- self._libc = ctypes.CDLL(libc_name)
- try:
- location = self._libc.__errno_location
- location.restype = ctypes.POINTER(ctypes.c_int)
- self._get_errno_func = lambda: location().contents.value
- except AttributeError:
- pass
-
- # Eventually check that libc has needed inotify bindings.
- if (not hasattr(self._libc, 'inotify_init') or
- not hasattr(self._libc, 'inotify_add_watch') or
- not hasattr(self._libc, 'inotify_rm_watch')):
- return False
- return True
So in the end the inotify C interface is reached by loading libc through ctypes.CDLL('libc.so.6') and calling inotify_init / inotify_add_watch on it.
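Under that assumption (Linux, with a libc that exposes the inotify syscalls), a bare-bones sketch of driving inotify directly through ctypes, without pyinotify at all, could look like this:
- import ctypes, ctypes.util, os, struct
-
- libc = ctypes.CDLL(ctypes.util.find_library('c'))
-
- IN_CREATE = 0x00000100                 # same constant value the kernel uses
-
- fd = libc.inotify_init()               # the inotify file descriptor
- wd = libc.inotify_add_watch(fd, '/tmp', IN_CREATE)
- print 'watching /tmp, fd=%d wd=%d' % (fd, wd)
-
- # Blocking read: create a file under /tmp from another shell to produce an event.
- buf = os.read(fd, 4096)
- head_wd, mask, cookie, name_len = struct.unpack('iIII', buf[:16])
- name, = struct.unpack('%ds' % name_len, buf[16:16 + name_len])
- print 'event: wd=%d mask=0x%x name=%s' % (head_wd, mask, name.rstrip('\0'))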