Spark on K8S 在有贊的實踐


作者:胡加華&馮明瀟

部門:數據中台

 

一、前言

隨着近幾年業務快速發展與迭代,大數據的成本也水漲船高,如何優化成本,建設低成本高效率的底層服務成為了有贊數據基礎平台2020年的主旋律。本文主要介紹了隨着雲原生時代的到來,經歷7年發展的有贊離線計算平台如何擁抱雲原生,通過容器化改造、彈性伸縮、大數據組件的錯峰混部,做到業務成倍增長的情況下成本負增長。

首先介紹一下目前有贊離線計算的一些現狀。

  • 萬兆網卡的新集群,機器帶寬不再是瓶頸。之前我們完成了一次跨雲運營商(UCloud -> Qcloud)的集群遷移。而且新集群機型全部都是萬兆的網卡(之前老集群還存在部分千兆機型),所以帶寬不再是瓶頸。同時 Qcloud 的機型選擇更加靈活,機器伸縮也更加方便。
  • 90%以上的離線計算任務使用 Spark 引擎。我們 19 年完成離線任務從 Hive 到 Spark 的遷移,因此在考慮 K8s 容器化時,只針對 Spark 處理。
  • 存儲計算混部下的木桶效應。在 YARN 模式下,計算和存儲是混部的,當一種資源不足而集群擴容時,勢必造成了另一個資源的浪費。我們這邊定期擴容的原因總是計算資源先達到瓶頸(雖然一直在對離線任務進行治理和優化),而存儲資源相對比較浪費(即使存儲之前一直沒怎么去優化和治理,比如我們現在數據還是 3 副本的模式,而沒有去使用 Erasure Coding 去優化存儲效率)

針對現在離線計算的局限性,我們提出了新的方向:

  • 存儲計算分離,避免混部下資源的木桶效應。在集群混部的情況下當一種資源不足而需要擴容時,勢必造成了另一種資源的浪費。
  • 提升機器利用率。現在很多在線業務的應用都已經 K8s 容器化,如果離線計算也支持 K8s 的話,就可以在線離線混部,充分利用閑時資源。
  • 更好的資源隔離。docker 容器的資源隔離相比 YARN 基於 NodeManager 比較粗粒度的 container 資源監管更加精確,資源隔離的種類也更加豐富( 目前我們 Hadoop 還處於 2.6.x 的版本,只能做到內存資源的隔離,當然 Hadoop 新版本也開始支持 Docker)。

最終我們將 Spark 遷移到 K8s 環境上,本文會主要介紹 Spark 整體遷移到 K8s 環境過程中的改造,優化,踩坑的經驗,希望能夠幫助到大家。

本文的主要內容包括:

  • 技術方案
  • Spark 改造
  • 部署優化
  • 踩坑和經驗

二、技術方案

從 YARN 環境遷移到 K8s 環境有兩個明顯需要解決的問題,一是 executor dynamic allocation 能力缺失,二是在存儲計算分離之后的 shuffle 和 sort 數據存儲問題。針對這兩個問題我們做了技術方案,這兩個方案也分享給大家。

2.1 Dynamic Allocation

executor 動態分配是一個相對比較有用的功能,它能讓各個任務按需的伸縮資源,使集群的資源利用率更高。在 YARN 環境下需要基於 NodeManager 的 external shuffle service 才能開啟。而在 K8s 環境下因為沒有 external shuffle service 的方案而無法使用,所以我們引入了 SPARK-27963(Allow dynamic allocation without an external shuffle service)來緩解。這個方案是基於跟蹤 shuffle 引用,只有在 executor 產生的 shuffle 沒有被引用的 executor 才可以被釋放,同時 shuffle 引用清理又是基於 spark cleaner 的弱引用清理機制(清理時機依賴於 GC),這種方案釋放 executor 的效果相比原來會差很多。

為支持 K8s 上 executor 動態分配的 shuffle service 方案也有下面兩種方式,大家可以考慮一下( 據我們調研這兩種方式已有一些公司在生產上實踐):

    1. enabling shuffle service as a DaemonSet
      1. 早期 Spark 2.2 版本的實驗功能,將 shuffle service 作為 DaemonSet 部署到 K8s 集群。
      2. 缺陷就是需要用到 hostPath volume 來對接宿主機讀寫 shuffle 數據,跟宿主機綁死了(只能運行在能滿足磁盤需求的宿主機上)。
    2. remote storage for persisting shuffle data(SPARK-25299)
      1. 目前社區正在實現的方案,但進展比較緩慢,至今還沒有全部完成。

      2. 最近 Uber 開源一個比較完整的 RemoteShuffle 實現。我們這邊也在小規模嘗試和驗證。

 

2.2 存儲卷 (PV) 選擇

我們在 YARN 模式下集群使用的是存儲能力比較大的大數據機型,有多個大容量的本地機械盤,而計算存儲是共用這些本地機械盤。存儲計算分離之后就引入了新問題:怎么解決 Spark 計算所需的磁盤需求?
Spark 計算過程中的磁盤需求主要有 shuffle 和 sort 構成。shuffle 部分大家比較清楚,但 sort 往往被忽略,比如在 Spark 內部 sort 場景經常被用到的 UnsafeExternalSorter 。在 executor 內存不足的情況下, UnsafeExternalSorter 就會發生大量 spill 。spill 過程主要是對 IO 讀寫吞吐需求比較大,對存儲容量要求不高。

hostPath

出於降低成本的目的,我們評估了騰訊雲提供的各種機型和存儲產品,最終選擇的方案是:計算節點掛載多塊雲硬盤(塊存儲 CBS),然后在 Spark Pod 中使用 hostPath 方式來引用宿主機雲硬盤的掛載目錄。

目前這個方案也有很多不足:

  1. hostPath 方式的缺點很明顯,使用了 hostPath 的 Pod 綁定了特定的宿主機。你沒辦法將 Pod 調度到沒有雲硬盤掛載目錄的宿主機上,這對於同其他應用的混部會有很大的制約。

  2. 性能受到單盤 IO 吞吐限制,而又無法充分利用雲硬盤的總吞吐。雲商提供的雲硬盤會有一些 IO 吞吐上限設置,比如單塊高性能雲硬盤限制最大吞吐 150MB/s。因此我們會經常碰到由於 IO 不均勻導致某個雲硬盤達到限制瓶頸,而其他盤空閑的情況。

Ceph

為了解決上述問題,我們調研了分布式存儲 Ceph。Ceph 有下列優勢:

  1. 有比較高的讀寫性能。一方面它支持數據條帶化(striping),讀寫能並行利用多塊磁盤;另一方面新的 BuleStore 存儲引擎是基於閃存存儲介質設計,並在裸盤上構建(繞過了本地文件系統),性能上更加突出。

  2. 基於 RADOS 架構核心之上構建了三種存儲方案,分別是塊存儲、文件存儲和對象存儲,可以滿足不同的存儲需求。

  3. 作為雲計算的存儲方案有比較多的實踐(如 OpenStack),同時也是 K8s 官網支持的 volume plugin 之一。

Ceph 作為 K8s 存儲卷 (PV) 時選擇哪種存儲方案呢 ?

  1. 對於一般普通的應用的 Pod,使用 Ceph RBD 是第一選擇,無中心化的架構在穩定性和性能上更具優勢。

  2. 對於 Spark Pod,Pod 之間又有大量的數據讀寫交換(shuffle 和 sort),我覺得可以去嘗試 Ceph FS。因為它的訪問方式支持ReadWriteMany(可被多個節點讀寫掛載),那么在上面提到的 shuffle service 方案中,就可以有第三種嘗試方案:經過存儲卷 (PV)掛載之后,Spark Pod 的讀寫 shuffle 跟本地操作一樣,下游 stage 的 task 需要上游 task 產生的 shuffle 數據文件,只需要知道對應的 shuffle 數據文件路徑就可以直接轉為本地讀;當然這里需要基於 Spark ShuffleManager 擴展出新的一種“Local” ShuffleManager。這種方式不需要再引入額外的 shuffle service 模塊,總體上算是一個比較有吸引力的解決方向。

Ceph 的優勢很多,但復雜性、學習成本、運維難度也比較高。目前 Ceph 方案在有贊大數據還處於測試驗證的階段,我們也在摸索的過程去驗證它的可行性。

三、Spark 改造

Apache Spark 從 2.2 版本開始支持 K8s 環境,到 3.0 版本正式支持 K8s。在有贊,我們使用的 Spark 版本是 2.3.3 版本,Spark 還沒有正式支持 K8s 環境。為了實現對 K8s 環境的支持,需要對 Spark 做一些修改。

3.1 添加小文件合並功能

數據在流轉過程中經歷 filter/shuffle 等過程后,開發人員難以評估作業寫出的數據量。即使使用了 Spark 提供的 AE 功能,目前也只能控制 shuffle read 階段的數據量,寫出數據的大小實際還會受壓縮算法及格式的影響,因此在任務運行時,對分區的數據評估非常困難。

  • shuffle 分區過多過碎,寫入性能會較差且生成的小文件會非常多。
  • shuffle 分區過少過大,則寫入並發度可能會不夠,影響任務運行時間。
  • 在產生大量碎片文件后,任務數據讀取的速度會變慢(需要尋找讀入大量的文件,如果是機械盤更是需要大量的尋址操作),同時會對 HDFS NameNode 內存造成很大的壓力。

因此我們添加了合並小文件的操作。小文件校驗以及合並流程如下:

通過添加新的語法,替換掉了現有的 MR 版本的小文件合並。達到緩解 NameNode 的內存壓力,提高了下游任務性能的效果。改進點在於,現在小文件合並過程是同步合並的,為了更好的靈活性可以修改成為異步合並的模式。

3.2 日志收集服務

Spark 整體遷移到 K8s 之后,日志會隨着 K8s Pod 的釋放而被清除掉。會導致在出現任務異常的情況下,日志會隨着 executor 的釋放而丟失。會給排查線上問題帶來不便。

因此我們自己添加了一個新的組件,如上圖所示。在 Spark executor container 里面加入新的 Filebeat,通過 Filebeat 將日志數據輸出到 Kafka。通過注入環境變量的方式,將 app type , app id , executor Id 三個信息作為 Key,日志行作為內容,發給 Kafka。

吞吐量控制是通過 Kafka topic partition 的數量和 Flink job 的並發度來實現。Flink job 將 executor 級別日志聚合,保存到存儲中,實現了實時,可拓展的日志收集查詢服務,解決了 Spark 在 K8s 環境下日志丟失和不能方便的查詢日志的問題。同時這個服務也能夠提供給公司內部其它在 K8s 環境上運行的組件使用,比如說 Flink 和 Flume 。

3.3 Remote Shuffle Service

我們這里使用的是 Uber 開源的 remote shuffle service ,同時修改了 Spark 的 executor shuffle 數據記錄機制,實現了在使用 remote shuffle service 的情況下,不標記 executor 是否有活躍的 shuffle 數據,實現了在 K8s 環境下 executor 在任務運行完成后迅速釋放掉。而不會因為shuffle 數據由 full gc 回收不及時而導致 executor 沒有任務的情況下不回收。

同時由於 remote shuffle service 的存在,shuffle 數據的存儲離開了 executor pod ,即使在 executor 出現異常的情況下,shuffle 數據還是能夠獲取到,提高了 Spark SQL 任務的穩定性。

下圖是 remote shuffle service 原理圖:

3.4 Spark K8s Driver Pod 構建順序修改

Spark app 啟動需要先構建 Driver Pod,如果你不是通過構建鏡像,而是通過 configmap 的形式注入配置或者掛載 volume 的形式來啟動。對於 spark 系統來說,會先創建 Spark Driver Pod,后創建 configmaps 和 volumes,這會導致 Driver Pod 無法啟動,因為 Pod 在創建時需要依賴的 configmaps 和 volumes 必須存在才能夠正常創建。所以需要針對這種情況修改為先創建需要的資源,再創建 Driver Pod,最后再將兩者關聯起來。

3.5 添加對本地資源的分發

Spark app 啟動過程中,executor 和 driver 都是需要能夠訪問到資源的。如果使用 K8s 的話,會因為 executor 不能訪問到用戶代碼或者資源文件而任務失敗。有兩個解決方案可以處理。

方案一:對每一個新的任務把相關的資源文件放到 ${SPARK_HOME}/jars 目錄中,優點是處理依賴問題容易,缺點是每次需要打包新的鏡像,如果任務很多,需要很多個鏡像,會導致 Docker host 磁盤消耗很大。

方案二:修改 spark-submit 代碼,將資源文件和各種數據都上傳到 HDFS 上,根據特定規則生成目錄,然后在 executor 執行中,下載被上傳的資源文件,添加到 classpath 里面。

綜合考量之后,我們這里采用了方案二,通過 HDFS 系統暫存資源,然后在 executor 中下載資源。

3.6 web ui 暴露

Spark 任務在使用過程中,會有查看 web ui 來查看任務執行狀態的需求,在生產環境中,K8s executor Pod 是不能和辦公網絡環境聯通的,所以要使用 ingress 來轉發請求。要使用 ingress,需要新建對應的 service,配置需要暴露的端口,就可以實現對辦公網絡的 web ui 訪問。

在 K8s 系統中,service 的訪問信息是在集群內部才能生效的,不能在集群外部直接訪問。ingress 是 K8s 系統中為不同的 service 設置的負載均衡服務,是 service 的 “service”, 使用 K8s 統一的 ingress 服務可以通過域名的方式將不同的 service 暴露出去。ingress 的優勢在於可以屏蔽掉 driver Pod ip 的變化,服務重啟或者任務重新調度都會導致 Pod ip 發生變化,ingress 和 service 結合使用,可以實現通過域名訪問,而外部用戶對具體 Pod ip 的變化無感知。

3.7 Spark Pod label 擴展

默認情況 Spark driver Pod 和 executor Pod 的 nodeSelector 是相同的。如果想實現 driver Pod 被調度到特定的 K8s node 上,executor Pod 調度到其它的 node 上,需要對 Pod 創建過程做修改,使得 executor 和 driver pod 的 nodeSelector 不相同。

這個修改的主要目的是為了適應集群動態擴縮容,driver Pod 如果被驅逐任務會整體重算,計算成本太大,所以 driver Pod 需要調度在不會因縮容而驅逐 Pod 的機器上,executor 可以調度在多種機器上。如下圖所示:

3.8 Spark app 狀態管理

當用戶提交了 Spark app 任務到 K8s 環境時,spark-submit 進程會在申請創建 driver Pod 后立即退出,不會監控driver Pod 的狀態,只要 driver Pod 創建成功,spark-submit 進程就會直接返回 0 。Airflow 在調度的時候,是根據命令執行的返回碼來判斷任務執行是否成功,這樣即使任務失敗,但是 spark-submit 進程的返回碼還是會保持為 0 , Airflow 系統會認為任務執行成功。

為了解決 spark-submit 程序返回值和 driver Pod 運行結果無關問題,需要在 spark-submit 中監聽 driver Pod 運行結果,將 driver Pod 的返回值作為 Spark submit 進程的返回值。sssss
當 Airflow 任務需要殺掉一個 spark app 進程時,Airflow 會向 spark-submit 進程發送SIGKILL 命令,能夠成功的殺掉 spark-submit 進程,但是不會影響到 K8s 環境中對應的 driver Pod 運行狀態,會導致Airflow 停止任務功能失效。這里需要添加新的 shutdown hook ,確保 spark-submit 進程在收到 SIGTERM 命令時,shutdown hook 會將這個任務對應的driver Pod 刪除掉。這樣就解決了 Airflow 上 Spark app 任務的狀態和 spark-submit 進程無關的問題。

3.9 動態修改 dynamicAllocation.maxExecutors

Spark thriftserver 修改 dynamicAllocation.maxExecutors 參數啟動后,這個參數在運行過程中是不支持修改的。但是偶爾會遇到業務數據突然增大,或者臨時插入了新任務的情況。這時候為了加速離線任務的產出,我們會擴容設備,添加更多的計算資源,在添加了更多的計算資源后,因為是處於業務高峰期內,不能重啟服務,就需要能夠動態配置 dynamicAllocation.maxExecutors 參數。

四、部署優化

為了節省資源,提高對現有集群的利用率,我們在引入了 K8s 之后,對系統的部署模式也做了較大的優化。

4.1 錯峰混部

不同的業務系統會有不同的業務高峰時間,像離線業務系統典型任務高峰期間會是凌晨的 0 點到 9 點鍾。而像是 HBase 或者 Druid 提供 BI 展示和查詢的系統,常見的業務高峰期是工作日時間,在這個時間以外的其它時間中,可以將其它業務系統的 node 加入到 Spark 所使用的 K8s namespace 中。這樣,Spark on K8s 就可以使用其它業務系統的資源。

需要預先設置可使用資源,在特定時間范圍內將可使用資源的調度打開,結合上文中不同的 Pod label,就可以實現在特定時間內,executor 能夠使用混部服務器的資源。在這種情況下,不需要修改操作系統 CPU 優先級調度策略,在其它業務的低峰期間占用服務器資源不會影響到 RT。

下面會有一個業務系統的例子,混部后在線系統的資源利用率得到了明顯的提高。下圖中描述的是一個在03:00~23:00混部的在線業務系統。能夠看到在混部開啟時間內,集群無論是CPU還是內存的使用率有了明顯的上升。

4.2 彈性擴縮容

我們使用的是騰訊雲,能夠提供 K8s 集群對動態擴容的能力。離線任務在手游拍賣地圖調度上會顯示出周期性,如下圖展示了離線任務 K8s 隊列在高峰期的任務堆積現象。

可以很容易的發現,在任務高峰期,0:00~09:00 期間,任務會有堆積的現象。這意味着集群需要更多的計算資源,高峰期過后,集群就不會在有任務堆積等待執行的現象了。這里可以利用 k8s 快速變更集群節點數量的能力,在 00:00~09:00 時間范圍內,申請全量的資源來保證離線任務的產出,在 09:00~24:00 之間,釋放掉離線集群一半的資源完成日常工作負載。這樣可以節省在離線集群低負載時間內的雲服務資源的費用,也可以在遇到業務高峰時動態擴容來應對業務高峰。

五、踩坑和經驗

在使用 Spark 過程中,我們踩過一些坑,也積累了一些經驗。

5.1 K8s 誤殺 executor

Docker 的 containerd 存在一個 bug ,現象是 container 里的進程退出后,containerd-shim 不退出,在發生這個 bug 后,Docker 系統會認為 Docker 容器中的進程還在運行中。這導致在某些情況下,Docker 容器會嘗試不停的殺掉具有特定 PID 號的進程,在這個過程中,Docker 服務會向特定 PID 發送 KILL 消息。

在同一個節點上,會有其它的 executor 啟動,當發生了上文中的異常后,Docker 系統會持續的發送 KILL 給特定的 PID 。新的 Java 進程啟動后,工作過程中,可能新創建的 Thread ID 會和上文中的 PID 相同,會接收到 KILL 消息,導致線程異常退出,線程的異常退出會導致 Java 進程也異常退出,引起穩定性問題。

針對這個問題,需要升級到 containerd-shim 沒有異常的版本。

5.2 linux 內核參數調優

在 K8s 環境上運行時,executor 需要和 driver 保持網絡連接來維持心跳消息,executor 之間在獲取 shuffle 數據的情況下,也會需要新的網絡連接。這種情況下,會導致某些 executor 的連接數維持在一個比較高的狀態。在業務高峰期,偶現如下異常:

...... Successfully created connection to /x.x.x.x:38363 after 0 ms (0 ms spent in bootstraps)
21/01/22 14:25:12,980 WARN [shuffle-client-4-3] TransportChannelHandler: Exception in connection from /10.109.14.86:38363
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)

這個異常和 Linux 操作系統參數有關系。TCP 連接建立后,三次握手的最后一次握手后,連接會加入到 accept queue 中,這個隊列的計算公式是min(somaxconn,backlog),如果這個隊列打滿的話,會丟掉連接導致出現上文中的異常。在業務高峰期間,一個 executor 的 shuffle 可能會被數千個 executor 獲取,很容易導致部分 executor 的 accept queue被打滿。

針對這種情況,需要針對 Linux 內核參數進行調優。操作系統的全連接隊列參數保存在 /proc/sys/net/core/somaxconn 中,Spark 中使用的 netty 的全連接隊列參數是通過 spark.shuffle.io.backLog 參數配置的,程序運行中實際的隊列大小是這兩個值中的最小值。可以根據具體情況配置的更大一些。

修改了這些配置后,上文中的網絡異常幾乎沒有出現過了。

5.3 executor 丟失,導致任務持續等待

Spark thriftserver 系統在運行過程中,會啟動大量的 executor,每個 executor 有各自獨立的生命周期。

當一個 executor 失聯之后,Spark 系統內會發送一條 executorLost 消息。當系統收到 executorLost 消息之后,KubernetesClusterSchedulerBackend 會開始走 executorDisable 邏輯,這個邏輯會檢查 executorsPendingLossReason 隊列和 addressToExecutorId 這兩個隊列。當這兩個隊列數據有異常的情況,會導致丟失后的 executor 持續存在 ,一直不會被 remove,會進一步導致在這些已經丟失了的 executor 上的 task 不會結束。

所以需要改良這個邏輯,當 executorLost 之后,將邏輯從 disableExecutor 修改為 removeExecutor ,這樣就能解決 executor 失聯后,任務會直接卡住的問題。

5.4 同一個 executor 多個 task 持續等內存

如果一個 executor 配置多個 cores,就會有多個 task 分配到同一個 executor 上。在 Spark 系統中,內存的分配最后是通過 ExecutionMemoryPool 來實現

 while (true) {val numActiveTasks = memoryForTask.keys.sizeval curMem = memoryForTask(taskAttemptId) maybeGrowPool(numBytes - memoryFree)val maxPoolSize = computeMaxPoolSize()val maxMemoryPerTask = maxPoolSize / numActiveTasksval minMemoryPerTask = poolSize / (2 * numActiveTasks)val maxToGrant = math.min(numBytes, math.max(, maxMemoryPerTask - curMem))val toGrant = math.min(maxToGrant, memoryFree)if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) { logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") lock.wait()//cause dead lock } else { memoryForTask(taskAttemptId) += toGrantreturn toGrant }}

 

具體分配內存的代碼邏輯如上。可以看到分配內存過程中,會有一個循環,循環過程中,會 wait 直到任務運行完成釋放內存才會 notify,這里會導致 Spark 任務在運行過程可能會等待數小時,在任務高峰期會導致任務執行時間不可控。所以需要優化這塊邏輯,添加任務分配超時機制,控制任務分配超時時間,當任務超時后,返回獲取到的內存數量為 0,讓 task 在當前 executor 上失敗,從而在其它的 executor 節點上執行。

5.5 shuffle 數據壞塊

19/12/10 01:53:53 ERROR executor.Executor: Exception in task 6.0 in stage 0.0 (TID 6)
java.io.IOException:
Stream is corrupted at
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:228)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Spark 在大量數據做 shuffle 的過程中,偶然會看到如上圖所示日志,這個錯誤是因為 shuffle 數據寫出過程中損壞了導致的。

通過引入 SPARK-30225 和 SPARK-26089,一定程度上緩解了這個問題。30225 這個 issue 修改的核心在於,只有在數據需要重新讀取的情況下,才會重置 bytebuffer 位置指針。26089 這個 issue 的核心在於,讀取 shuffle 數據塊的 1/3,然后解壓檢查是否有錯誤,如果有錯誤直接拋出 fetchfailed exception,如果沒有錯誤,繼續解壓后續數據。如果解壓數據錯誤的地點已經超過了文件的 1/3,會拋出異常讓整個 task 失敗,通過添加新的消息,添加一種新的 TaskEndReason,重新計算 shuffle 數據。而不是直接拋出IOException,導致任務失敗。

5.6 spark 配置文件加載順序問題

app 任務需要打包才能運行,少量用戶會將一些資源文件打包到 fat jar 里面。這種情況下,再使用 --files 提交相同的資源文件,會導致 Spark 系統只能讀取到 fat jar 里面的資源文件,引發程序執行異常。例如 hive-site.xml 文件,如果打包進入 fat jar 會導致程序異常。這個解決方案也很簡單,需要將 Spark executor 的 user-dir 加入到 executor classpath 中就可以解決問題。

5.7 添加對 k8s 資源不足情況的處理

Message: Forbidden! User xxxx doesn't have permission. pods "thrift-jdbcodbc-server-xxxxx-exec-xxxx" is forbidden: exceeded quota: cpu-memory-quota, requested: requests.cpu=..., used: requests.cpu=..., limited: requests.cpu=....

Spark 程序啟動過程中,偶爾會遇到如上所示的錯誤信息。這個錯誤信息的含義當前 K8s namespace 資源用盡,超出了 resource quota 。

Spark app 任務在啟動時,會申請新的 Pod 作為運行 driver 的載體。在這個過程中,社區版本會在 driver Pod 申請過程中有一次超時等待,如果分配超時,spark-submit 進程會返回非 0 的數值,這會導致在沒有資源的情況下任務直接失敗,但是在批量任務調度過程中,任務因為資源情況或者優先級情況等待是一個很常見的現象,對於這種情況需要的是 Spark app 任務等待資源,當資源就緒后直接運行即可。

我們添加了資源不足情況下的重試等待機制。一個簡單的策略如重試 50 次,每次重試之間等待 10s。

五、結語

有贊大數據離線計算 Spark 任務從 YARN 上轉移到了 K8s 環境上,擁抱了雲原生,通過實現存儲計算分離,容器化和混部,具有了小時級別資源擴展能力,在面對業務高峰時,能夠更加游刃有余。經過各種改造,優化,踩坑,也補齊了開源版本 Spark 的問題,能夠更好的支撐業務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM