Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平台上的實踐


騰訊 Angel PowerFL 聯邦學習平台

聯邦學習作為新一代人工智能基礎技術,通過解決數據隱私與數據孤島問題,重塑金融、醫療、城市安防等領域。

騰訊 Angel PowerFL 聯邦學習平台構建在 Angel 機器學習平台上,利用 Angel-­PS 支持萬億級模型訓練的能力,將很多在 Worker 上的計算提升到 PS(參數服務器) 端;Angel PowerFL 為聯邦學習算法提供了計算、加密、存儲、狀態同步等基本操作接口,通過流程調度模塊協調參與方任務執行狀態,而通信模塊完成了任務訓練過程中所有數據的傳輸。Angel PowerFL 聯邦學習已經在騰訊金融雲、騰訊廣告聯合建模等業務中開始落地,並取得初步的效果。

Angel 機器學習平台:https://github.com/Angel-ML

Angel PowerFL 對聯邦通信服務的要求

Angel PowerFL 聯邦學習平台在訓練任務過程當中,對參與方之間的消息通信要求極高,要求消息系統必須穩定可靠、保持高性能且能保證數據安全。Angel PowerFL 的學習任務在訓練過程當中,參與方之間會有大量的加密數據通過通信模塊傳輸,Angel PowerFL 對通信服務有以下需求:

➡️ 穩定可靠

Angel PowerFL 的學習任務時長從幾分鍾到幾小時,算法執行對數據的准確性要求很高,不同算法的數據傳輸峰值也不一樣,這需要通信模塊的服務足夠穩定,並且不能丟數據。

➡️ 高性能傳輸

Angel PowerFL 底層通過 Spark 進行計算,Executor 並發執行會產生很多待傳輸的中間數據,通信模塊需要將這些加密后的數據及時傳輸給對方,這就要求通信服務做到低延時、高吞吐量。

➡️ 數據安全

雖然 Angel PowerFL 所有數據都通過加密模塊進行了加密,但參與聯邦學習的業務可能分布在不同公司;跨公網進行傳輸,需要通信模塊足夠安全,不易被攻擊。

為什么選擇 Pulsar

聯邦通信服務在做技術預研的時候,考慮過 RPC 直連、HDFS 同步、MQ 同步三種技術方案。考慮到對安全和性能的要求比較高,排除了 RPC 直連和 HDFS 同步方案,確定采用 MQ 同步方案。

MQ 可選的服務很多,比如 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考慮到 Angel PowerFL 對穩定性、可靠性、高性能傳輸和數據安全有很高的需求,我們咨詢了騰訊數據平台部 MQ 團隊,他們向我們推薦了 Pulsar。

隨后,我們對 Pulsar 開展了深入調研,發現 Pulsar 內置的諸多特性,正好滿足了我們對消息系統的要求。Pulsar broker 和 bookie 采用了計算存儲分層架構,保證了數據穩定可靠,性能良好;Pulsar 支持跨地域復制(geo­-replication),解決了 PowerFL 跨聯邦同步 MQ 問題;而 Pulsar 的驗證和授權模式也能保證傳輸安全。

雲原生的計算與存儲分層架構

Apache Pulsar 是下一代雲原生分布式消息和事件流平台,采用了計算和存儲分層的架構:在 Broker 上進行 Pub/Sub 相關的計算,在 Apache BookKeeper 上存儲數據。

和傳統的消息平台(如 Kafka)相比,這種架構有明顯的優勢:

  • Broker 和 bookie 相互獨立,可以獨立擴展和容錯,提升系統的可用性。

  • 分區存儲不受單個節點存儲容量的限制,數據分布更均勻。

  • BookKeeper 存儲安全可靠,保證消息不丟失,同時支持批量刷盤以獲得更高吞吐量。

Pulsar Geo­-replication

Pulsar 原生支持跨地域復制(Geo­-replication),可以在多個數據中心的多個 Pulsar 集群中同時同步/異步復制數據。還可以在消息級別,通過 setReplicationClusters 控制消息復制到哪些集群。

在上圖中,無論 Producer P1、P2 和 P3 在什么時候分別將消息發布給 Cluster A、Cluster B 和 Cluster C 中的 topic T1,這些消息均會立刻復制到整個集群。一旦完成復制,Consumer C1 和 C2 即可從自己所在的集群消費這些消息。

水平擴展

由於 Pulsar 的存儲設計基於分片,Pulsar 把主題分區划分為更小的塊,稱其為分片。每個分片都作為 Apache BookKeeper ledger 來存儲,這樣構成分區的分片集合分布在 Apache BookKeeper 集群中。這樣設計方便我們管理容量和水平擴展,並且滿足高吞吐量的需求。

  • 容量管理簡單:主題分區的容量可以擴展至整個 BookKeeper 集群的容量,不受單個節點容量的限制。

  • 擴容簡單:擴容無需重新平衡或復制數據。添加新存儲節點時,新節點僅用於新分片或其副本,Pulsar 自動平衡分片分布和集群中的流量。

  • 高吞吐量:寫入流量分布在存儲層中,不會出現分區寫入爭用單個節點資源的情況。

經過深入調研后,我們決定在騰訊 Angel PowerFL 聯邦學習平台上使用 Apache Pulsar。

基於 Apache Pulsar 的聯邦通信方案

聯邦學習的各個業務(Angel PowerFL 稱之為 Party,每個 Party 有不同的 ID,如 10000/20000),可能分布在同個公司的不同部門(無網絡隔離),也可能分布在不同公司(跨公網),各個 Party 之間通過 Pulsar 跨地域復制功能進行同步復制,總體設計方案如下:

聯邦學習的每個訓練任務,通過消息的 producer 和 consumer 連接所在 Party 的 Pulsar 集群,集群名以 fl-pulsar-[partyID] 進行區分,訓練任務產生需要傳輸的中間數據后,生產者將這些數據發送給本地 Pulsar 集群。

Pulsar 集群收到數據后,通過 Pulsar proxy 建立的同步復制網絡通道,將數據發送給使用方 Party。而使用方 Party 的消費者,會一直監聽該訓練任務對應的 topic,當有數據到達后,直接消費數據進行下一步的計算。

在 Angel PowerFL 執行訓練任務時,driver 和每個 partition 會創建一個 channel 類型變量,該變量和 Pulsar 當中具體的 topic 一一對應,需要交換的數據都會經過生產者發送到這個 topic。

Angel PowerFL 支持多方聯邦,因此會有 2+ 個 Pulsar 集群需要同步復制數據。每個聯邦學習任務通過各自的 parties 任務參數指定了參與方,生產者在發送消息時調用 setReplicationClusters 接口,確保數據只在參與 Party 之間傳輸。

在 Angel PowerFL 的通信模塊中,我們充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等功能。下面我來詳細介紹如何在 Angel PowerFL 聯邦學習平台中使用 Pulsar。

Geo­-replication 去掉Global ZooKeeper 依賴

在 Angel PowerFL 聯邦學習平台上,部署一套完整的 Pulsar 依賴兩個 ZooKeeper 集群,分別是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 作用類似,用來存儲元數據。而 Global ZooKeeper 則在 Pulsar 多個集群間中共享配置信息。

在 Angel PowerFL 場景中,每個 Party 加入前,都要先部署一個 Global ZooKeeper 的子節點,或者共用一套跨公司或跨地域的公共 ZooKeeper,這樣不僅會增加部署的難度,也會增加被攻擊的風險,不利於新 Party 加入。

Global ZooKeeper 中存儲的元數據,主要是集群名/服務地址/namespace 權限等信息。Pulsar 支持創建和加入新集群。我們通過以下兩個步驟注冊聯邦 Pulsar 集群的信息到 local ZooKeeper,就去除了對 Global ZooKeeper 的依賴:

步驟 1: 注冊新加入 Party 的 Pulsar 集群

# OTHER_CLUSTER_NAME 為待注冊 Party 的 Pulsar 集群名
# OTHER_CLUSTER_BROKER_URL為 Pulsar 集群對應的 broker 地址
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} 
 --url http://${OTHER_CLUSTER_HTTP_URL} 
 --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}

步驟 2: 授予訓練用到的 namespace 訪問集群權限

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} 
 -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

對於新加入的 Party,只用提供與其對應的 Pulsar 的集群名/服務地址即可完成注冊,geo-replication 就可以通過注冊信息同步復制數據。

Client 增加 Token 認證

Pulsar 作為 Angel PowerFL 的通信模塊,沒有加入用戶級別的權限控制。為了進一步保證 client 生產和消費數據的安全,我們參考 Pulsar Client authentication using tokens based on JSON Web Tokens 增加了 token 認證,Angel PowerFL 的訓練任務除了配置當前 Party 使用的服務地址外,還需要配置 admin token。

https://pulsar.apache.org/docs/en/security-jwt/#token-authentication-overview
由於 Angel PowerFL 整套系統部署在 Kubernetes 上,我們通過容器准備 Pulsar 集群需要的 Public/Private keys 等文件,然后注冊到 K8S secret 中。

# 生成 fl-private.key 和 fl-public.key
docker run --rm -v "$(pwd)":/tmp 
 apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create-key-pair --output-private-key 
 /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# 生成 admin-token.txt token 文件
echo -n `docker run --rm -v 
 "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create --private-key 
 file:///tmp/fl-private.key --subject admin`
# 將認證相關的文件注冊到 K8S
kubectl create secret generic token-symmetric-key 
 --from-file=TOKEN=admin-token.txt 
 --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

開啟多集群 topic 自動回收

Pulsar 集群開啟了 geo-­replication 功能后,無法通過命令直接刪除用過的 topic,而 Angel PowerFL 訓練任務每次使用的任務是一次性的,任務結束后這些 topic 就沒用了,如果不及時刪除會出現大量累積。

對於通過 geo­-replication 開啟復制的 topic,可以配置 brokerDeleteInactivetopicsEnabled 參數,開啟 topic 自動回收。自動回收無用的 topic,需滿足以下幾個條件:

  • 當前 topic 沒有生產者( producer)或者消費者(consumer)連接
  • 當前 topic 沒有被訂閱
  • 當前 topic 沒有需要保留的信息

Angel PowerFL 部署的 Pulsar 集群,通過 brokerDeleteInactivetopicsEnabled 開啟 topic 自動回收。在執行訓練任務的過程中,使用后對每個 topic 按回收條件進行處理。同時,我們增加了

brokerDeleteInactivetopicsFrequencySeconds 配置,將回收的頻率設置為 3 小時。

優化 topic 限流

Angel PowerFL 中的訓練任務,在不同的數據集/算法/執行階段,生產數據的流量峰值也不同。目前生產環境中單個任務最大的數據量超過 200G/小時。訓練過程中,如果 Pulsar 連接中斷或者生產和消費過程出現異常,需要重新開始整個訓練任務。

為了規避 Pulsar 集群被單個訓練任務沖垮的風險,我們使用了 Pulsar 的限流功能。Pulsar 支持 message-rate 和 byte-rate 兩種生產限流策略,前者限制每秒生產消息的數量,后者限制每秒生產消息的大小。Angel PowerFL 將數據切分成多個 4M 的消息,通過 message-­rate 限制生產消息的數量。在 Angel PowerFL 中,我們將 namespace 的消息限制為 30 條(小於<30*4=120M/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

剛開始測試 message-rate 的限流功能時,出現了限不住的情況(限流設置失效)。騰訊數據平台部 MQ 團隊負責 Pulsar 的同事幫忙一起排查,發現設置 topicPublisherThrottlingTickTimeMillis 參數后,限制不能生效。

因此我們想辦法在 broker 端啟用了精確的 topic 發布頻率限制,優化了限流功能並貢獻回社區,詳情見 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pulsar/pull/7078

優化 topic unloading 配置

Pulsar 根據 broker 集群負載狀況,可以將 topic 動態分配到 broker上。如果擁有該 topic 的broker 宕機,或者擁有該 topic 的 broker 負載過大,則該 topic 會立即重新分配給另一個 broker ;而重新分配的過程就是 topic 的 unloading,該操作意味着關閉 topic,釋放所有者(owner)。

理論上,topic unloading 由負載均衡調整,客戶端將經歷極小的延遲抖動,通常耗時 10ms 左右。但 Angel PowerFL 初期在執行訓練任務時,日志爆出大量因為 unloading topic 導致的連接異常。日志顯示 topic unloading 在不斷的重試,但都不成功:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s

先來看 broker/namespace/bundle/topic 這四者的關系。Bundle 是 Pulsar namespace 的一個分片機制,namespace 被分片為 bundle 列表,每個 bundle 包含 namespace 的整個哈希范圍的一部分。Topic 不直接分配給 broker,而是通過計算 topic 的哈希碼將 topic 分配給特定的 bundle;每個 bundle 互相獨立,再被分配到不同的 broker 上。

Angel PowerFL 早期的任務 topic 沒有復用,一個 LR 算法訓練任務創建了 2000 多個 topic,每個 topic 生產的數據負載也不同,我們判斷上述斷連問題是由於短時間內(最小任務十分鍾內能結束,同時會有多個任務在運行)大量創建和使用 topic,導致負載不均衡,topic unloading 頻繁發生。為了降低 topic unloading 的頻率,我們調整了 Pulsar Bundle 的相關參數:

# 增加 broker 可最大分配 topic 數量
loadBalancerBrokerMaxTopics=500000
# 啟用自動拆分namespace bundle
loadBalancerAutoBundleSplitEnabled=true
# 增加觸發拆分 bundle 的 topic 數量
loadBalancerNamespaceBundleMaxTopics=10000
# 增加觸發拆分 bundle 的消息數
loadBalancerNamespaceBundleMaxMsgRate=10000

同時,在創建 namespace 時,把 bundle 數量默認設置為 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

經過以上調整,Angel PowerFL 在任務執行期間沒有再出現過由於 topic unloading 導致的斷連。

Pulsar on Kubernetes

Angel PowerFL 的所有服務均通過 Helm 部署在 Kubernetes 上。Pulsar 作為其中的一個 chart,可以很好的利用 K8S 的資源隔離、快速擴縮容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的實踐中,我們總結了以下經驗:

🎙️ 使用 Local Persistent Volume 作為存儲

Pulsar 是 IO 敏感的服務,尤其 bookie 組件,在生產環境中建議使用 SSD 或獨立的磁盤。Angel PowerFL 在跑一些大數據集任務時,Pulsar 經常出現 “No Bookies Available” 的異常。這期間磁盤的 IO 使用率很高。

我們通過 Local Persistent Volume 將 bookie 和 ZooKeeper 等其它組件掛載到單獨的磁盤,減緩了磁盤 IO 競爭。我們也測試過將 Pulsar 的 PV 存儲換成 Ceph 和 NFS,性能都沒有直接使用 Local Persistent Volume 好。

🎙️ 使用 NodeSelector

Geo-replication 同步復制數據期間,broker 需要訪問對方的 Pulsar proxy 容器。Angel PowerFL 將網關機單獨打了標簽,通過 NodeSelector 將 broker 安裝在可訪問外網的網關機上。

🎙️ 配置 useHostNameAsBookieID

Bookie 是有狀態的組件,為了 bookie pod 重建后服務正常,需要配置 useHostNameAsBookieID,確保向 ZooKeeper 注冊的 ID 是 pod 的 hostname。

未來計划

Angel PowerFL 目前使用 Pulsar 快一年了,穩定運行時間最長的集群已經超過半年,未來對Pulsar 的使用計划主要有兩個。

👍 升級 Pulsar 到 2.6.x 版本

我們目前使用的是 Pulsar 2.5.2 版本,由於最近會使用 Pulsar Key_Shared 功能做 Angel-PS 的容災恢復。2.6.0 版本剛好有增強 Key_Shared 訂閱模式,所以我們預計未來一個月升級到 Pulsar 2.6.x。
https://github.com/apache/pulsar/pull/5928

👍 Pulsar on K8S 支持多磁盤掛載

Angel PowerFL 所有服務都運行在 Kubernetes 上(除了任務使用的 YARN 計算資源),Pulsar 作為其中的一個 chart 和其它服務一起部署,使用 Local Persistent Volume 作為存儲。但目前 bookie 只支持掛載一塊磁盤(目錄),對於多磁盤的機器沒有更充分的利用,我們計划增加該特性。

總結

我們介紹了在人工智能應用場景下,使用 Pulsar 作為 Angel PowerFL 通信模塊的相關實踐。在方案實現過程當中,我們充分使用了 Pulsar 諸多內置特性,並根據自身需求做了相關優化,如 geo-­replication 去掉 Global ZooKeeper 依賴,為 client 增加 token 認證,開啟多集群 topic 自動回收,優化 topic 限流功能和 topic unloading 配置等。

Pulsar 作為下一代雲原生分布式消息和流平台,有眾多吸引人的功能,在直播與短視頻、零售與電子商務、媒體、金融等行業有廣泛應用,期待 Pulsar 在不同的應用場景下不斷有新的案例落地。

致 謝

特別感謝騰訊數據平台部 MQ 團隊,在 Angel PowerFL 平台使用 Pulsar 過程中給與的技術指導。該團隊在 Apache Pulsar 和 TubeMQ 上有多年的技術積累,積極為 Pulsar 社區做出了巨大貢獻。Pulsar 社區十分活躍,正處於快速成長之中。我們會持續關注並和 Apache Pulsar 社區深入合作,把優化的功能奉獻給 Pulsar 社區,和社區其他用戶一起進一步完善、優化 Pulsar 的特性和功能,共同建設一個更強大完善的 Pulsar 社區。

作者簡介

張超,騰訊數據平台部高級工程師,負責 Angel PowerFL 聯邦通信/PowerFL on K8S 等工作。他和騰訊數據平台部 MQ 團隊一起將 Apache Pulsar 引入 PowerFL 聯邦學習平台,開啟了 Pulsar 在機器學習領域的應用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM