實時計算平台


實時計算平台中的彈性集群資源管理

本文系微博運維數據平台(DIP)在實時計算平台的研發過程中集群資源管理方面的一些經驗總結和運用,主要關注以下幾個問題:
 
  • 異構資源如何整合?
  • 實時計算應用之間的物理資源如何隔離?
  • 集群資源利用率如何提高?
  • 集群運維成本如何降低?
 
1. 背景
 
 
這是我們初期的一個實時計算架構,大致划分為三個部分:
 
(1)日志收集;
 
     使用Rsynlog、Flume、Scribe匯聚各個業務方發送過來的日志數據;如果條件允許,業務方也可以直接將數據寫入Kafka。
 
(2)日志傳輸;
 
     使用Kafka作為日志收集組件與實時應用之間的一個高速傳輸通道,實際也是一個日志緩沖區。
 
(3)實時計算平台;
 
     實時計算平台根據使用場景的不同包含有以下兩種類型的應用:
 
(1)自助實時應用:依托於Spark Streaming、Spark SQL構建的通用實時處理模塊,旨在簡化用戶開發、部署、運維實時應用的相關工作,大多數時候用戶通過我們提供的Web頁面即可完成實時應用的創建;
 
(2)第三方應用托管:應用計算處理邏輯與自身業務結合比較緊密,無法很好地抽象出可復用的模塊,通常由業務方使用Spark Streaming自行開發完成,然后通過DIP平台統一部署;
 
這兩種類型的實時應用均以Spark Streaming的形式運行於使用Hadoop Yarn FairScheduler作為資源管理器的集群之上。
 
本文僅介紹實時計算平台中的集群資源管理方案,關於日志收集、日志傳輸的內容可分別參考以下兩篇文章:
 
Kafka Topic Partition Replica Assignment實現原理及資源隔離方案: http://www.cnblogs.com/yurunmiao/p/5550906.html
Flume FileChannel優化(擴展)實踐指南: http://www.cnblogs.com/yurunmiao/p/5603097.html
 
2. Hadoop Yarn集群資源管理
 
在我們Hadoop Yarn集群中,集群資源管理器使用的是公平調度器(FairScheduler),以業務方為單位進行資源划分,為每個業務方分配一個單獨的隊列,這個隊列關聯着一定的資源(CPU、MEM)。為了保障各個業務方的資源使用,我們將各個隊列的資源下限值(minResources)和資源上限值(maxResources)設置為相同,並禁用隊列(業務)之間的資源搶占。上文中提及的自助實時應用和第三方應用提交時,均需要提交至業務方各自的隊列中去運行。
 
既然已經為各個業務方分配隊列,且指定資源量,每個業務方就相當於在我們的Hadoop Yarn集群中擁有一定數量的服務器,集群運行時看起來是這個樣子的:
 
 
簡單起見,我們以隊列名稱代替業務方名稱,每一個Container的資源為1 core、1g mem,由此可以得出如下信息:
 
(1)Hadoop Yarn 集群的總資源量:42cores,42g mem;
(2)三個業務方,即:queueA、queueB、queueC;
(3)queueA占有6台服務器資源:18 cores,18g mem,queueB、queueC各占有4台服務器資源:12 cores,12g mem;
 
這里我們需要注意,Hadoop Yarn FairScheduler只能控制各個隊列的資源使用量,這是一種邏輯資源上的控制,並不能實際控制這些資源(Container)被分配在哪些服務器上運行。以queueA為例,queueA從資源使用分配的角度來說,它享有6台服務器的資源,但並不是說它會獨占集群中的某6台服務器。
 
集群運行時實際是這個樣子的:
 
 
queueA、queueB、queueC在各自資源使用量的范圍內“邏輯共享”集群資源。這些資源的表現形式可以理解為就是Container(每個Container運行時需要分配一定的資源量,如1 core,1g mem),各個業務方的所有Container是混合運行在集群內的各個服務器上的。
 
Hadoop Yarn FairScheduelr的資源管理方式在我們的離線計算場景里運行良好,但在實時計算場景里卻遇到了很多問題。
 
(1)異構資源如何整合?
 
異構資源包括兩個方面:服務器機型不一致、服務器角色不一致。
 
a. 服務器機型不一致
 
很多業務方在接入實時應用時無法提供與我們現有集群同等配置的服務器,大多數性能偏低。這些業務方通常是出於開發、計算、運維效率方面的考慮,從過去的計算引擎(如:nodejs-statsd)遷移至Spark Streaming,機型的選取是根據當時的情況來衡量的。
 
Hadoop已經為我們考慮到機型不一致的情況,它的解決方式是每一台服務器(計算節點,Hadoop NodeManager)的資源使用量可以通過配置文件(yarn-site.xml,yarn.nodemanager.resource.cpu-vcores、yarn.nodemanager.resource.memory-mb)進行設置。我們可以根據業務方服務器的實際情況合理地設置這台服務器可使用的資源量,再根據業務方服務器的資源量總和為其划分相應的隊列資源。
 
這個業務方的問題解決了,但是其它業務方會有這樣的疑問:“我們的實時業務很重要,申請的全部是高性能的服務器,如果我們的應用被調度到這些性能比較差的服務器中運行,計算性能會不會有損耗?”。
 
業務方的這種擔憂確實值得我們考慮,Spark Streaming Application是7 * 24小時不間斷運行的,如果這個應用的Containers被調度至性能迥異的Hadoop NodeManager服務器中去執行,確實無法保證性能較差的服務器上的Container不會拖慢整個實時應用的執行進度。
 
b. 服務器角色不一致
 
新業務方的接入不僅僅需要擴容集群的計算節點(Hadoop NodeManager),同時也需要擴容Kafka Brokers節點。多數情況下業務方無法提供這樣的冗余機器,這時我們設想使用“混合部署”的技術方案,即計算節點與Kafka Brokers節點部署在同一台服務器中,但是這會使得(1)中的“計算性能會不會有損耗”的問題愈演愈烈。
 
注:實時計算場景下,以Spark Streaming Application為例,Hadoop NodeManager Container屬於CPU密集型應用,Kafka Brokers屬於IO密集型應用,如果是獨立業務,兩者混合部署的效果還是不錯的,DIP平台在這方面已經有很好地實踐。
 
(2)實時計算應用之間的物理資源如何隔離?
 
物理資源包括四個方面:CPU、MEM、DISK、NETWORK。如果一台Hadoop NodeManager服務器中運行着不同業務方的Spark Streaming Application Containers,那么這些Containers之間就有可能存在互相影響的情況。
 
目前Hadoop Yarn僅僅支持CPU、MEM的資源管理,同時提供基於線程監控的內存使用機制和基於Cgroup的資源隔離機制,但對於DISK、NETWORK則缺乏相應的資源隔離機制,可能會引發Container之間的資源競爭導致Container執行效率低下,進而影響整個Spark Streaming Application的情況。
 
(3)集群資源利用率如何提高?
 
業務方提交Spark Streaming Application時可以通過以下三個參數指定實時應用運行期間的資源配額,
 
num-executors:Spark Streaming Application運行時需要申請多少個Containers;
executor-cores:Spark Streaming Application的每一個Container需要申請多少 cores;
executor-memory:Spark Streaming Application的每一個Container需要申請多少mem;
 
這三個參數值的選取需要考慮兩個因素:
 
業務方隊列資源冗余情況:DIP平台為業務方划分的隊列中是否有足夠的冗余資源用於新的Spark Streaming Application接入;
 
Spark Streaming Application資源需求情況:如果為該Application分配較多的資源,會導致資源浪費;如果分配較少的資源,會導致計算資源不足,進而影響業務;
 
作為集群資源的管理者,我們還需要考慮更多的問題:
 
a. 集群CPU、MEM資源分配不均衡;
 
    集群CPU資源被耗盡,MEM資源有冗余;或者集群MEM資源被耗盡,CPU資源有冗余;
 
b. 集群資源碎片化的問題;
 
    Hadoop NodeManager CPU/MEM資源均有冗余,卻無法分配給Spark Streaming Application使用,這是因為該Hadoop NodeManager CPU/MEM的資源剩余量不足以滿足任何一個業務方的Spark Streaming Application Container的資源需求量。如果出現這種現象的Hadoop NodeManager數目較多,會導致集群大量資源處於無法使用(浪費)的狀態,而Spark Streaming Application也會出現無法申請Container或申請不到足夠數量的Container的情況。
 
這兩種問題引發的現象:業務方隊列中的資源還有冗余,但是提交的應用卻遲遲分配不到資源。
 
(4)集群運維成本如何降低?
 
集群的運維成本主要來自於第三方實時應用托管,不同的應用對系統環境有不同的依賴,如:Spark Streaming使用Python進行開發,它運行時需要很多依賴的庫,這些庫需要在整個集群的所有Hadoop NodeManager機器上面進行安裝,各種版本沖突問題給我們帶來了很大的運維壓力,同時也是對我們集群環境的一種“污染”。
 
3. 彈性集群資源管理
 
異構資源、物理隔離、資源利用率、運維成本引發了我們對Hadoop Yarn集群資源管理的思考,直接想到的方案是為各個業務方構建自己的集群,如下圖所示:
 
 
根據我們以往的經驗,這種方式運維監控的成本極高,我們很快就否定了這種方案。
 
既然無法構建多個集群,那么是否可以在一個集群內部划分多個“資源池”?每個資源池關聯着若干台機器,為每一個業務方分配一個或多個資源池,業務方提交的應用僅在分配的資源池內運行,即僅在資源池關聯的那些服務器中運行。
 
 
我們以“資源池”的方式來討論一下是否可以解決上述提到的四個問題。
 
(1)異構資源;
 
我們將機型相同或相近的服務器組成一個或多個資源池,資源池的大小根據具體的業務需求而定。經過這樣的處理之后,我們可以認為任何一個資源池內的服務器的計算性能都是等同的。業務方提交的應用只會在某一個資源池內運行,這個資源池內的多個計算節點性能相同,服務器機型不一致可能帶來的性能損耗問題得到解決。
 
如果業務方需要“混合部署”,可以建立一個專用的資源池,將這些需要混合部署多個服務(這里特指Kafka Broker、Hadoop NodeManager)的服務器關聯至該資源池。通常情況下,這種“混合部署”模式的資源池是特定應用專用的,這個資源池內的服務器角色是一致的,服務器角色不一致可能帶來的性能損耗問題得到解決。
 
(2)物理資源隔離;
 
每個業務方僅能使用自己資源池內的物理資源,各個業務方之間的物理資源競爭問題得到解決。
 
對於特定的業務方而言,自己的資源池內可能會運行着多個實時應用,這些實時應用之間依然可能存在着物理資源競爭的問題,這方面我們是這么認為的:
 
a. 因為資源池是業務方獨享的,業務方在開發部署應用時應充分考慮應用之間的物理資源競爭問題,也就是說,將同一個業務方多個應用之間的物理資源資源問題交由業務方自己負責;
 
b. 如果有特殊情況,我們可以為同一個業務方划分多個資源池,甚至一個應用一個資源池;
 
這樣物理資源隔離問題得到解決。
 
(3)資源利用率;
 
“一個集群、多個隊列”的模式下,業務方是不需要考慮資源利用率的問題的,只需要根據隊列分配的資源大小和應用需要的資源來部署應用就可以了。從業務方的角度看,只要資源的使用處於隊列分配資源的區間范圍內就是沒有問題的,這種角度實際上是比較“自私”的,沒有考慮自己的行為對其它業務方的影響,資源分配不平衡、資源碎片化的問題本質即來源於此。
 
“一個集群、多個資源池”的模式下,一個資源池之內的資源是完全被獨享的,業務方必須在自己資源池的范圍內主動充分考慮資源分配不平衡、資源碎片化的問題。每個業務方的這種主動參與會帶來集群整體資源利用率的大幅提高。
 
假設我們有一台24 cores、128g mem的服務器,如果選用它作為Hadoop NodeManager,一般情況下我們會這么設置:22 cores、120g mem,以使得這台服務器的物理資源能夠得到充分的利用,這也是業界比較常見的做法。集群運維過程中,我們發現這樣的現象(以CPU為例):“22 cores已經全部被分配,但是這台服務器的CPU利用率卻只有60%,而且不是個別現象”,這是對集群資源的巨大浪費。“一個集群、多個隊列”的模式下,我們的集群配置需要充分考慮到各個業務方的需求,只能使用上述的通用做法;“一個集群、多個資源池”的模式下,我們則可以根據應用的實際情況為某資源池內的Hadoop NodeManager作出“物別”地設置,如“44 cores、120g mem”,資源池可提供的資源“變多”,集群的整體利用率也會有所提升。
 
(4)運維成本;
 
運維成本主要來自於業務方的特殊需求,這種特殊需求的實施需要涉及到整個集群,包括后續加入的節點,人力成本和維護成本都很高。針對這種情況,我們對Hadoop NodeManager實現“容器化”(Docker)部署,如果業務方有特殊需求,則可以在我們提供的Docker基礎鏡像的基礎之上作出相應的變更,不會直接“污染”系統環境,容易回退,且變更后的鏡像只會部署在業務方特定資源池內的服務器上,不會影響到其它業務方,這種方式使得運維成本大幅下降。
 
綜上所述,“一個集群、多個資源池”的模式能夠解決異構資源、物理資源隔離、資源利用率、運維成本的問題。
 
4. 實現
 
Hadoop Yarn FairScheduler只支持應用之間的公平調度,我們需要對其進行相應的擴展,使之支持“資源池”的模式:
 
(1)資源池可配置,通過資源池名稱可關聯若干Hadoop NodeManager節點(使用主機名稱(Hostname)表示),資源池配置在公平調度器的配置文件fair-scheduler.xml中即可,如下圖所示:
 
 
多個資源池之間的Hadoop NodeManager節點不能有重合,即每一個Hadoop NodeManager節點僅能屬於一個資源池;
 
(2)資源池與隊列之間的對應關系依賴“名稱前綴”來識別;以“pool1”、“pool2”為例,如果隊列名稱以“pool1”為前綴,那么該隊列中的所有應用將運行於資源池pool1中;如果隊列名稱以“pool2”為前綴,該隊列中的所有應用將運行於資源池pool2中;
 
核心思想
 
Hadoop Yarn FairScheduler每次收到Hadoop NodeManager的“NODE_UPDATE”事件時,就會使用公平調度算法為每個處於運行狀態的應用分配Containers。我們需要在公平調度的基礎之上添加資源池的處理邏輯,如為某個應用分配Containers時:
 
(1)獲取“NODE_UPDATE”事件的Hadoop NodeManager的主機名稱;
(2)獲取應用的提交隊列名稱,並根據隊列名稱獲取對應的資源池;如果可以找到對應的資源池,則繼續(3);如果未找到對應的資源池,則結束分配過程,繼續處理下一個應用;
(3)如果該資源池的節點列表中包含(1)中的主機名稱,則繼續使用公平調度算法完成分配;否則,結束分配過程,繼續處理下一個應用;
 
實現
 
Hadoop Yarn FairScheduler原有處理邏輯(Hadoop 2.5.0-cdh5.3.2 org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue.assignContainer):
 
 
在此基礎之上添加資源池的處理邏輯:
 
 
(1)獲取“NODE_UPDATE”的Hadoop NodeManager的主機名稱nodeHostName;
(2)獲取資源池信息groups,其中“key”表示資源池名稱,“value”表示資源池關聯着的Hadoop NodeManager HostNames;
(3)獲取待分配應用的隊列名稱queueName;
(4)尋找隊列名稱queueName對應資源池中的主機名稱列表nodes;
(5)如果nodes包含nodeHostName,則繼續分配過程;否則結束分配過程,繼續下一個應用;
 
也就是說,Hadoop Yarn FairScheduler的原有邏輯,只要收到集群中任何一個Hadoop NodeManager的“NODE_UPDATE”事件,就會根據公平調度算法完成Containers的分配過程;添加資源池的處理邏輯之后,提交至隊列queueName中的所有應用,只有收到來自對應資源池中的Hadoop NodeManager的“NODE_UPDATE”事件時,才會根據公平高度算法完成Containers的分配過程。
 
我們還需要處理這兩種異常情況:
 
(1)應用提交至某隊列,該隊列沒有配置對應的資源池;
(1)應用提交至某隊列,該隊列對應資源池的主機名稱列表為空;
 
這兩種情況我們的處理方式相同,終止該應用的執行,如下:
 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.addApplication
 
 
此外,還涉及到公平調度器fair-scheduler.xml的源碼擴展,如下:
 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.reloadAllocations
 
 
 
 
實際就是從公平調度器配置文件fair-scheduler.xml中解析出資源池的信息,存儲變量groups中,詳細過程不再贅述。
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
分類:  HadoopKafkaSpark StreamingYarn


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM