事件驅動的簡明講解(python實現)


關鍵詞:編程范式,事件驅動,回調函數,觀察者模式
作者:碼匠信龍

舉個簡單的例子:
有些人喜歡的某個公眾號,然后去關注這個公眾號,哪天這個公眾號發布了篇新的文章,沒多久訂閱者就會在微信里收到這個公眾號推送的新消息,如果感興趣就打開來閱讀。

公眾號例子

事件驅動模型可以理解為上面的例子,是設計模式中觀察者模式的一種典型應用。除了訂閱公眾號外,如你關注某人的微博,關注某人的簡書,當被關注者發了個新狀態或者新文章,你會收到他們新的消息,這些都可以理解為事件驅動模型。

實際上,世間萬物各種屬性的變化,我們都可以抽象為事件,最直觀的是圖形界面應用里,如常見的點擊、雙擊、拖動操作,又或者是游戲里的英雄升級了,怪物死亡了等等,都可以視為一個事件發生了。而發送事件的事物稱為事件源,對這個事件感興趣的事物為監聽者,事件發生后監聽者會收到這個消息,然后做相應的反應。

例如上面公眾號例子可以翻譯為,監聽器(訂閱者)監聽了(關注了)事件源(公眾號),當事件源的發送事件時(公眾號發布文章),所有監聽該事件的監聽器(訂閱者)都會接收到消息並作出響應(閱讀文章)。

1.公眾號為事件源
2.訂閱者為事件監聽器
3.訂閱者關注公眾號,相當於監聽器監聽了事件源
4.公眾號發布文章這個動作為發送事件
5.訂閱者收到事件后,做出閱讀文章的響應動作

公眾號例子按事件驅動可以理解成下圖

公眾號例子翻譯

所以事件驅動模式可以進一步抽象理解為由事件源,事件對象,以及事件監聽器三元素構成,能完成監聽器監聽事件源、事件源發送事件,監聽器收到事件后調用響應函數的動作。

事件驅動主要包含以下元素和操作函數:
元素
1.事件源
2.事件監聽器
3.事件對象

操作函數
4.監聽動作
5.發送事件
6.調用監聽器響應函數

了解清楚了事件驅動的工作原理后,讀者可以試着用自己熟悉的編程語言實現,編程主要實現下面的內容,筆者后續給python實現

用戶根據實際業務邏輯定義
事件源 EventSources
監聽器 Listeners

事件管理者 EventManager
成員
1.響應函數隊列 Handlers
2.事件對象 Event
3.事件對象列表 EventQueue
操作函數
4.監聽動作 AddEventListener
5.發送事件 SendEvent
6.調用響應函數 EventProcess

在實際的軟件開發過程中,你會經常看到事件驅動的影子,幾乎所有的GUI界面都采用事件驅動編程模型,很多服務器網絡模型的消息處理也會采用,甚至復雜點的數據庫業務處理也會用這種模型,因為這種模型解耦事件發送者和接收者之間的聯系,事件可動態增加減少接收者,業務邏輯越復雜,越能體現它的優勢。下面,筆者用python實現EventManager事件管理類,大概就百來行代碼左右。

# encoding: UTF-8
# 系統模塊
from Queue import Queue, Empty
from threading import *
########################################################################
class EventManager:
    #----------------------------------------------------------------------
    def __init__(self):
        """初始化事件管理器"""
        # 事件對象列表
        self.__eventQueue = Queue()
        # 事件管理器開關
        self.__active = False
        # 事件處理線程
        self.__thread = Thread(target = self.__Run)

        # 這里的__handlers是一個字典,用來保存對應的事件的響應函數
        # 其中每個鍵對應的值是一個列表,列表中保存了對該事件監聽的響應函數,一對多
        self.__handlers = {}

    #----------------------------------------------------------------------
    def __Run(self):
        """引擎運行"""
        while self.__active == True:
            try:
                # 獲取事件的阻塞時間設為1秒
                event = self.__eventQueue.get(block = True, timeout = 1)  
                self.__EventProcess(event)
            except Empty:
                pass

    #----------------------------------------------------------------------
    def __EventProcess(self, event):
        """處理事件"""
        # 檢查是否存在對該事件進行監聽的處理函數
        if event.type_ in self.__handlers:
            # 若存在,則按順序將事件傳遞給處理函數執行
            for handler in self.__handlers[event.type_]:
                handler(event)

    #----------------------------------------------------------------------
    def Start(self):
        """啟動"""
        # 將事件管理器設為啟動
        self.__active = True
        # 啟動事件處理線程
        self.__thread.start()

    #----------------------------------------------------------------------
    def Stop(self):
        """停止"""
        # 將事件管理器設為停止
        self.__active = False
        # 等待事件處理線程退出
        self.__thread.join()

    #----------------------------------------------------------------------
    def AddEventListener(self, type_, handler):
        """綁定事件和監聽器處理函數"""
        # 嘗試獲取該事件類型對應的處理函數列表,若無則創建
        try:
            handlerList = self.__handlers[type_]
        except KeyError:
            handlerList = []
            
        self.__handlers[type_] = handlerList
        # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件
        if handler not in handlerList:
            handlerList.append(handler)
            
    #----------------------------------------------------------------------
    def RemoveEventListener(self, type_, handler):
        """移除監聽器的處理函數"""
        #讀者自己試着實現
        
    #----------------------------------------------------------------------
    def SendEvent(self, event):
        """發送事件,向事件隊列中存入事件"""
        self.__eventQueue.put(event)

########################################################################
"""事件對象"""
class Event:
    def __init__(self, type_=None):
        self.type_ = type_      # 事件類型
        self.dict = {}          # 字典用於保存具體的事件數據

測試代碼

#-------------------------------------------------------------------
# encoding: UTF-8
import sys
from datetime import datetime
from threading import *
from EventManager import *

#事件名稱  新文章
EVENT_ARTICAL = "Event_Artical"

#事件源 公眾號
class PublicAccounts:
    def __init__(self,eventManager):
        self.__eventManager = eventManager

    def WriteNewArtical(self):
        #事件對象,寫了新文章
        event = Event(type_=EVENT_ARTICAL)
        event.dict["artical"] = u'如何寫出更優雅的代碼\n'
        #發送事件
        self.__eventManager.SendEvent(event)
        print u'公眾號發送新文章\n'

#監聽器 訂閱者
class Listener:
    def __init__(self,username):
        self.__username = username

    #監聽器的處理函數 讀文章
    def ReadArtical(self,event):
        print(u'%s 收到新文章' % self.__username)
        print(u'正在閱讀新文章內容:%s'  % event.dict["artical"])

"""測試函數"""
#--------------------------------------------------------------------
def test():
    listner1 = Listener("thinkroom") #訂閱者1
    listner2 = Listener("steve")#訂閱者2

    eventManager = EventManager()
    
    #綁定事件和監聽器響應函數(新文章)
    eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
    eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
    eventManager.Start()

    publicAcc = PublicAccounts(eventManager)
    timer = Timer(2, publicAcc.WriteNewArtical)
    timer.start()
    
if __name__ == '__main__':
    test()

版權聲明:本文為博主原創,歡迎轉載分享,只需注明作者與出處http://thinkingroom.me


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM