徒手打造基於Spark的數據工廠(Data Factory):從設計到實現


在大數據處理和人工智能時代,數據工廠(Data Factory)無疑是一個非常重要的大數據處理平台。市面上也有成熟的相關產品,比如Azure Data Factory,不僅功能強大,而且依托微軟的雲計算平台Azure,為大數據處理提供了強大的計算能力,讓大數據處理變得更為穩定高效。由於工作中我的項目也與大數據處理相關,於是我就在思考,是否自己也可以設計打造一個數據工廠,以便尋求一些技術痛點的解決方案,並且引入一些有趣的新功能。 因此,我利用業余時間,逐步打造了一個基於Spark的數據工廠,並取名為Abacuza(Abacus是中國的“算盤”的意思,隱喻它是一個專門做數據計算的平台,使用“算盤”一詞的變體,也算是體現一點中國元素吧)。說是基於Spark,其實從整個架構來看,Abacuza並不一定非要基於Spark,只需要為其定制某種數據處理引擎的插件即可,所以,Spark其實僅僅是Abacuza的一個插件,當然,Spark是目前主流的數據處理引擎,Abacuza將其作為默認的數據處理插件。 Abacuza是開源的,項目地址是:https://github.com/daxnet/abacuza。徒手打造?是的,沒錯,從前端界面都后端開發,從代碼到持續集成,再到部署腳本和SDK與容器鏡像的發布,都是自己一步步弄出來的。項目主頁上有一個簡單的教程,后面我會詳細介紹一下。在介紹如何使用Abacuza之前,我們先了解一下它的整體架構和設計思想。雖然目前Abacuza還有很多功能沒有完成,但並不影響整個數據處理流程的執行。

整體架構

Abacuza和其它的數據工廠平台一樣,它的業務流程就是分三步走:數據讀入、數據處理、結果輸出。Abacuza的整體架構圖就很清楚地體現了這個業務流程:

(點擊查看大圖)

數據輸入部分

數據的輸入是由輸入端點(Input Endpoints)來定義的。Abacuza支持多種數據類型的輸入:CSV文件、JSON文件、TXT文本文件、Microsoft SQL Server(暫未完全實現)以及S3的對象存儲路徑,今后還可以繼續擴展輸入端點,以支持基於管道(Pipeline)的數據處理流程,這樣一來,用戶就不需要自己使用C#或者Scala來編寫數據處理的邏輯代碼,只需要一套JSON文件進行Pipeline定義就可以了。

數據處理部分

當數據輸入已經定義好以后,Abacuza會根據Input Endpoint的設置,將數據讀入,然后轉交給后端的數據處理集群(Cluster)進行處理。Abacuza可以以插件的形式支持不同類型的集群,如上文所說,Apache Spark是Abacuza所支持的一種數據處理集群,在上面的架構圖中可以看到,Abacuza Cluster Service管理這些集群,工作任務調度器(Job Scheduler)會通過Abacuza Cluster Service將數據處理任務分配到指定類型的集群上進行處理。 對於Spark而言,具體的數據處理邏輯是由用戶自己編寫代碼實現的。Spark原生支持Scala,也可以使用PySpark,Abacuza使用Microsoft .NET for Spark項目實現從.NET到Spark的綁定(Binding),用戶可以使用C#來編寫Spark的數據處理邏輯,后面的演練部分我會詳細介紹。 那么與Scala相比,通過.NET for Spark使用C#編寫的數據處理程序會不會有性能問題?嗯,會有點性能問題,請看下圖(圖片來源:微軟.NET for Spark官方網站):

在這個Benchmark中,處理相同總量的數據,Scala使用了375秒,.NET花了406秒,Python使用433秒,雖然與Scala相比有些差距,但是比Python要好一些。但是不用擔心,如果在你的應用場景中,性能是放在第一位的,那么Abacuza的Job Runner機制允許你使用Scala編寫數據處理程序,然后上傳到Spark集群執行(也就是你不需要依賴於.NET和C#)。

數據輸出部分

與數據輸入部分類似,處理之后的數據輸出方式是由輸出端點(Output Endpoints)來定義的。Abacuza也支持多種數據輸出方式:將結果打印到日志、將結果輸出到外部文件系統以及將結果輸出到當前項目所在的S3對象存儲路徑。無論是數據輸入部分還是輸出部分,這些端點都是可以定制的,並且可以通過ASP.NET Core的插件系統以及docker-compose或者Kubernetes的volume/Block Storage來實現動態加載。

相關概念和運作機理

Abacuza有以下這些概念:

  1. 集群(Cluster):一個集群是一個完整的大數據處理平台,比如Apache Spark
  2. 集群類型(Cluster Type):定義集群的類型,例如,運行在localhost的Spark集群和運行在雲端的Spark集群都是Spark集群,那么它們的集群類型就是spark。
  3. 集群連接(Cluster Connection):定義了Abacuza數據工廠訪問集群的方式,類似於數據庫系統的連接字符串
  4. 任務執行器(Job Runner):定義了數據處理任務應該如何被提交到集群上執行。它可以包含具體的數據處理業務邏輯
  5. 輸入端點(Input Endpoint):定義了原始數據(需要被處理的數據)的來源
  6. 輸出端點(Output Endpoint):定義了處理完成后的數據的輸出方式
  7. 項目(Project):一種類型數據處理任務的邏輯定義,它包括多個輸入端點、一個輸出端點以及多個數據處理版本(Revision)的信息,同時它還定義了應該使用哪個任務執行器來執行數據處理任務
  8. 數據處理版本(Revision):它歸屬於一個特定的項目,表示不同批次的數據處理結果

當一個用戶准備使用Abacuza完成一次大數據處理的任務時,一般會按照下面的步驟進行:

  1. 使用用戶名/密碼(暫時只支持用戶名密碼登錄)登錄Abacuza的管理界面
  2. 基於一個已經安裝好的集群(比如Apache Spark),配置它的集群類型集群連接,用來定義Abacuza與該集群的通信方式(集群和集群連接定義了數據應該在哪里被處理(where))
  3. 定義任務執行器,在任務執行器中,設置運行數據處理任務的集群類型,當數據處理任務被提交時,Abacuza Cluster Service會基於所選的集群類型,根據一定的算法來選擇一個集群進行數據處理。任務執行器中也定義了數據處理的邏輯,(比如,由Scala、C#或者Python編寫的應用程序,可以上傳到spark類型的集群上運行)。簡單地說,任務執行器定義了數據應該如何被處理(how
  4. 創建一個新的項目,在這個項目中,通過輸入端點來設置所需處理的數據來源,通過輸出端點來設置處理后的數據的存放地點,並設置該項目所用到的任務執行器。之后,用戶點擊Submit按鈕,將數據提交到集群上進行處理。處理完成后,在數據處理版本列表中查看結果

技術選型

Abacuza采用微服務架構風格,每個單獨的微服務都在容器中運行,目前實驗階段采用docker-compose進行容器編排,今后會加入Kubernetes支持。現將Abacuza所使用的框架與相關技術簡單羅列一下:

  1. Spark執行程序選擇Microsoft .NET for Spark,一方面自己對.NET技術棧比較熟悉,另一方面,.NET for Spark有着很好的流式數據處理的SDK API,並且可以很方便地整合ML.NET實現機器學習的業務場景
  2. 所有的微服務都是使用運行在.NET 5下的ASP.NET Core Web API實現,每個微服務的后端數據庫采用MongoDB
  3. 用於任務調度的Abacuza Job Service微服務使用Quartz.NET實現定期任務調度,用來提交數據處理任務以及更新任務狀態。后端同時采用了PostgreSQL數據庫
  4. 存儲層與服務層之間引入Redis做數據緩存,減少MongoDB的查詢負載
  5. 默認支持的Spark集群使用Apache Livy為其提供RESTful API接口
  6. 文件對象存儲采用MinIO S3
  7. API網關采用Ocelot框架
  8. 微服務的瞬態故障處理:Polly框架
  9. 身份認證與授權采用ASP.NET Core Identity集成的IdentityServer4解決方案
  10. 反向代理:nginx
  11. 前端頁面:Angular 12Angular powered BootstrapBootstrapAdminLTE

弱弱補一句:本人前端技術沒有后端技術精湛,所以前端頁面會有不少問題,樣式也不是那么的專業美觀,前端高手請忽略這些細節。;) Abacuza采用了插件化的設計,用戶可以根據需要擴展下面這些組件:

  • 實現自己的數據處理集群以及集群連接:因此你不必拘泥於使用Apache Spark
  • 實現自己的輸入端點輸出端點:因此你可以自定義數據的輸入部分和輸出部分
  • 實現自己的任務執行器:因此你可以選擇不采用基於.NET for Spark的解決方案,你可以自己用Scala或者Python來編寫數據處理程序

在Abacuza的管理界面中,可以很方便地看到目前系統中已經被加載的插件: 因此,Abacuza數據工廠應該可以滿足絕大部分大數據處理的業務場景。本身整個平台都是基於.NET開發,並且通過NuGet分發了Abacuza SDK,因此擴展這些組件是非常簡單的,后面的演練部分可以看到詳細介紹。

部署拓撲

以下是Abacuza的部署拓撲:

(點擊查看大圖)

 

整個部署結構還是比較簡單的:5個主要的微服務由基於Ocelot實現的API Gateway負責代理,Ocelot可以整合IdentityServer4,在Gateway的層面完成用戶的認證(Gateway層面的授權暫未實現)。基於IdentityServer4實現的Identity Service並沒有部署在API Gateway的后端,因為在這個架構中,它的認證授權策略與一般的微服務不同。API Gateway、Identity Service以及基於Angular實現的web app都由nginx反向代理,向外界(客戶端瀏覽器)提供統一的訪問端點。所有的后端服務都運行在docker里,並可以部署在Kubernetes中。

演練:在Abacuza上運行Word Count程序

Word Count是Spark官方推薦的第一個案例程序,它的任務是統計輸入文件中每個單詞的出現次數。.NET for Spark也有一個相同的Word Count案例。在此,我仍然使用Word Count案例,介紹如何在Abacuza上運行數據處理程序。

先決條件

你需要一台Windows、MacOS或者Linux的計算機,上面裝有.NET 5 SDK、docker以及docker-compose(如果是Windows或者MacOS,則安裝docker的桌面版),同時確保安裝了git客戶端命令行。

創建Word Count數據處理程序

首先使用dotnet命令行創建一個控制台應用程序,然后添加相關的引用:

$ dotnet new console -f net5.0 -n WordCountApp
$ cd WordCountApp
$ dotnet add package Microsoft.Spark --version 1.0.0
$ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease

然后在項目中新加入一個class文件,實現一個WordCountRunner類:

using Abacuza.JobRunners.Spark.SDK;
using Microsoft.Spark.Sql;

namespace WordCountApp
{
   public class WordCountRunner : SparkRunnerBase
   {
      public WordCountRunner(string[] args) : base(args)
      {
      }

      protected override DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame)
            => dataFrame
               .Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))
               .Select(Functions.Explode(Functions.Col("words"))
               .Alias("word"))
               .GroupBy("word")
               .Count()
               .OrderBy(Functions.Col("count").Desc());
   }
}

接下來修改Program.cs文件,在Main函數中調用WordCountRunner:

static void Main(string[] args)
{
   new WordCountRunner(args).Run();
}

然后,在命令行中,WordCountApp.csproj所在的目錄下,使用下面的命令來生成基於Linux x64平台的編譯輸出:

$ dotnet publish -c Release -f net5.0 -r linux-x64 -o published

最后,使用ZIP工具,將published下的所有文件(不包括published目錄本身)全部打包成一個ZIP壓縮包。例如,在Linux下,可以使用下面的命令將published目錄下的所有文件打成一個ZIP包:

$ zip -rj WordCountApp.zip published/.

Word Count程序已經寫好了,接下來我們就啟動Abacuza,並在其中運行這個WordCountApp。

運行Word Count程序

你可以使用git clone https://github.com/daxnet/abacuza.git命令,將Abacuza源代碼下載到本地,然后在Abacuza的根目錄下,使用下面的命令進行編譯:

$ docker-compose -f docker-compose.build.yaml build

編譯成功之后,用文本編輯器編輯template.env文件,在里面設置好本機的IP地址(不能使用localhost或者127.0.0.1,因為在容器環境中,localhost和127.0.0.1表示當前容器本身,而不是運行容器的主機),端口號可以默認:

然后,使用下面的命令啟動Abacuza:

$ docker-compose --env-file template.env up

啟動成功后,可以使用docker ps命令查看正在運行的容器:

用瀏覽器訪問http://<你的IP地址>:9320,即可打開Abacuza登錄界面,輸入用戶名super,密碼P@ssw0rd完成登錄,進入Dashboard(目前Dashboard還未完成)。然后在左側菜單中,點擊Cluster Connections,然后點擊右上角的Add Connection按鈕:

在彈出的對話框中,輸入集群連接的名稱和描述,集群類型選擇spark,在設置欄中,輸入用於連接Spark集群的JSON配置信息。由於我們本地啟動的Spark在容器中,直接使用本機的IP地址即可,如果你的Spark集群部署在其它機器上,也可以使用其它的IP地址。在配置完這些信息后,點擊Save按鈕保存:

接下來就是創建任務執行器。在Abacuza管理界面,點擊左邊的Job Runners菜單,然后點擊右上角的Add Job Runner按鈕:

在彈出的對話框中,輸入任務執行器的名稱和描述信息,集群類型選擇spark,之后當該任務執行器開始執行時,會挑選任意一個類型為spark的集群來處理數據。

填入這些基本信息后,點擊Save按鈕,此時會進入任務執行器的詳細頁面,用來進行進一步的設置。在Payload template中,輸入以下JSON文本:

{
  "file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}",
  "className": "org.apache.spark.deploy.dotnet.DotnetRunner",
  "args": [
    "${jr:binaries:WordCountApp.zip}",
    "WordCountApp",
    "${proj:input-defs}",
    "${proj:output-defs}",
    "${proj:context}"
  ]
}

大概介紹一下每個參數:

  • file:指定了在Spark集群上需要運行的程序所在的JAR包,這里直接使用微軟的Spark JAR
  • className:指定了需要運行的程序在JAR包中的名稱,這里固定使用org.apache.spark.deploy.dotnet.DotnetRunner
  • ${jr:binaries:WordCountApp.zip} 表示由className指定的DotnetRunner會調用當前任務執行器中的二進制文件WordCountApp.zip中的程序來執行數據處理任務
  • WordCountApp 為ZIP包中可執行程序的名稱
  • ${proj:input-defs} 表示輸入文件及其配置將引用當前執行數據處理的項目中的輸入端點的定義
  • ${proj:output-defs} 表示輸出文件及其配置將引用當前執行數據處理的項目中的輸出端點的定義
  • ${proj:context} 表示Spark會從當前項目讀入相關信息並將其傳遞給任務執行器

在上面的配置中,引用了兩個binary文件:microsoft-spark-3-0_2.12-1.0.0.jar和WordCountApp.zip。於是,我們需要將這兩個文件上傳到任務執行器中。仍然在任務執行器的編輯界面,在Binaries列表中,點擊加號按鈕,將這兩個文件附加到任務執行器上。注意:microsoft-spark-3-0_2.12-1.0.0.jar文件位於上文用到的published目錄中,而WordCountApp.zip則是在上文中生成的ZIP壓縮包。

配置完成后,點擊Save & Close按鈕,保存任務執行器。 接下來,創建一個數據處理項目,在左邊的菜單中,點擊Projects,然后在右上角點擊Add Project按鈕:

在彈出的Add Project對話框中,輸入項目的名稱、描述,然后選擇輸入端點和輸出端點,以及負責處理該項目數據的任務執行器:

在此,我們將輸入端點設置為文本文件(Text Files),輸出端點設置為控制台(Console),也就是直接輸出到日志中。這些配置在后續的項目編輯頁面中也是可以更改的。一個項目可以包含多個輸入端點,但是只能有一個輸出端點。點擊Save按鈕保存設置,此時Abacuza會打開項目的詳細頁,在INPUT選項卡下,添加需要統計單詞出現次數的文本文件:

在OUTPUT選項卡下,確認輸出端點設置為Console:

然后點擊右上角或者右下角的Submit按鈕,提交數據處理任務,此時,選項卡會自動切換到REVISIONS,並且更新Job的狀態:

稍等片刻,如果數據處理成功,Job Status會從RUNNING變為COMPLETED:

點擊Actions欄中的文件按鈕,即可查看數據處理的日志輸出:

從日志文件中可以看到,Abacuza已經根據我們寫的數據處理程序,統計出輸入文件input.txt中每個單詞的出現次數。通過容器的日志輸出也能看到同樣的信息:

總結

本文介紹了自己純手工打造的數據工廠(Data Factory)的設計與實現,並開發了一個案例來演示該數據工廠完成數據處理的整個過程。之后還有很多功能可以完善:Dashboard、認證授權的優化、用戶與組的管理、第三方IdP的集成、Pipeline的實現等等,今后有空再慢慢弄吧。

 

歡迎訪問本人的個人站點https://sunnycoding.cn,獲得更好的閱讀體驗。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM