關於 Apache Pulsar
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代雲原生分布式消息流平台,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。
GitHub 地址:http://github.com/apache/pulsar/
案例導讀:本案例介紹了清華大學能源互聯網創新研究院將 Apache Pulsar 落地能源互聯網方向的實踐。Pulsar 的雲原生架構、Schema、Functions 等特性滿足了相關業務需求,也減輕了他們開發和運維負擔。
閱讀本文需要大約 8 分鍾。
團隊及業務簡介
能源互聯網是電力與能源工業發展的方向。隨着信息、通信和互聯網技術的飛速發展,可獲取的數據量正以爆炸式方式迅猛增長,傳統的數據處理方法已難以應對這些海量且增長極快的信息資產,大數據理論正是在這樣的狀態下應運而生。大數據處理技術能幫助我們透過海量數據快速分辨其運行狀態及發展趨勢,在紛繁的世界中獨具洞察力。
清華大學能源互聯網創新研究院能源大數據與開放生態研究中心匯集了國內外能源及電力大數據領域的多位專家,致力於推動大數據基礎理論和實踐應用的全面創新。能源大數據與開放生態研究中心將大數據技術應用於能源互聯網、智能電網和智慧用能等工程場景,結合高性能優化、並行計算和人工智能等先進技術,研發適用於能源電力行業特點的大數據 / 雲計算平台,和基於數據驅動的能源電力系統的高級應用,從而實現大數據產業的發展,形成以數據為核心的新型產業鏈,推動我國能源產業的轉型與升級。
挑戰
我們團隊的業務主要是與電力相關的物聯網場景,旨在實現用戶對傳感器等設備數據的需求開發。我們團隊規模較小,但任務繁雜,希望能更快更穩地實現客戶的需求。
在整理業務需求后,我們提出以后端即服務(BaaS)為主、基於消息的服務方案。在物聯網領域內,基於這樣的解決方案,我們可以共用更多基礎設施服務,同時可以快速應對不同需求進行業務開發。考慮到特殊的業務需求,我們的平台需要具備以下特性:
- 多租戶:平台要實現業務分離,服務不分離,又可以確保安全審核,滿足客戶對數據安全性的敏感需求,就必須支持多租戶。此外,還可以在通訊、數據、業務這三方面提供一些基礎服務,比如自定義數據結構的 Schema Registry,自定義數據歸屬的 ACL 權限管理(增加刪改的 API 接口),以及實現各種業務的自定義函數引擎。
- Schema Registry:滿足不同需求和應用場景下設備多變的數據結構,提供允許自定義數據結構的 Schema Registry。
- 通用 API:提供包含增加刪改的 HTTP RESTful APIs 和相應的 WebSocket 接口,確保在通訊上提供基礎服務,並基於這一基礎服務進行擴展。
- ACL 權限管理:可自定義數據的 ACL 權限控制服務,保障數據安全。
- 時序數據庫:多數情況下,物聯網場景都在和時序數據打交道,所以我們選擇了基於 PostgreSQL 的開源 TimeScaleDB,並且依托 TimeScaleDB 做了一系列時序數據的聚合查詢接口。
- 用戶自定義 functions:實現各種業務的自定義函數引擎。
之前我們使用基於 RabbitMQ 和 Celery 的方案來實現用戶自定義 functions 的函數引擎。這一方案的最初使用效果良好,但隨着業務的增長,問題越來越多。我們的小團隊不得不花更多時間來解決問題和優化整體方案。當 Celery 作為任務隊列時,這些問題尤為嚴重。
我們花費大量的時間和精力處理的問題主要有兩個:
- 需要仔細配置 Celery 的 worker 和 task,避免執行時間長的任務阻塞其他任務;
- Worker 更新時需要中斷服務,更新時間也相對較長。
此外,在特殊場景中,如果單個消息比較大且消息處理時間長時,Celery 和 RabbitMQ 的內存負擔都比較大。
隨着客戶數量和項目數量的增加,這些問題變得日益突出,我們決定找一個新產品替代原有方案。
為什么選擇 Apache Pulsar?
如上所述,我們希望消息中間件可以提供以下特性:
- 多租戶
- 可靠性和高可用
- 支持多協議,尤其可以很方便地轉換協議:在物聯網領域,我們需要應對不同的通信協議,把不同通信協議的數據全部導入到消息中間件中。
- 支持多語言:我們團隊主要使用 Go 語言,但我們會和很多使用其他語言的團隊合作,所以消息中間件最好可以支持其他語言。
- 作為輕量級計算引擎實現簡單的消息處理。
在調研不同的消息中間件時,我們很快發現了 Pulsar。通過 Pulsar 的文檔和發布日志,我們了解到 Pulsar 有很多優秀的特性,所以決定對 Pulsar 進行測試和評估。經過深入研究、學習,我們發現 Pulsar 的雲原生架構、Schema、Functions 等非常適合我們的業務需求。
- 雲原生:Pulsar 支持雲原生,擁有諸多優秀的特性,如計算與存儲分離,可以很好地利用雲的彈性伸縮能力,保證擴容和容錯。此外,Pulsar 對 Kubernetes 的良好支持也在一定程度上幫助我們將一部分業務輕松遷移到了 Kubernetes 上。
- Pulsar Functions:Pulsar Functions 是一個優秀的輕量級計算引擎,可以很好地取代 Celery 方案。我們可以更多地嘗試使用 Pulsar Functions 來處理業務,這是我們選擇 Pulsar 的主要原因。
- 分層存儲:這一特性能夠節約存儲成本。我們的使用場景會產生很多傳感器的原始數據,需要作為冷數據存儲。借助分層存儲,我們可以直接將這些冷數據存儲在價格更低的存儲服務中,也無需開發額外的服務來存儲數據。
- MQTT/MoP:Pulsar 對各種協議的兼容展示了社區的開放性。在 MoP 發布前,我們開發了 MQTT 協議的轉發工具,把 MQTT 協議上的數據轉發到 Pulsar 中。
- Pulsar Schema:我們的平台通過 JSON 來描述數據 schema,通過對接 Pulsar Schema 和我們自己的 Schema Registry,可以實現消息序列化的工作。目前 Pulsar 在 Go Schema 的功能仍處於起步階段,我們也會嘗試做一些實踐與貢獻。
- 多語言:我們很看重多語言支持,尤其是 Go 語言。Pulsar 有 Go 語言相應的客戶端、Go function runtime、基於 Go 語言實現的 Pulsarctl 等。我們也希望 Pulsar 未來可以支持更多語言,因為我們不能預見客戶的需求,支持多語言能夠幫助我們更輕松地解決問題。
- Pulsar Manager & Dashboard:Pulsar 在各個層級都啟動了接口來獲取 Metrics。Pulsar 的其他工具(如 Prometheus、Grafana、Pulsar Manager)能夠幫助我們減輕運維、優化、排錯的投入。
- 開源:Pulsar 社區開放、活躍、友好。有 StreamNative 這樣的公司做支撐,用戶可以放心地選擇 Pulsar,把業務遷移到 Pulsar 上。
深入了解 Pulsar 后,我們決定對 Pulsar 進行測試,並嘗試遷移一個生產環境的應用。
遷移試驗:樓宇智慧用電
樓宇智慧用電是我們在用電分析和預測領域做的一次嘗試,我們希望采集到辦公室中每一個用電點的用電信息。在研究院新辦公樓裝修初期,我們進行了技術評估,將使用 zigbee 協議的智能插座列入了裝修方案。整個部署包含三層樓,約 700 個智能插座和 50 個 zigbee 網關。插座部署在辦公場所的所有用電點,包含工位插座、牆壁插座以及中央空調風機插座。所有數據通過智能插座廠商提供的局域網廣播方案,將廣播數據轉發到 Pulsar 中實現數據點的采集和預處理。目前用電量數據每 10 秒鍾上送一次,其他與用戶相關的操作(包括開關插座、插拔用電設備)則實時上送。針對這些數據,我們做了一些數據可視化的嘗試,並把數據貢獻給研究院的其他團隊進行分析,或用作開發算法的參考信息和原始數據。
基於智能插座設備廠商提供的 MQTT 方案,我們嘗試將 MQTT 協議的數據都轉發到 Pulsar 中。在轉發過程中,我們遇到的主要問題是 MQTT topic 和 Pulsar topic 的映射。我們的解決方案是直接把所有的 MQTT 數據轉發到同一個 Pulsar topic 中,同時把部分元數據包裝在轉發的消息中,再通過 Pulsar Functions 做消息路由,把消息轉發到不同的業務 topic 中。下圖展示了如何將傳感器產生的數據傳送至平台並最終入庫。
在從 MQTT 轉發數據到 Pulsar 的過程中,我們默認把所有設備的數據都轉發到同一個 topic 中,並通過 verificate function 進行驗證(包括解密和內容檢查),保障數據的合法性。合法的數據會被轉發到一個中間 topic 等待消息路由分發,消息分發的 function 會從數據中解析出設備類型和消息類型,再轉發到對應業務 topic 中,等待被對應業務 topic 綁定的 ETL function 做處理。在使用 ETL function 處理時,我們也會根據設備類型提取不同的數據,對網關設備提取網關狀態、設備信息,對插座提取用電數據和插座的狀態信息。這些信息會匹配我們平台的 Schema Registry 數據結構,我們再把生成的數據做 Schema Mapping(通過 Functions 實現),最后統一轉發這些結構化的數據到 sink topic 中,由 sink function 寫入到數據庫。
樓宇智慧用電的遷移測試有力驗證了 Pulsar 符合我們的需求。在遷移過程中,我們查閱了 Pulsar 文檔,從社區獲得了大力支持和幫助,遷移過程高效、順利。借助 Functions 的開放與便利,我們很快完成了流程圖中所有 function 的開發和調試,上線了整個業務系統。
在業務遷移過程中,Pulsar 運行狀態良好,團隊一致認為 Pulsar 可以幫助我們減輕開發和運維負擔,所以我們選擇 Pulsar 作為研究中心唯一的消息中間件服務,我們的小團隊也開始跟隨 Pulsar 一起進行一系列雲原生遷移和優化工作。
決定方案后,我們將 Apache Pulsar 進一步應用到電網智能傳感和智能變電所的場景,這些場景都與物聯網、能源和電力相關。下文將詳細介紹我們如何使用 Pulsar 和 Pulsar Functions,以及如何通過 Pulsar Functions 簡化傳感器數據流的相關處理。
Pulsar x 電網智能傳感
電網智能傳感場景主要基於清華大學能源互聯網創新研究院與電網公司合作的輸電線路智能多參數傳感器集成研究項目。該項目的傳感器來自不同的廠家,分布在輸電線路的各個位置,傳感器類型因此也不盡相同,包括桿塔、桿塔上、輸電線路側等十多種。整個系統目前接入總長度約六百公里,包含六百多個桿塔的輸電線路傳感器。這一場景主要負責對各種傳感器的數據進行在線監測和告警,同時,我們也單獨針對電壓傳感器做了暫態電壓分析。
這個應用場景有兩個難點:一是來自不同廠商的傳感器沒有統一的通信協議,有的使用電力相關的 IEC104 規約,有的使用 protobuf 或其他廠商自定義協議;二是項目數據量比較大,有些傳感器可能會單次產生 20 MB 甚至更大的消息,有些傳感器則每秒上傳一次數據。
借助 Pulsar,我們選擇在 producer 端不做任何數據處理,直接將數據轉發到 Pulsar 中,再通過 Pulsar Functions 做進一步的數據預處理和其他業務操作。以電壓傳感器為例,電壓傳感器會產生三類數據,分別是心跳數據、穩態波形數據和暫態波形數據。其中心跳數據和穩態波形數據通過 protobuf 協議傳輸,暫態數據則通過 zip 壓縮文件的形式傳輸。接收到 protobuf 的數據后,借助 Pulsar Functions 進行一系列的數據處理,包括通過解密 function 完成數據解密和 protobuf 的反序列化,再對數據進行路由,通過對應的 ETL function 做數據處理和解析,最后通過 Schema Mapping 將數據入庫。我們把這個流程的每一步都封裝成獨立的 Pulsar function,這樣做出於三點考慮:
- 我們希望監控到整個數據流過程中每一個環節的狀態,采集每個過程的 metrics,並且觀測一些重點指標,比如是否存在 backlog 積壓。狀態監測方便我們調整每個環節 function 的並行數量。
- 使整個數據流更加靈活,便於我們在不同流程中新增和刪除 function。
- 更大程度地保障了我們可以重用自己維護的 function。
這個方案也遇到了一些小困難,比如由於 function 比較多,我們需要花更多時間部署、維護每一個過程的中間 topic。目前,我們的解決方案是直接寫對應的代碼一次性完成部署和維護。雖然需要投入更多精力,但我們認為這種 function 的開發和部署模式是值得的。上文提到電壓傳感器除了會產生 protobuf 的兩種數據外,還會產生一種暫態數據。暫態數據一般在電網發生故障或異常時產生,類似電力系統的快照,記錄故障發生前到發生時,再到發生后的波形狀態。在電力系統中,暫態數據通常有標准的存儲方案和特定的解析接口。相對於傳感器產生的其他數據來說,這類數據的特點是比較大,動輒幾十兆。我們應對暫態數據的方案是先解壓縮這些數據,再分析數據文件。這里我們借助了 Pulsar Functions 多語言支持的特性,流程圖中的藍色部分使用 Go function 實現,黃色部分使用 Python 實現,Python 有一個解析電網暫態數據的庫,可以調用,就免去了我們自己花時間實現一套 Go 版本解析接口的工作。
Pulsar x 智能變電所
智能變電所是我們在變電系統中變電環節的一些嘗試,這個項目基於我們合作的智能輸變電設備廠商,希望基於開關櫃等變電所設備實現變電所的數據接入。這個項目的主要目標是實現實時監測、故障診斷和異常監測這三大功能。
在智能變電所的場景中,通常由設備生產廠商提供設備的故障診斷算法或診斷應用,我們需要將不同性質的算法或應用集成到現有方案中。客戶提供的算法可能直接在 Pulsar Functions 中調用,也可能是已經編譯好的可執行文件,甚至可能是其他語言的實現,比如 R 語言。針對這一系列問題,我們先把客戶提供的實現封裝在 Docker 容器中,在容器中實現一個最小的 Pulsar function runtime,再通過 Docker proxy function 和 Docker endpoint 溝通,在觸發 function 時創建對應算法的容器實現計算,最后將結果回傳到 Pulsar 對應的 topic 中。
另外,在這一場景中我們也遇到了一些應用層面的需求,比如消息推送。我們借助 Pulsar Functions 實現了一些業務功能,在 Functions 中可以很方便地調用不同服務商的接口,實現消息推送,比如短信、郵件、應用程序的推送服務。此外,通過 Pulsar Functions,我們得以把消息推送的業務需求從平台中解藕出來,把服務做成 function,便於后續在有同樣需求的場景中直接使用。
使用 Pulsar 遇到的問題及解決方案
我們在使用 Pulsar 的過程中遇到了一些問題,下文會分享解決這些問題的一些經驗,希望可以對准備或者已經在使用 Pulsar 的同學提供一些幫助。
第一個是關於 Pulsar 默認消息大小的問題。在默認配置下,Pulsar 支持的最大消息是 5 MB,在上文提到的智慧電網案例中,單條消息有時會超過 20 MB。我們根據文檔修改了 broker 配置文件中的 MaxMessageSize
參數,但修改的配置並沒有生效,超過 5 MB 的消息依然不能正常傳遞到 Pulsar 中。於是我們在 Pulsar 社區尋求幫助,得到了社區的迅速回應。這個問題的主要原因是 Pulsar 2.4.0 中 MaxMessageSize
沒有同步到 BookKeeper,所以即使 broker 可以接收更大的消息,broker 仍然不能把消息傳遞到負責存儲的 BookKeeper 中。因此除了修改 MaxMessageSize
值外,還需要修改 broker 和 BookKeeper 中 nettyFrameSizeBytes
相關配置,這些配置保持一致,Pulsar 就可以處理更大的單條消息。
第二個問題是我們在使用 Pulsar Functions 處理數據時,topic 中可能會出現 backlog 積壓越來越多的情況。Backlog 包括沒有發送給 Functions(consumer)的數據,也包括已發送但未被 Functions(consumer)ack 的數據。根據我們的經驗,在 Functions 場景下,消息積壓可能是因為 function 處理單條消息的速度慢,處理時間長,或者 function 崩潰。如果是因為 function 處理消息慢,一種解決方案是增加 function 的並行數量,再具體分析執行速度慢的原因並進行優化;另一種方案是把復雜的 function 分成多個簡單的 function,也就是在智能電網場景中提到的把一個復雜的 function 拆成多個 function,通過 function 的鏈式模式把整個流程鏈接起來。這樣我們可以很方便地觀測每一個 function 的狀態,也可以針對某個 function 做進一步的優化。如果由於 function 崩潰造成 backlog 積壓,則需要保障 function 的穩定性,並借助 function 的 log topic 進行調試。
第三個問題是當 producer 數量增加時,很難統一管理和觀測每個 producer 的狀態,即 producer 與 broker 之間的通信狀態和 producer 與數據源之間的通信狀態。針對這個問題,我們目前的解決方案是給 producer 增加心跳消息到對應的心跳 topic 做整體監控,同時,監控 producer 和 broker 的狀態連接。通過這些改動,我們可以較好地聚合觀測 producer 的運行狀態。我們注意到 GitHub 上也在討論類似問題,期待和社區一起提出更優秀的解決方案。
期待
我們期待 Pulsar 能改善或增加以下功能。
- Pulsar Functions Mesh 實現了對 function 進行類似於 Kubernetes 的服務編排,我們期待該功能的發布。上文提到我們實現了鏈式 function 的解決方案,但這種方式在維護上遇到很大挑戰,希望 Functions mesh 可以解決這個問題。
- 希望 Pulsar functions 支持更多語言的 runtime。我們用 function 做 Docker proxy function,這個方案雖然可行,但希望有更優秀的解決方案。
- IoT 場景很注重邊緣計算,我們希望 Pulsar 可以在邊緣計算上做一些嘗試。我們關注到 Pulsar 允許將 Functions 的消息推送到另一個 Pulsar 集群中,允許 Functions 與外部 Pulsar 集群通訊。通過這一改動,可以嘗試將 Pulsar 部署到邊緣設備上,並使用 Pulsar Functions 在這些設備上進行計算。部署 Pulsar 對內存的需求較大,在一些運算能力較弱的邊緣設備上部署 Pulsar 比較困難,希望 Pulsar 能在后續版本中優化或提供其他方案解決這一困擾。
結語
作為一個開源項目,Pulsar 正在快速發展,文檔更新迅速,社區響應及時,社區規模不斷壯大。我們希望深入了解 Pulsar,參與 Pulsar 開發貢獻,和社區分享我們的實踐經驗,與 Pulsar 社區共同發展。
在使用 Pulsar 的過程中,我們遇到一些困惑,感謝 StreamNative 團隊小伙伴們的大力支持,幫助我們順利將 Pulsar 應用到上述業務場景中。未來,我們會積極嘗試 Pulsar 的各種新功能,並將 Pulsar 應用於更多的能源互聯網場景中。
作者簡介
胡軍,清華大學電機系副教授,清華大學能源互聯網創新研究院能源大數據與開放生態研究中心執行主任,IEEE Member,CIGRE Member。