簡介
Apache Spark是一個開源、分布式、通用的分析引擎。多年來,它一直是大數據生態系統中對大型數據集進行批量和實時處理的主要工具。盡管對該平台的本地支持僅限於JVM語言集,但其他通常用於數據處理和分析的語言(如Python和R)已經加入了Spark的互操作層,以利用其功能。在2019年的Build大會上,微軟發布了Spark.NET。Spark.NET提供了為Spark互操作層編寫的綁定,允許您在.NET應用程序中使用諸如Spark SQL和Spark Streaming之類的組件。因為Spark.NET與.NET Standard 2.0兼容,可以運行Windows、Mac和Linux等操作系統。Spark.NET是Mobius項目的衍生版,該項目為Spark提供了.NET綁定。
這個示例從NYC Open Data門戶獲取餐館違規數據集,並使用Spark.NET處理它。然后,處理后的數據被用來訓練一個機器學習模型,該模型試圖預測一個機構在檢查后將獲得的等級。該模型將使用一個開源、跨平台的機器學習框架ML.NET進行訓練。最后,使用經過訓練的模型來指定一個期望的等級,從而豐富當前不存在等級的數據。
這個示例的源代碼可以在GitHub lqdev/RestaurantInspectionsSparkMLNET中找到。
必備條件
這個項目是用Ubuntu 18.04構建的,但是應該可以在Windows和Mac設備上運行。
Install Java
因為Spark是在JVM上運行的,所以您的PC上需要Java。所需的最小版本是Java 8。在終端輸入以下命令:
sudo apt install openjdk-8-jdk openjdk-8-jre
然后,確保最新安裝的版本是默認的。
sudo update-alternatives --config java
下載並配置Spark
使用Hadoop 2.7將Spark 2.4.1下載到您的計算機上。在本例中,我將它放在Downloads文件夾中。
wget https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz -O ~/Downloads/spark-2.4.1-bin-hadoop2.7.tgz
將最近下載的文件的內容解壓縮到/usr/bin/local目錄。
sudo tar -xvf ~/Downloads/spark-2.4.1-bin-hadoop2.7.tgz --directory /usr/local/bin
下載並配置.NET Spark Worker
下載.Net Spark worker到您的計算機上。在本例中,我將它放在Downloads文件夾中。
wget https://github.com/dotnet/spark/releases/download/v0.4.0/Microsoft.Spark.Worker.netcoreapp2.1.linux-x64-0.4.0.tar.gz -O ~/Downloads/Microsoft.Spark.Worker.netcoreapp2.1.linux-x64-0.4.0.tar.gz
將最新下載的文件的內容解壓縮到/usr/bin/local目錄。
sudo tar -xvf ~/Downloads/Microsoft.Spark.Worker.netcoreapp2.1.linux-x64-0.4.0.tar.gz --directory /usr/local/bin
最后,提高Microsoft.Spark.Worker的權限。這是執行用戶定義函數(UDF)所必需的。
sudo chmod +x /usr/local/bin/Microsoft.Spark.Worker-0.4.0/Microsoft.Spark.Worker
配置環境變量
下載並配置這些必備條件之后,將它們在系統中的位置配置為環境變量。打開/.bashrc文件並在文件末尾添加以下內容。
export SPARK_PATH=/usr/local/bin/spark-2.4.1-bin-hadoop2.7 export PATH=$SPARK_PATH/bin:$PATH export HADOOP_HOME=$SPARK_PATH export SPARK_HOME=$SPARK_PATH export DOTNET_WORKER_DIR=/usr/local/bin/Microsoft.Spark.Worker-0.4.0
解決方案說明
了解數據
此解決方案中使用的數據集是DOHMH New York City Restaurant Inspection Results,來自NYC Open Data門戶。它每天更新,包含餐館和大學食堂指定的和未決的檢查結果和違規情況。該數據集不包括已倒閉的企業。盡管數據集包含多個列,但在此解決方案中僅使用其中的一個子集。有關數據集的詳細描述,請訪問數據集網站。
了解解決方案
這個解決方案由四個不同的 .NET Core應用程序組成:
- RestaurantInspectionsETL:獲取原始數據的.NET Core控制台應用程序並使用Spark.NET將數據清理並轉換為一種更容易使用的格式,作為訓練和預測的輸入,使用ML.NET構建的機器學習模型。
- RestaurantInspectionsML:定義了ML.NET機器學習模型的輸入和輸出模型的.NET Core類庫。另外,這是保存訓練過的模型的地方。
- RestaurantInspectionsTraining: .NET Core控制台應用,使用RestaurantInspectionsETL應用生成的分級數據,使用ML.NET的Auto ML訓練一個多分類機器學習模型。
- RestaurantInspectionsEnrichment: .NET Core控制台應用程序,使用RestaurantInspectionsETL應用程序生成的未分級數據作為訓練ML.NET機器學習模型的輸入,該模型可根據檢測過程中發現的違規行為,預測某機構最可能得到的分級。
初始化解決方案
創建解決方案目錄
為您的項目創建一個名為RestaurantInspectionsSparkMLNET的新目錄,並使用以下命令導航到該目錄。
mkdir RestaurantInspectionsSparkMLNET && cd RestaurantInspectionsSparkMLNET
然后,使用dotnet cli創建一個解決方案。
dotnet new sln
為了確保使用.Net Core SDK 2.1版本作為目標框架,特別是如果您安裝了多個版本的.Net SDK,請在RestaurantInspectionsSparkMLNET解決方案目錄中創建一個名為globals.json的文件。
touch global.json
在global.json文件中添加以下內容。確保使用安裝在計算機上的特定版本的SDK。在本例中,我的計算機上安裝了版本2.1.801。可以使用 dotnet --list-sdks 命令列出安裝的SDK版本。
{
"sdk": {
"version": "2.1.801"
}
}
創建和配置ETL項目
ETL項目負責獲取原始源數據,並使用Spark應用一系列轉換來准備數據,以訓練機器學習模型,以及用缺失的分數來豐富數據。
在RestaurantInspectionsSparkMLNET解決方案目錄中,使用dotnet cli創建一個名為RestaurantInspectionsETL的新控制台應用程序。
dotnet new console -o RestaurantInspectionsETL
使用dotnet cli將新創建的項目添加到解決方案中。
dotnet sln add ./RestaurantInspectionsETL/
因為這個項目使用Microsoft.Spark NuGet包,使用dotnet cli安裝它。
dotnet add ./RestaurantInspectionsETL/ package Microsoft.Spark --version 0.4.0
創建和配置ML模型項目
ML模型類庫將包含定義輸入和輸出的域模型,以及經過訓練的模型本身。
在RestaurantInspectionsSparkMLNET解決方案目錄中,使用dotnet cli創建一個名為RestaurantInspectionsML的新類庫。
dotnet new classlib -o RestaurantInspectionsML
使用dotnet cli將新創建的項目添加到解決方案中。
dotnet sln add ./RestaurantInspectionsML/
因為這個項目使用Microsoft.ML NuGet包,使用dotnet cli安裝它。
dotnet add ./RestaurantInspectionsML/ package Microsoft.ML --version 1.3.1
創建和配置ML訓練項目
訓練項目的目的是使用RestaurantInspectionsETL項目預先處理過的分級數據作為輸入,利用ML.NET的Auto ML API來訓練一個多級分類模型。訓練后的模型將保存在RestaurantInspectionsML目錄中。
在RestaurantInspectionsSparkMLNET解決方案目錄中,使用dotnet cli創建一個名為RestaurantInspectionsTraining的新控制台應用程序。
dotnet new console -o RestaurantInspectionsTraining
使用dotnet cli將新創建的項目添加到解決方案中。
dotnet sln add ./RestaurantInspectionsTraining/
此項目依賴於在RestaurantInspectionsML項目中創建的域模型,因此需要向其添加引用。
dotnet add ./RestaurantInspectionsTraining/ reference ./RestaurantInspectionsML/
因為這個項目使用的是Microsoft.Auto.ML NuGet包,使用dotnet cli安裝它。
dotnet add ./RestaurantInspectionsTraining/ package Microsoft.ML.AutoML --version 0.15.1
創建和配置豐富化數據項目
豐富化數據應用程序使用RestaurantInspectionsTraining應用程序創建的機器學習模型訓練數據,並使用它預測RestaurantInspectionsETL應用創建的pre-processed未分類數據最有可能獲得根據檢查中發現的違法情況得到的等級。
在RestaurantInspectionsSparkMLNET解決方案目錄中,使用dotnet cli創建一個名為RestaurantInspectionsEnrichment的新控制台應用程序。
dotnet new console -o RestaurantInspectionsEnrichment
使用dotnet cli將新創建的項目添加到解決方案中。
dotnet sln add ./RestaurantInspectionsEnrichment/
此項目依賴於在RestaurantInspectionsML項目中創建的域模型,因此需要向其添加引用。
dotnet add ./RestaurantInspectionsEnrichment/ reference ./RestaurantInspectionsML/
請使用以下NuGet包:
- Microsoft.Spark
- Microsoft.ML.LightGBM (這不是必需的,但是如果最后的模型是LightGBM模型,那么預測可能會失敗).
使用以下命令安裝軟件包:
dotnet add ./RestaurantInspectionsEnrichment/ package Microsoft.Spark --version 0.4.0 dotnet add ./RestaurantInspectionsEnrichment/ package Microsoft.ML.LightGBM --version 1.3.1
構建ETL程序
第一步是准備數據。為此使用Spark.NET轉換集合。
下載數據
導航到RestaurantInspectionsETL項目並創建一個數據目錄。
mkdir Data
然后,將數據下載到新創建的Data目錄中。
wget https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD -O Data/NYC-Restaurant-Inspections.csv
構建ETL管道
將下列using添加到Program.cs文件中。
using System; using System.IO; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
並不是所有的列都相關。在Program.cs文件的Main方法中,定義要刪除的列。
string[] dropCols = new string[] { "CAMIS", "CUISINE DESCRIPTION", "VIOLATION DESCRIPTION", "BORO", "BUILDING", "STREET", "ZIPCODE", "PHONE", "ACTION", "GRADE DATE", "RECORD DATE", "Latitude", "Longitude", "Community Board", "Council District", "Census Tract", "BIN", "BBL", "NTA" };
Spark應用程序的入口點是SparkSession。在Program.cs文件的Main方法中創建SparkSession。
var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_ETL") .GetOrCreate();
然后,將存儲在NYC-Restaurant-Inspections.csv文件中的數據加載到一個DataFrame中。
DataFrame df = sc .Read() .Option("header", "true") .Option("inferSchema", "true") .Csv("Data/NYC-Restaurant-Inspections.csv");
可以將DataFrames看作數據庫中的表或Excel中的表。Spark有各種表示數據的方法,但是數據流是Spark.NET支持的格式。此外,DataFrame API更高級,更容易使用。
加載數據之后,通過創建一個新的DataFrame來刪除不需要的數據,這個DataFrame排除了dropCols和缺失的值。
DataFrame cleanDf = df .Drop(dropCols) .WithColumnRenamed("INSPECTION DATE","INSPECTIONDATE") .WithColumnRenamed("INSPECTION TYPE","INSPECTIONTYPE") .WithColumnRenamed("CRITICAL FLAG","CRITICALFLAG") .WithColumnRenamed("VIOLATION CODE","VIOLATIONCODE") .Na() .Drop();
通常,機器學習模型期望值是數值,因此在ETL步驟中,嘗試將盡可能多的值轉換為數值。CRITICALFLAG列包含可以編碼為0/1的“Y/N”值。
DataFrame labeledFlagDf = cleanDf .WithColumn("CRITICALFLAG", When(Functions.Col("CRITICALFLAG") == "Y",1) .Otherwise(0));
該數據集每行包含一個違規項,對應不同的檢查。因此,所有的違規行為都需要通過業務和檢查進行匯總。
DataFrame groupedDf = labeledFlagDf .GroupBy("DBA", "INSPECTIONDATE", "INSPECTIONTYPE", "CRITICALFLAG", "SCORE", "GRADE") .Agg(Functions.CollectSet(Functions.Col("VIOLATIONCODE")).Alias("CODES")) .Drop("DBA", "INSPECTIONDATE") .WithColumn("CODES", Functions.ArrayJoin(Functions.Col("CODES"), ",")) .Select("INSPECTIONTYPE", "CODES", "CRITICALFLAG", "SCORE", "GRADE");
既然數據是用於訓練和預測的格式,那么將清理后的數據流分成兩個新的數據流,分級的和未分級的。分級數據集是用來訓練機器學習模型的數據。未分級的數據將被用來完整豐富化。
DataFrame gradedDf = groupedDf .Filter( Col("GRADE") == "A" | Col("GRADE") == "B" | Col("GRADE") == "C" ); DataFrame ungradedDf = groupedDf .Filter( Col("GRADE") != "A" & Col("GRADE") != "B" & Col("GRADE") != "C" );
將DataFrames保存為csv文件供以后使用。
var timestamp = ((DateTimeOffset) DateTime.UtcNow).ToUnixTimeSeconds().ToString(); var saveDirectory = Path.Join("Output",timestamp); if(!Directory.Exists(saveDirectory)) { Directory.CreateDirectory(saveDirectory); } gradedDf.Write().Csv(Path.Join(saveDirectory,"Graded")); ungradedDf.Write().Csv(Path.Join(saveDirectory,"Ungraded"));
發布並運行ETL應用程序
最終的Program.cs文件應該如下所示:
using System; using System.IO; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions; namespace RestaurantInspectionsETL { class Program { static void Main(string[] args) { // Define columns to remove string[] dropCols = new string[] { "CAMIS", "CUISINE DESCRIPTION", "VIOLATION DESCRIPTION", "BORO", "BUILDING", "STREET", "ZIPCODE", "PHONE", "ACTION", "GRADE DATE", "RECORD DATE", "Latitude", "Longitude", "Community Board", "Council District", "Census Tract", "BIN", "BBL", "NTA" }; // Create SparkSession var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_ETL") .GetOrCreate(); // Load data DataFrame df = sc .Read() .Option("header", "true") .Option("inferSchema", "true") .Csv("Data/NYC-Restaurant-Inspections.csv"); //Remove columns and missing values DataFrame cleanDf = df .Drop(dropCols) .WithColumnRenamed("INSPECTION DATE","INSPECTIONDATE") .WithColumnRenamed("INSPECTION TYPE","INSPECTIONTYPE") .WithColumnRenamed("CRITICAL FLAG","CRITICALFLAG") .WithColumnRenamed("VIOLATION CODE","VIOLATIONCODE") .Na() .Drop(); // Encode CRITICAL FLAG column DataFrame labeledFlagDf = cleanDf .WithColumn("CRITICALFLAG", When(Functions.Col("CRITICALFLAG") == "Y",1) .Otherwise(0)); // Aggregate violations by business and inspection DataFrame groupedDf = labeledFlagDf .GroupBy("DBA", "INSPECTIONDATE", "INSPECTIONTYPE", "CRITICALFLAG", "SCORE", "GRADE") .Agg(Functions.CollectSet(Functions.Col("VIOLATIONCODE")).Alias("CODES")) .Drop("DBA", "INSPECTIONDATE") .WithColumn("CODES", Functions.ArrayJoin(Functions.Col("CODES"), ",")) .Select("INSPECTIONTYPE", "CODES", "CRITICALFLAG", "SCORE", "GRADE"); // Split into graded and ungraded DataFrames DataFrame gradedDf = groupedDf .Filter( Col("GRADE") == "A" | Col("GRADE") == "B" | Col("GRADE") == "C" ); DataFrame ungradedDf = groupedDf .Filter( Col("GRADE") != "A" & Col("GRADE") != "B" & Col("GRADE") != "C" ); // Save DataFrames var timestamp = ((DateTimeOffset) DateTime.UtcNow).ToUnixTimeSeconds().ToString(); var saveDirectory = Path.Join("Output",timestamp); if(!Directory.Exists(saveDirectory)) { Directory.CreateDirectory(saveDirectory); } gradedDf.Write().Csv(Path.Join(saveDirectory,"Graded")); ungradedDf.Write().Csv(Path.Join(saveDirectory,"Ungraded")); } } }
使用以下命令發布應用程序。
dotnet publish -f netcoreapp2.1 -r ubuntu.18.04-x64
使用spark-submit運行應用程序。
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local bin/Debug/netcoreapp2.1/ubuntu.18.04-x64/publish/microsoft-spark-2.4.x-0.4.0.jar dotnet bin/Debug/netcoreapp2.1/ubuntu.18.04-x64/publish/RestaurantInspectionsETL.dll
構建ML Domain
定義輸入模型結構
導航到RestaurantInspectionsTraining目錄並創建一個名為ModelInput.cs的新文件。
touch ModelInput.cs
打開ModelInput.cs文件並添加以下代碼。
using Microsoft.ML.Data; namespace RestaurantInspectionsML { public class ModelInput { [LoadColumn(0)] public string InspectionType { get; set; } [LoadColumn(1)] public string Codes { get; set; } [LoadColumn(2)] public float CriticalFlag { get; set; } [LoadColumn(3)] public float InspectionScore { get; set; } [LoadColumn(4)] [ColumnName("Label")] public string Grade { get; set; } } }
在模型中使用Attributes,定義了五個屬性:
- InspectionType:檢驗的類型。
- Codes:檢查中發現違規類型的代碼。
- CriticalFlag:指示檢查期間的任何違規行為是否嚴重(導致食源性疾病)。
- InspectionScore:檢查后分配的分數。
- Grade:檢查后指定的字母等級
LoadColumn屬性定義了該列在文件中的位置。最后一列中的數據被分配給Grade屬性,然后在IDataView中作為標簽引用。ML.NET算法使用ColumnName屬性指定默認的列名,並在模型類中保持這個命名,這樣就不需要在訓練管道中將特性和標簽列定義為參數。
定義輸出模型結構
在RestaurantInspectionsTraining目錄中創建一個名為ModelOutput.cs的新文件。
touch ModelOutput.cs
打開ModelOutput.cs文件並添加以下代碼。
namespace RestaurantInspectionsML { public class ModelOutput { public float[] Scores { get; set; } public string PredictedLabel { get; set; } } }
對於輸出模型,ModelOutput類使用模型訓練過程生成的輸出的默認列名的屬性:
- Scores:一個包含所有預測類的概率的浮點向量。
- PredictedLabel:預測的值。在這種情況下,PredictedLabel是在檢查之后指定的預測等級,它給出了該檢查的一組特性。
構建模型訓練應用程序
該應用程序訓練了一個多級分類算法。找到具有正確參數的“最佳”算法需要進行實驗。幸運的是,如果您為ML.NET提供了您想要訓練的算法類型,那么ML.NET的Auto ML就可以為您做到這一點。
加載分級數據
導航到restaurantinspectionproject目錄,並將以下using語句添加到Program.cs類中。
using System; using System.IO; using System.Linq; using Microsoft.ML; using static Microsoft.ML.DataOperationsCatalog; using Microsoft.ML.AutoML; using RestaurantInspectionsML;
在Program.cs文件的Main方法中,定義存儲數據文件的路徑。
string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output");
ML.NET應用程序的入口點有MLContext,需要初始化一個MLContext實例。
MLContext mlContext = new MLContext();
接下來,獲取數據文件的路徑。RestaurantInspectionsETL應用程序生成的輸出既包含csv文件,也包含關於創建它們的分區的信息。對於訓練,只需要csv文件。
var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => Path.Join(directory.FullName,"Graded")) .First(); var dataFilePaths = Directory .GetFiles(latestOutput) .Where(file => file.EndsWith("csv")) .ToArray();
然后,將數據加載到IDataView中。IDataView類似於DataFrame,因為它可以將數據表示為行、列及其模式。
var dataLoader = mlContext.Data.CreateTextLoader<ModelInput>(separatorChar:',', hasHeader:false, allowQuoting:true, trimWhitespace:true); IDataView data = dataLoader.Load(dataFilePaths);
將數據分成訓練集和測試集進行評估是一個很好的實踐。將數據分成80%的訓練集和20%的測試集。
TrainTestData dataSplit = mlContext.Data.TrainTestSplit(data,testFraction:0.2); IDataView trainData = dataSplit.TrainSet; IDataView testData = dataSplit.TestSet;
創建實驗
Auto ML獲取數據,使用不同的模型和超參數進行實驗,以尋找“最佳”模型。定義實驗的設置。在這種情況下,模型將運行600秒或10分鍾,並嘗試找到日志損失最低的模型。
var experimentSettings = new MulticlassExperimentSettings(); experimentSettings.MaxExperimentTimeInSeconds = 600; experimentSettings.OptimizingMetric = MulticlassClassificationMetric.LogLoss;
然后,創建實驗。
var experiment = mlContext.Auto().CreateMulticlassClassificationExperiment(experimentSettings);
創建實驗之后,運行它。
var experimentResults = experiment.Execute(data, progressHandler: new ProgressHandler());
默認情況下,運行應用程序不會顯示進度信息。但是,可以將ProgressHandler對象傳遞到調用已實現Report方法的實驗的Execute方法中。
在restaurantinspectionstrain項目目錄中,創建一個名為ProgressHandler.cs的新文件。
touch ProgressHandler.cs
然后,添加以下代碼:
using System; using Microsoft.ML.Data; using Microsoft.ML.AutoML; namespace RestaurantInspectionsTraining { public class ProgressHandler : IProgress<RunDetail<MulticlassClassificationMetrics>> { public void Report(RunDetail<MulticlassClassificationMetrics> run) { Console.WriteLine($"Trained {run.TrainerName} with Log Loss {run.ValidationMetrics.LogLoss:0.####} in {run.RuntimeInSeconds:0.##} seconds"); } } }
ProgressHandler類派生自IProgress<T>接口,該接口需要實現Report方法。每次運行后傳遞到Report方法中的對象是一個RunDetail<MulticlassClassificationMetrics>對象。每次運行完成時,都會調用Report方法並執行其中的代碼。
評估結果
一旦實驗完成運行,從最佳運行中獲取模型。將下列代碼添加到Program.cs的Main方法中。
var bestModel = experimentResults.BestRun.Model;
使用測試數據集評估模型的性能並測量其微精度度量。
IDataView scoredTestData = bestModel.Transform(testData); var metrics = mlContext.MulticlassClassification.Evaluate(scoredTestData); Console.WriteLine($"MicroAccuracy: {metrics.MicroAccuracy}");
保存訓練好的模型
最后,將訓練好的模型保存到RestaurantInspectionsML中。
string modelSavePath = Path.Join(solutionDirectory,"RestaurantInspectionsML","model.zip"); mlContext.Model.Save(bestModel, data.Schema, modelSavePath);
正常情況會在RestaurantInspectionsML項目中創建一個名為model.zip的文件。
通過向RestaurantInspectionsML.csproj添加以下內容,確保將訓練后的模型文件復制並輸出到RestaurantInspectionsML目錄中。
<ItemGroup> <None Include="model.zip"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> </ItemGroup>
將其復制到RestaurantInspectionsML的輸出目錄可以更容易地從RestaurantInspectionsEnrichment項目中引用,因為該項目已經包含了對RestaurantInspectionsML類庫的引用。
訓練模型
最終的Program.cs文件應該如下所示:
using System; using System.IO; using System.Linq; using Microsoft.ML; using static Microsoft.ML.DataOperationsCatalog; using Microsoft.ML.AutoML; using RestaurantInspectionsML; namespace RestaurantInspectionsTraining { class Program { static void Main(string[] args) { // Define source data directory paths string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output"); // Initialize MLContext MLContext mlContext = new MLContext(); // Get directory name of most recent ETL output var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => Path.Join(directory.FullName,"Graded")) .First(); var dataFilePaths = Directory .GetFiles(latestOutput) .Where(file => file.EndsWith("csv")) .ToArray(); // Load the data var dataLoader = mlContext.Data.CreateTextLoader<ModelInput>(separatorChar:',', hasHeader:false, allowQuoting:true, trimWhitespace:true); IDataView data = dataLoader.Load(dataFilePaths); // Split the data TrainTestData dataSplit = mlContext.Data.TrainTestSplit(data,testFraction:0.2); IDataView trainData = dataSplit.TrainSet; IDataView testData = dataSplit.TestSet; // Define experiment settings var experimentSettings = new MulticlassExperimentSettings(); experimentSettings.MaxExperimentTimeInSeconds = 600; experimentSettings.OptimizingMetric = MulticlassClassificationMetric.LogLoss; // Create experiment var experiment = mlContext.Auto().CreateMulticlassClassificationExperiment(experimentSettings); // Run experiment var experimentResults = experiment.Execute(data, progressHandler: new ProgressHandler()); // Best Run Results var bestModel = experimentResults.BestRun.Model; // Evaluate Model IDataView scoredTestData = bestModel.Transform(testData); var metrics = mlContext.MulticlassClassification.Evaluate(scoredTestData); Console.WriteLine($"MicroAccuracy: {metrics.MicroAccuracy}"); // Save Model string modelSavePath = Path.Join(solutionDirectory,"RestaurantInspectionsML","model.zip"); mlContext.Model.Save(bestModel, data.Schema, modelSavePath); } } }
完成所有代碼和配置后,從restaurantinspectionstrain目錄中,使用dotnet cli運行應用程序。注意,這將運行10來分鍾。
dotnet run
控制台輸出應該類似於下面的內容:
Trained LightGbmMulti with Log Loss 0.1547 in 1.55 seconds Trained FastTreeOva with Log Loss 0.0405 in 65.58 seconds Trained FastForestOva with Log Loss 0.0012 in 53.37 seconds Trained LightGbmMulti with Log Loss 0.0021 in 4.55 seconds Trained FastTreeOva with Log Loss 0.8315 in 5.22 seconds MicroAccuracy: 0.999389615839469
構建豐富化數據應用程序
該模型經過訓練后,可用於豐富化未分級數據。
初始化PredictionEngine
導航到RestaurantInspectionsEnrichment項目目錄,並將以下using語句添加到Program.cs類中。
using System.IO; using System.Linq; using Microsoft.ML; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions; using RestaurantInspectionsML;
要進行預測,必須將模型加載到應用程序中,因為預測一次只生成一行,所以還需要創建一個PredictionEngine。
在Program類中,定義PredictionEngine。
private static readonly PredictionEngine<ModelInput,ModelOutput> _predictionEngine;
然后,創建一個構造函數來加載模型並初始化它。
static Program() { MLContext mlContext = new MLContext(); ITransformer model = mlContext.Model.Load("model.zip",out DataViewSchema schema); _predictionEngine = mlContext.Model.CreatePredictionEngine<ModelInput,ModelOutput>(model); }
加載未分級的數據
在Program類的Main方法中,定義數據文件的位置。
string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output");
然后,獲取由RestaurantInspectionsETL應用程序生成的最新未分級數據的路徑。
var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => directory.FullName) .First();
為您的豐富化數據的應用程序初始化一個SparkSession。
var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_Enrichment") .GetOrCreate();
由RestaurantInspectionsETL生成的數據沒有headers。但是,可以在加載數據時定義和設置架構。
var schema = @" INSPECTIONTYPE string, CODES string, CRITICALFLAG int, INSPECTIONSCORE int, GRADE string"; DataFrame df = sc .Read() .Schema(schema) .Csv(Path.Join(latestOutput,"Ungraded"));
定義UDF
Spark中沒有允許您使用PredictionEngine的內置函數。但是,Spark可以通過udf擴展。請記住,udf沒有像內置函數那樣進行優化。因此,盡可能多地使用內置函數。
在Program類中,創建一個名為PredictGrade的新方法,它接受一組特性,這些特性組成了訓練模型所期望的ModelInput。
public static string PredictGrade( string inspectionType, string violationCodes, int criticalFlag, int inspectionScore) { ModelInput input = new ModelInput { InspectionType=inspectionType, Codes=violationCodes, CriticalFlag=(float)criticalFlag, InspectionScore=(float)inspectionScore }; ModelOutput prediction = _predictionEngine.Predict(input); return prediction.PredictedLabel; }
然后,在Main方法中,將PredictGrade方法注冊為SparkSession中的UDF。
sc.Udf().Register<string,string,int,int,string>("PredictGrade",PredictGrade);
豐富化數據
一旦注冊了UDF,就可以在Select語句中使用它,Select語句將創建一個新的DataFrame,其中包括輸入特性以及經過訓練模型的預測等級輸出。
var enrichedDf = df .Select( Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE"), CallUDF("PredictGrade", Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE") ).Alias("PREDICTEDGRADE") );
最后,保存豐富化的DataFrame。
string outputId = new DirectoryInfo(latestOutput).Name; string enrichedOutputPath = Path.Join(solutionDirectory,"RestaurantInspectionsEnrichment","Output"); string savePath = Path.Join(enrichedOutputPath,outputId); if(!Directory.Exists(savePath)) { Directory.CreateDirectory(enrichedOutputPath); } enrichedDf.Write().Csv(savePath);
發布並運行豐富化數據的應用程序
最終的Program.cs文件應該如下所示。
using System.IO; using System.Linq; using Microsoft.ML; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions; using RestaurantInspectionsML; namespace RestaurantInspectionsEnrichment { class Program { private static readonly PredictionEngine<ModelInput,ModelOutput> _predictionEngine; static Program() { MLContext mlContext = new MLContext(); ITransformer model = mlContext.Model.Load("model.zip",out DataViewSchema schema); _predictionEngine = mlContext.Model.CreatePredictionEngine<ModelInput,ModelOutput>(model); } static void Main(string[] args) { // Define source data directory paths string solutionDirectory = "/home/lqdev/Development/RestaurantInspectionsSparkMLNET"; string dataLocation = Path.Combine(solutionDirectory,"RestaurantInspectionsETL","Output"); var latestOutput = Directory .GetDirectories(dataLocation) .Select(directory => new DirectoryInfo(directory)) .OrderBy(directoryInfo => directoryInfo.Name) .Select(directory => directory.FullName) .First(); var sc = SparkSession .Builder() .AppName("Restaurant_Inspections_Enrichment") .GetOrCreate(); var schema = @" INSPECTIONTYPE string, CODES string, CRITICALFLAG int, INSPECTIONSCORE int, GRADE string"; DataFrame df = sc .Read() .Schema(schema) .Csv(Path.Join(latestOutput,"Ungraded")); sc.Udf().Register<string,string,int,int,string>("PredictGrade",PredictGrade); var enrichedDf = df .Select( Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE"), CallUDF("PredictGrade", Col("INSPECTIONTYPE"), Col("CODES"), Col("CRITICALFLAG"), Col("INSPECTIONSCORE") ).Alias("PREDICTEDGRADE") ); string outputId = new DirectoryInfo(latestOutput).Name; string enrichedOutputPath = Path.Join(solutionDirectory,"RestaurantInspectionsEnrichment","Output"); string savePath = Path.Join(enrichedOutputPath,outputId); if(!Directory.Exists(savePath)) { Directory.CreateDirectory(enrichedOutputPath); } enrichedDf.Write().Csv(savePath); } public static string PredictGrade( string inspectionType, string violationCodes, int criticalFlag, int inspectionScore) { ModelInput input = new ModelInput { InspectionType=inspectionType, Codes=violationCodes, CriticalFlag=(float)criticalFlag, InspectionScore=(float)inspectionScore }; ModelOutput prediction = _predictionEngine.Predict(input); return prediction.PredictedLabel; } } }
從RestaurantInspectionsEnrichment項目中使用以下命令發布應用程序。
dotnet publish -f netcoreapp2.1 -r ubuntu.18.04-x64
導航到發布目錄。在本例中,它是bin/Debug/netcoreapp2.1/ubuntu.18.04-x64/publish。
從發布目錄中,使用spark-submit運行應用程序。
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.4.0.jar dotnet RestaurantInspectionsEnrichment.dll
文件輸出應該類似於下面的內容:
Cycle Inspection / Initial Inspection,04N,1,13,A Cycle Inspection / Re-inspection,08A,0,9,A Cycle Inspection / Initial Inspection,"10B,10H",0,10,A Cycle Inspection / Initial Inspection,10F,0,10,A Cycle Inspection / Reopening Inspection,10F,0,3,C
結尾
這個解決方案展示了如何在.NET應用程序中使用Spark。因為它是.NET生態系統的一部分,所以可以利用其他組件和框架(如ML.NET)來擴展系統的功能。雖然這個示例是在本地單節點集群上開發和運行的,但Spark是按比例運行的。因此,可以通過設置集群並在其中運行ETL和工作負載來進一步改進該應用程序。
資源