導讀:
眾所周知,在大數據/數據庫領域,數據的存儲格式直接影響着系統的讀寫性能。spark是一種基於內存的快速、通用、可擴展的大數據計算引擎,適用於新時代的數據處理場景。在“大數據實踐解析(上):聊一聊spark的文件組織方式”中,我們分析了spark的多種文件存儲格式,以及分區和分桶的設計。接下來,本文通過簡單的例子來分析在Spark中的讀寫流程,主要聚焦於Spark中的高效並行讀寫以及在寫過程中如何保證事務性。
1、文件讀
如何在Spark中做到高效的查詢處理呢?這里主要有兩個優化手段:
1)減少不必要的數據處理。數據處理涉及文件的IO以及計算,它們分別需要耗費大量的IO帶寬和CPU計算。在實際的生產環境中,這兩類資源都是有限的,同時這些操作十分耗時,很容易成為瓶頸,所以減少不必要的數據處理能有效提高查詢的效率;
以下面的查詢為例:
spark.read.parquet("/data/events") .where("year = 2019") .where("city = 'Amsterdam'") .select("timestamp")
由於在events表中按照year字段做了分區,那么首先通過 year 字段我們就可以過濾掉所有year字段不為 2019 的分區:
因為文件是parquet的文件格式,通過謂詞下推可以幫助我們過濾掉 city 字段不是 "Amsterdam" 的 row groups;同時,由於我們的查詢最終需要輸出的投影字段只有 "timestamp" ,所以我們可以進行列裁剪優化,不用讀取其他不需要的字段,所以最終整個查詢所讀的數據只有剩下的少部分,過濾掉了大部分的數據,提升了整體的查詢效率:
2)並行處理,這里主流的思想分為兩類:任務並行和數據並行。任務並行指充分利用多核處理器的優勢,將大的任務分為一個個小的任務交給多個處理器執行並行處理;數據並行指現如今越來越豐富的SIMD指令,一次動作中處理多個數據,比如AVX-512可以一次處理16個32bit的整型數,這種也稱為向量化執行。當然,隨着其他新硬件的發展,並行也經常和GPU聯系在一起。本文主要分析Spark讀流程中的任務並行。
下面是Spark中一個讀任務的過程,它主要分為三個步驟:
(1)將數據按照某個字段進行hash,將數據盡可能均勻地分為多個大小一致的Partition;
(2)發起多個任務,每個任務對應到圖中的一個Executor;
(3)任務之間並行地進行各自負責的Partition數據讀操作,提升讀文件效率。
2、文件寫
Spark寫過程的目標主要是兩個:並行和事務性。其中並行的思想和讀流程一樣,將任務分配給不同的Executor進行寫操作,每個任務寫各自負責的數據,互不干擾。
為了保證寫過程的事務性,Spark在寫過程中,任何未完成的寫都是在臨時文件夾中進行寫文件操作。如下圖所示:寫過程中,results文件夾下只存在一個臨時的文件夾_temporary;不同的job擁有各自job id的文件目錄,相互隔離;同時在各目錄未完成的寫操作都是存在臨時文件夾下,task的每次執行都視為一個taskAttempt,並會分配一個task attempt id,該目錄下的文件是未commit之前的寫文件。
當task完成自己的寫任務時,會進行commit操作,commit成功后,該任務目錄下的臨時文件夾會移動,寫文件移到對應的位置,表示該任務已經寫完成。
當寫任務失敗時,首先需要刪除之前寫任務的臨時文件夾和未完成的文件,之后重新發起該寫任務(relaunch),直到寫任務commit提交完成。
整個任務的描述可用下圖表示,如果commit成功,將寫完成文件移動到最終的文件夾;如果未commit成功,寫失敗,刪除對應的文件,重新發起寫任務。當寫未完成時,所有寫數據都存在對應的臨時文件中,其他任務不可見,直到整個寫commit成功,保證了寫操作的事務性。
當所有任務完成時,所有的臨時文件夾都移動,留下最終的數據文件,它是最終commitJob之后的結果。
本文介紹的算法是 FileOutputCommitter v1的實現,它的commitJob階段由Driver負責依次移動數據到最終的目錄。但是在當前廣泛應用的雲環境下,通常采取存算分離的架構,這時數據一般存放在對象存儲中(如AWS S3,華為雲OBS),Spark FileOutputCommitter中的數據移動並不像HDFS文件系統移動那么高效,v1的commitJob過程耗時可能會非常長。為了提升FileOutputCommitter 的性能,業界提出了FileOutputCommitter v2的實現,它們可以通過 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 1或2 配置項來設置,它和v1的不同點在於,每個Task在commitTask時就將文件移動到最終的目錄,而在commitJob時,Driver只需要負責將Task留下來的空目錄刪除,這樣相比 v1 帶來好處是性能提升, 但是由於commit task時直接寫最終目錄,在執行未完成時,部分數據就對外可見。同時,如果job失敗了,成功的那部分task產生的數據也會殘留下來。這些情況導致spark寫作業的事務性和一致性無法得到保障。
其實v1也不完全一定能保證數據一致性,文件移動過程中完成的數據對外是可見的,這部分數據外部已經可以讀取,但是正在移動和還未移動的數據對外是不可見的,而在雲環境下,這個移動耗時會進一步加長,加重數據不一致的情況。
那么有沒有能夠使得Spark 分析在雲環境下也可以保證數據的事務性和一致性的解決方案呢?華為雲數據湖探索DLI(Data Lake Insight)改進了v1和v2這兩種算法,使得Spark 分析在雲環境下也可以保證數據的事務性和一致性,同時做到高性能,並且完全兼容Apache Spark和Apache Flink生態, 是實現批流一體的Serverless大數據計算分析服務,歡迎點擊體驗。
參考
【1】Databricks. 2020. Apache Spark's Built-In File Sources In Depth - Databricks. [online] Available at: <https://databricks.com/session_eu19/apache-sparks-built-in-file-sources-in-depth>.