基於Http協議訂閱發布系統設計
--物聯網系統架構設計
1,訂閱發布(subscriber-publisher)
訂閱發布模式最典型的應用場景就是消息系統的設計。在消息系統的架構中,消息的發送者稱作(publisher),消息的接收者稱作(subscriber),參見wikipedia: Publish–subscribe pattern。整個消息系統的架構可以用如下圖1來描述:

圖1
由圖1可知消息系統主要包括3個組件: 發布者,訂閱者和消息代理(Broker),而整個消息系統的核心即是Broker,而目前就業務能力而言Broker的實現難點主要在於它的吞吐量。拿手機消息推送舉例,在當前的移動互聯時代,就我們很常見的大多數app用戶數基本都是百萬級別以上(流行app基本是千萬級別),這意味着Broker至少要能支持百萬台設備的訂閱,使用單台服務器做Broker顯然不能解決問題。而在物聯網時代,訂閱者將不再只有手機,訂閱者可以是任何電子設備,這種場景的級別將是手機數量的百倍。
2,Mqtt協議的發布訂閱系統實現方案
2.1,Mqtt協議
根據官方的定義,mqtt協議即是
machine-to-machine (M2M)的連接協議,該協議就是為發布訂閱模式設計的非常輕量的消息傳輸協議。具體參見:
http://mqtt.org/
從mqtt協議定義可知,該mqtt就是為發布訂閱系統而設計,並且非常輕量。
2.2,實現方案
實現一套完整的發布訂閱系統,主要就是兩個組件(client和broker)一個協議規范(mqtt)。
目前流行的開源mqtt client實現是paho(
http://eclipse.org/paho); 流行的開源mqtt broker實現包括 apache apollo 和
Eclipse Mosquitto(
http://eclipse.org/mosquitto
), mosquitto的優點是非常輕量,使用一台樹莓派(或路由器)這樣的小型設備足夠服務一個家庭的設備連接。
2.3, 架構設計
發布訂閱的服務系統架構非常簡單,基本都遵照圖1的基本架構模式。對於一個家庭的物聯網應用,如果設備僅想要在局域網內訪問,則broker只需要安裝在(基於NanoPi或RasPi開發的)小型的設備中或者直接集成到路由器中。當然對於真正的物聯網應用,我們還是希望設備可以通過互聯網就可以管理和控制,所以很多broker實際應當在互聯網服務器中。
2.4, Mqtt協議的訂閱發布系統交互原理
首先引用一下開源項目paho提供的python版客戶端執行訂閱和發布動作的demo,代碼非常簡短

1 #susbscriber 2 import paho.mqtt.client as mqtt 3 4 # The callback for when the client receives a CONNACK response from the server. 5 def on_connect(client, userdata, rc): 6 client.subscribe("$SYS/#") 7 8 # The callback for when a PUBLISH message is received from the server. 9 def on_message(client, userdata, msg): 10 print(msg.topic+" "+str(msg.payload)) 11 12 client = mqtt.Client() 13 client.on_connect = on_connect 14 client.on_message = on_message 15 client.connect("iot.eclipse.org", 1883, 60) 16 17 # Blocking call that processes network traffic 18 client.loop_forever()
Subscriber: 從訂閱者客戶端代碼可知,訂閱者只需做2個動作(連接broker和建立循環等待的長連接)和提供2個接口函數(訂閱請求函數和處理broker響應結果的函數)。基本要素無非請求連接、訂閱指定topic消息、和處理響應結果,但loop_forever()是一個無限循環,這意味着客戶端和borker之間保持着一個socket長連接,所以從這里可以認識到broker的瓶頸之一便是能處理多少個這樣的長連接。
1 #publisher 2 import paho.mqtt.client as mqtt 3 4 client = mqtt.Client() 5 client.connect("iot.eclipse.org") 6 client.loop_start() 7 res = mqttc.publish("$SYS/#", "HELLO") 8 client. loop_stop(force=False)
Publisher: 從發布者客戶端代碼可知,發布者操作比訂閱者更加簡單,基本要素無非是建立連接、向broker發布指定topic消息,忽略結果響應處理過程。
subscriber和publisher的交互邏輯本質是基於tcp協議的socket實現,對於server端的socket打開mqtt協議端口,並開啟一個異步線程來持續監聽端口,等待client端(subscriber和publisher )的socket發出mqtt請求,client端的subscriber的mqtt請求有些不一樣,那就是subscriber的socket實際和server一直保持長連接,隨時等待server那邊推送過來的消息,直到連接關閉 。所以拋開細節處理問題,完全可以使用netty框架,基於mqtt協議很快的開發出一套server和client端的應用。
3,http協議broker設計實現

圖2 訂閱發布系統Broker設計
http協議和mqtt協議比較:
優點:http在互聯網時代得到最廣泛的應用, 充分檢驗了它的有效性和穩定性,充分的社區支持和成熟的開源資源可用
缺陷:相對mqtt協議太重,對網絡要求更高,直接基於http1.x無法實現發布訂閱(http1.x是單工協議,需要依賴websocket、servlet3.0等技術實現雙工,也可以使用http2.0,但目前支持較少)
本文是使用servlet3.0的技術實現基於http協議的發布/訂閱系統broker, 圖2所示即為物聯網broker系統設計架構。后台broker分成兩大模塊:發布中心(用戶和設備)和訂閱中心(用戶和設備),以及事件總線。這樣的設計或許會有疑惑,為什么不直接抽象成事件的發布和訂閱中心,如此不久和mqtt broker一致了么? 的確,既然是使用http協議實現,那為什么要完全仿照mqtt協議的模式呢,而且我們要設計的實際是一個“物聯網的業務系統“而不是一個“中間件“,所以如果你換了一個業務場景,你又得重新設計系統,而恰巧基於http協議servlet應用正是為業務系統提供了豐富的開源資源。
下面詳細解釋用戶發布中心和訂閱中心的設計,因為在物聯網的應用場景中,主要業務交互邏輯是圍繞用戶和設備之間做publish和subscribe.
用戶發布中心(publisher):
在物聯網場景中用戶充當了核心業務的publisher,對於broker的發布中心,接收到所有的前端用戶請求過來的數據都將被封裝成event在broker的內部系統中由發布中心廣播到訂閱中心。以摩拜單車為例,app是publisher的終端,摩拜單車的核心業務邏輯就是開鎖指令和一系列的交易邏輯。就開鎖動作而言,發布中心收到開鎖event,在publish這個event之前,針對這個event不同業務場景或許有不同的業務需求,典型場景有:該事件是否需要群發、該事件是否需要定時功能,該事件是否需要可靠發布。特別的,對於事件的可靠發布,在交易類系統中屬於必備要求。拿摩拜單車來說,開鎖指令發出后就會開始計時計費和扣錢,這時候就需要依賴broker在應用層面對數據做事務保證,而不能依賴基礎系統服務的穩定。
訂閱中心(subscriber):
對於訂閱中心(無論是用戶或設備)的設計,完全遵照table或key-value的數據結構來設計,也即是對於每一個請求,broker都將為其關聯一個handler以及和其對應id標識。當事件被發布到訂閱中心,訂閱中心的processor便會用事件ID(或唯一標識的設備ID)去查詢對應的handler,並作結果響應。由於是基於http協議,所以在具體實現時需要依賴servlet3.0或websoket技術。
4,領域建模
4.1 發布中心領域建模
發布中的核心功能是發布事件,因此Event是發布中心的核心領域對象。在圖2中已經闡明,事件發布所需要實現的基本功能要素,Event設計也就主要是達到第3部分所描述功能。

圖3 發布中心領域模型抽象
在圖3中可知,AbstractEvent即是Event的頂層的Entity抽象設計。因為發布中心可能會發布多種不同類型的Event,所以AbstractEvent必須有EventType屬性來表述事件的類型。無論是那種類型的Event實際都是一個Entity,既然是Entity就意味着有自己的ID,EventId作為event的唯一標識符,需要有一個明確的說明的是EventId表示意義實際相當於topic,這就是說不是每發布一個Event就會生成一個新的EventId。例如在摩拜單車的應用中,就開鎖這一類事件,對於每一輛單車,都有對應一個唯一的EventId。對於之前第3部分提到的關於事件需要實現周期、延時以及可靠發布,AbstractEvent定義了cronExpression和deliveryStatus屬性,其中cron表達式可以非常簡潔的描述和實現周期和延時的事件設定, 而deliveryStatus則需要使用狀態來保證分布式網絡環境下事件動作的事務。此外,定義GroupEvent是為了解決第3部分中提到的發布一個事件,響應多台設備。
4.2 訂閱中心領域建模
訂閱中心領域核心抽象是Handler,每一個handler對應為一個訂閱http request。每一個訂閱請求handler都持有其希望響應的EventId,攜帶的業務數據以及結果響應的回調方法。訂閱者期望的是當EventId標識的event發生時,可以立刻收到對應的事件響應,也即是說訂閱http request是作為一個保持長時間等待的網絡連接。因此所有的handler應當有一個holder將其緩存起來管理,這就是單例模式的HandlerHolder存在的意義。對於HandlerHolder在對handler緩存策略可以有兩種選擇:1, 以table形式緩存;2,以map形式緩存。相較兩種緩存策略各有優缺點,table形式節約存儲但查找代價高(可有序存放提高速度,實際在使用Java HashMap或ConcurrentHashMap實現的保持10^5個連接時,並不會多消耗太多內存,相對於List也僅僅是多幾十M的內存而已,因為map中的空桶實際存儲量極小),map形式查找快但耗存儲,但無論那種形式緩存都可以通過分級緩存來提高緩存能力(例如一級緩存簡要數據在系統內存,二級緩存主要數據在redis等緩存系統)。
在圖4中的狀態圖描述了(用戶和設備)訂閱中心以event驅動的handler轉移流程。初始時刻,設備發起訂閱CommandEvent請求,等待發布中心收到用戶發過來的CommandEvent請求,此時發布中心會去判斷該事件是否需要記錄事件交付狀態,如需要得到設備響應的OKEvent,則會去訂閱中心生成對應handler。此時,響應給設備的handler將攜帶deliveryStatus=Waiting 標識,等待設備返回確認結果。隨后,設備返回的確認響應即可通過發布中心發布OKEvent響應用戶處理結果。(實際處理流程應當更復雜,因為沒有考慮異常情況,如設備沒有收到響應結果、備響應結果丟失等,這些都需要做一些補償策略)

圖4 訂閱中心領域建模抽象