C#碼農的大數據之路 - 使用C#編寫MR作業


系列目錄

寫在前面

從Hadoop出現至今,大數據幾乎就是Java平台專屬一般。雖然Hadoop或Spark也提供了接口可以與其他語言一起使用,但作為基於JVM運行的框架,Java系語言有着天生優勢。而且能找到的與大數據框架如Hadoop等使用介紹的文章也都以Java語言作為示例居多。許多C#er為了轉投大數據懷抱也開始學習Java。微軟為了擁抱大數據在這方面也做了許多,提供了一些工具及庫使C#可以更好的與Hadoop等協同工作。本系列中我們一同學習如何以我們熟悉語言來使用Hadoop等大數據平台,畢竟大數據的思想是一致的,算法是固定的,語言只是一個工具。做好准備,出發。

本文先來介紹下微軟這些年來在大數據平台上的工作。從中可以了解我們有哪些工具可用,方向是什么。

微軟的大數據策略

本部分內容參考了各種官方文檔,新聞報道總結而成,雖然已經努力確保正確,但難免出現疏漏,如果有錯誤請各位指出。

微軟的大數據策略大概分了兩個階段。

早期
微軟在Hadoop出現不久就已經開始關注,在Azure雲服務出現后,微軟實現一個Windows平台的Hadoop發行版名為HDInsight,並且在Azure中提供了名為HDInsight的大數據服務。在應用開發中,微軟提供了名為Microsoft HDInsight Emulator for Windows Azure的工具來支持開發過程中的調試。這個工具已經過時,不過現在仍然可以使用Web Platform Installer 5.0來安裝。實際上這個時期的微軟版Hadoop - HDInsight是基於Hortonworks Data Platform for Windows開發的,HDInsight在這個時期也是Windows和Azure平台專屬。
這個時期微軟還提供了一系列的Framework用於簡化使用C#開發Hadoop(准確的說是微軟版Hadoop - HDInsight),最初放在Codeplex上,稱為Microsoft .NET SDK For Hadoop,現在里面有部分API已經過時了,像是Microsoft.Hadoop.Client等。這些API提供使用C#編寫MapReduce的功能,但僅能用於基於Windows的HDInsight。

當下
后來微軟擁抱開源,擁抱Linux,微軟的產品也不再單一的限於Windows平台或着以.NET作為開發框架。對於HDInsight的開發微軟依然繼續與Hortonworks合作,但這時的HDInsight(3.4版以后)只有Linux版本。(微軟擁抱Linux,Hortonworks的HDP發行版都沒有Windows版了)運行在Azure上的HDInsight服務也都是跑在Linux服務器上。對於本地開發可以使用Hortonworks提供的HDP(Hortonworks Data Platform) SandBox作為虛擬環境。HDP SandBox提供了3種不同運行環境的鏡像,VirtualBox、VMware及Docker。樓主比較喜歡Docker,有關Docker版HDP SandBox的配置及使用VS連接這個虛擬環境的方法詳見此文

關於HDP
HDP是一種大數據棧的發行版,包含了如Hadoop,Spark,Storm等組件。同類發行版中,可能更為人所知的是Cloudera所出的發行版CDH。下面放一張Hortonworks官網的圖,2.5版本的HDP包含的組件一目了然:

MapReduce

微軟最早開始支持使用自己的技術棧如C#和.NET開發大數據程序就是從MapReduce開始的。時至今日,微軟一共提供了兩種不同的方式讓C#編寫的MapReduce任務可以在Hadoop集群上執行,當然這些API也都是基於Hadoop Streaming,因為不管是基於Windows的大數據集群,還是基於Linux的大數據集群,它們都是運行於JVM之上。至少在很長一段時間內.NET CLR都不能和JVM共同工作。(當下,微軟文檔中也明確提到使用Hadoop Streaming由於數據在JVM和其他運行環境如.NET CLR之間傳輸會導致性能損失,微軟也建議使用Java來編寫MR程序,文檔中的也有Java編寫MR的示例)

隨着微軟擁抱Linux,基於Windows的大數據集群不再被支持(HDP for Windows也不再有后續版本了),微軟也全面轉向基於Linux的大數據集群(包括部署在Azure中的HDInsights也都是運行在Linux系統之上),這些C#寫的API都不再被支持(主要原因還是這些基於.NET Framework的程序無法運行在Linux上,只能等未來.NET Core普及了)。

雖然這些API都已過時,但為了讓大家了解C#技術棧這么多年來掙扎在大數據邊緣的過程。下面對它們都進行了簡單的介紹。

Hadoop API for .NET

以下代碼示例主要來自CodePlex上那篇多年沒有更新的文章

Hadoop API for .NET是微軟推出的第一套用於Hadoop的.NET庫。Hadoop組件中Hadoop Streaming用來支持與其它語言協同工作完成MapReduce(按國際慣例下文縮寫為MR)任務的編寫。Hadoop API for .NET包裝了Hadoop Streaming方便使用.NET平台語言來編寫MR任務。
使用Hadoop API for .NET:

  • 可以更方便的提交MR任務,而不用通過Hadoop Streaming命令行
  • 提供了MapperReducerCombiner更好的包裝類作為基類,方便MR的編寫
  • 自動包含依賴的.NET程序集一起作為streaming任務提交
  • 提供StreamingUnit進行對MapperReducerCombiner的本地單元測試
  • 通過JSON序列化及反序列化,支持輸入輸出的強類型

Hadoop API for .NET支持的存儲包括HDFS和Azure Blob Storage。輸入內容的格式就是慣常的\n\r分割行(記錄),\r分割列。
如果既有Mapper又有Reducer則所需要的Key Value中的Key為純文本,以便通過StringComparison.Ordinal進行排序並存儲。
其它情況下記錄可以是任意結構化數據的文本表示,如字符串表示的JSON數據,如果是二進制數據(如docx文件),則記錄將是文件存儲的路徑。

時至今日,這其中大部分API都已被廢棄。

MR程序組成

一個.NET編寫的MR程序包含如下幾部分:

  • 任務定義,包括MapperTypeReducerTypeCombinerType 和其它設置
  • MapperReducerCombiner
  • 存儲於HDFS或Azure Blob Storage的輸入數據
  • 任務執行器。可以通過MRRunner.exe運行.NET MR任務,也可以在Main函數中調用HadoopJobExecutor

編寫的.NET MR任務(本示例僅有Mapper)

  1. 創建一個名為HadoopNET的C#控制台應用程序,查找名為Microsoft.Hadoop.MapReduce的Nuget包並安裝。這將向項目中添加Microsoft.Hadoop.MapReduce.dllMRRunner.exe等工具。

  2. 開始編寫Mapper。創建一個名為FirstMapper的類並實現MapperBase這個基類。

     public class FirstMapper : MapperBase
     {
         public override void Map(string inputLine, MapperContext context)
         {
             // 輸入
             int inputValue = int.Parse(inputLine);
             // 任務
             var sqrt = Math.Sqrt(inputValue);
             // 寫入輸出
             context.EmitKeyValue(inputValue.ToString(), sqrt.ToString());
         }
     }
    

VS或Resharper的重構功能會自動添加using Microsoft.Hadoop.MapReduce;這個引用

  1. 創建任務類FirstJob,該類實現HadoopJob<FirstMapper>

     public class FirstJob : HadoopJob<FirstMapper>
     {
         public override HadoopJobConfiguration Configure(ExecutorContext context)
         {
             HadoopJobConfiguration config = new HadoopJobConfiguration();
             config.InputPath = "input/SqrtJob";
             config.OutputFolder = "output/SqrtJob";
             return config;
         }
     }
    

配置測試運行環境

要運行這個示例,需要在Windows上安裝前面提到的Microsoft HDInsight Emulator for Windows Azure。這個工具需要通過Microsoft Web Platform Installer來安裝,搜索hdinsight,結果第一項就是我們要安裝的工具:

點擊添加 - 安裝,耐心等候個半小時(需要下載大約1.2G的文件)

這其中最重要的部分就是用於Windows平台的HDP(Hortonworks Data Platform for Windows),其所使用的是OpenJDK1.7的一個分支 - AZUL公司的Zulu。

安裝完成后,桌面會多出3個快捷方式:

  • Hadoop Command Line
  • Hadoop Name Node Status
  • Hadoop YARN Status

Hadoop Command Line是一個命令行的快捷方式,在這個控制台中我們可以直接使用如hadoop fs這樣的命令。默認安裝下控制台進入后目錄為C:\hdp\hadoop-2.4.0.2.1.3.0-1981hadoop等應用程序就在這個目錄下的bin目錄中。
這個目錄的上級目錄中(C:\hdp)包含了一些批處理及PowerShell腳本,我們就使用其中的start_local_hdp_services.cmdstop_local_hdp_services.cmd來起停這個Windows上的大數據集群。
運行start_local_hdp_services.cmd輸入下面的信息表示已經成功啟動。

starting zkServer
starting namenode
starting secondarynamenode
starting datanode
starting resourcemanager
starting timelineserver
starting nodemanager
starting jobhistoryserver
starting hiveserver2
starting metastore
starting derbyserver
starting templeton
starting oozieservice
Sent all start commands.
total services
13
running services
13

運行另外兩個快捷方式分別可以看到HDFS和YARN的狀態。
為了運行后面的程序,還需要給HDFS創建一個主目錄

hadoop fs -mkdir -p /user/username #username是當前登陸的Windows用戶名

創建測試數據並運行MR任務

為了運行程序,首先向HDFS中添加測試輸入數據。創建一個文本文件input.txt,每行一個數字,類似:

9
16
25
36
81

將該文件上傳到FisrtJob中配置的那個目錄:

hadoop fs -mkdir input
hadoop fs -mkdir input/SqrtJob #如果目錄不存在,先創建
hadoop fs -put input.txt input/SqrtJob/input.txt

剩下的就是編譯這個程序並通過MRRunner執行,我們測試項目的名字為HadoopNet,編譯后得到HadoopNet.exe。
通過代碼可以看到,我們沒有在Main函數中實現任何代碼,我們要介紹的第一種提交作業的方式是使用MRRunner來提交,將輸出的exe作為dll使用,運行下面的命令:

MRRunner.exe -dll HadoopNET.exe

不出意外的話,MR會開始執行。輸出中的前半部分部分內容是MR執行過程中的日志。最后會打印實際提交的Hadoop平台的命令,MRRunner的作用也就在於生成這樣的一條命令並將其提交到Hadoop Streaming來處理:

>>CMD: C:\hdp\hadoop-2.4.0.2.1.3.0-1981\bin\hadoop.cmd jar C:\hdp\hadoop-2.4.0.2.1.3.0-1981\share\hadoop\tools\lib\hadoop-streaming-2.4.0.2.1.3.0-1981.jar -D "mapred.reduce.tasks=0" -D "mapred.map.max.attempts=1" -D "mapred.reduce.max.attempts=1" -files "hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/HadoopNET.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.MapReduce.dll,hdfs:///username/hystar/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.WebClient.dll,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Newtonsoft.Json.dll,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.MapDriver.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.ReduceDriver.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.CombineDriver.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.WindowsAzure.Management.Framework.Threading.dll" -input hdfs:///user/hystar/input/SqrtJob -output hdfs:///user/hystar/output/SqrtJob -mapper Microsoft.Hadoop.MapDriver.exe -reducer NONE -cmdenv "MSFT_HADOOP_MAPPER_DLL=HadoopNET.exe" -cmdenv "MSFT_HADOOP_MAPPER_TYPE=HadoopNET.FirstMapper"

使用代碼來提交MR作業

另一個運行MR程序的方式就是使用代碼,我們在Main函數中添加如下代碼:

var hadoop = Hadoop.Connect();
hadoop.MapReduceJob.ExecuteJob<FirstJob>();

編譯生成HadoopNET.exe。這次我們不需要MRRunner可以直接運行Hadoop.exe。
執行過程和執行結果都與使用MRRunner一致。

注意這個測試用途的大數據平台,在MR任務輸出目錄已存在時會自動將其刪除,而不是像原生Hadoop平台那樣遇到輸出目錄存在時報錯處理。

Microsoft.Azure.Management.HDInsight.Job

后來,微軟技術路線改變,廢棄了Azure Service Manager (ASM)-based tools中一列些工具,包括命令行CLI,PowerShell 和HDInsight .NET SDK(上一小節介紹的Microsoft.WindowsAzure.Management.HDInsight和Microsoft.Hadoop.Client都被廢棄了)。全面轉向Azure Resource Manager所包含的一些列庫與工具。
雖然微軟總是變來變去,但這也是為了迎合開源社區變化,以及讓自身產品更有體系,更有可擴展性的無奈之舉。
新的工具中與MapReduce相關的庫為Microsoft.Azure.Management.HDInsight.Job下到一系列類。不同於之前的庫,新的庫只是提供一種通過.NET平台技術(包括C#與PowerShell)來提交MR任務的方法。而沒有再提供一些輔助的類用於編寫MR任務,新的MR任務編寫應該是借鑒了Python等利用Hadoop Streaming進行大數據處理的方法,將Map和Reduce分別編寫成可以獨立執行的程序,然后提交到Hadoop Streaming去執行。下文會給出一個示例。
使用.NET C#編寫的MR任務,需要基於Windows的Azure HDInsight環境來運行。這個目前沒有模擬器,所以我們使用真實環境來測試。

這里沒有使用世紀互聯運營的Azure,而是使用了Azure全球服務,地區選擇香港,可以使用大陸的手機號和大陸發行的Visa卡進行驗證即可順利激活1600HKD的試用額度。

登陸Azure控制台后,在左側導航欄選擇大象圖標的HDInsight集群菜單(默認情況下,需要點擊更多,在彈出的菜單中才能看到此項,可以點擊后面的星按鈕添加收藏,以使此菜單出現在第一屏中)。
新建集群,類型選擇Windows,可以自定義集群,在測試目的中選擇最低配置的集群結點(節省銀子,雖然是免費額度),如下圖。

按下圖所示選擇結點的配置

第一次使用Azure一般都需要創建新的存儲賬戶,這里的容器就相當於我們這個集群存儲的根目錄。在Azure中,使用Azure Storage Blob作為類似HDFS的存在。

最后的摘要頁面也有明確提示,從集群創建到被刪除這個過程中將會一直按照右下角顯示價格進行計費,無論是否運行任務。所以對於Azure HDInsight的新用戶記得用后要刪除是很重要的。另外Azure PowerShell和Azure .NET SDK都提供了使用代碼創建與刪除HDInsight集群的方法,方便將集群的創建,任務部署與集群刪除作為一系列自動化任務來完成。

點擊創建按鈕,集群創建工作隨之開始進行。

創建前,可以點擊下載模板鏈接,將集群創建參數作為模板保存,由於創建過程還是稍顯復雜,而有了這個模板就可以在本地通過Azure CLI,PowerShell或C#代碼來完成集群的創建。詳見此博文

這個過程大約持續20分鍾,直到儀表盤中的正在創建變成正在運行。
集群創建好后,可以進入集群的控制台看看:

除了首頁為Windows平台Azure HDInsights專有,其他頁面都是到YARN,HDFS(Azure Blob Storage)及Job History原有管理頁面的鏈接。通過各自的管理頁面,可以了解到MR任務的工作情況,數據存儲等。
有了運行環境,我們來實現兩個簡單的基於.NET的MR任務,分別創建名為NetMapper和NetReducer的兩個控制台應用程序。然后添加如下代碼:

這些代碼來自微軟官方示例

// Mapper
class NetMapper
{
    static void Main(string[] args)
    {
        if (args.Length > 0)
        {
            Console.SetIn(new StreamReader(args[0]));
        }

        string line;
        while ((line = Console.ReadLine()) != null)
        {
            Console.WriteLine(line);
        }
    }
}

// Reducer
class NetReducer
{
    static void Main(string[] args)
    {
        string line;
        var count = 0;

        if (args.Length > 0)
        {
            Console.SetIn(new StreamReader(args[0]));
        }

        while ((line = Console.ReadLine()) != null)
        {
            count += line.Count(cr => (cr == ' ' || cr == '\n'));
        }
        Console.WriteLine(count);
    }
}

將這兩個項目分別生成,得到NetMapper.exe與NetReducer.exe。
接着在解決方案中新建一個項目SubmitNet用於提交MR任務到Azure中,提交代碼如下:

class Program
{
    private static HDInsightJobManagementClient _hdiJobManagementClient;

    private const string ExistingClusterUri = "test-netcore.azurehdinsight.net";
    private const string ExistingClusterUsername = "admin"; //HDInsight集群默認用戶名就是admin
    private const string ExistingClusterPassword = "創建集群時設置的密碼";

    private const string DefaultStorageAccountName = "hdinsighthystar"; //存儲賬戶名
    private const string DefaultStorageAccountKey = "存儲賬戶Key";
    private const string DefaultStorageContainerName = "與HDInsight集群關聯的容器的名稱";

    static void Main(string[] args)
    {
        Console.WriteLine("The application is running ...");

        var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = ExistingClusterUsername, Password = ExistingClusterPassword };
        _hdiJobManagementClient = new HDInsightJobManagementClient(ExistingClusterUri, clusterCredentials);

        SubmitMRJob();

        Console.WriteLine("Press ENTER to continue ...");
        Console.ReadLine();
    }

    private static void SubmitMRJob()
    {
        var paras = new MapReduceStreamingJobSubmissionParameters
        {
            Files = new List<string>() { "/example/app/NetMapper.exe", "/example/app/NetReducer.exe" },
            Mapper = "NetMapper.exe",
            Reducer = "NetReducer.exe",
            Input= "/example/data/gutenberg/davinci.txt",
            Output = "/example/data/StreamingOutput/wc.txt"
        };

        Console.WriteLine("Submitting the MR job to the cluster...");
        var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
        var jobId = jobResponse.JobSubmissionJsonResponse.Id;
        Console.WriteLine("Response status code is " + jobResponse.StatusCode);
        Console.WriteLine("JobId is " + jobId);

        Console.WriteLine("Waiting for the job completion ...");

        // Wait for job completion
        var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
        while (!jobDetail.Status.JobComplete)
        {
            Thread.Sleep(1000);
            jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
        }

        // Get job output
        var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
            DefaultStorageContainerName);
        var output = (jobDetail.ExitValue == 0)
            ? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
            : _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure

        Console.WriteLine("Job output is: ");

        using (var reader = new StreamReader(output, Encoding.UTF8))
        {
            string value = reader.ReadToEnd();
            Console.WriteLine(value);
        }
    }
}

代碼改自文檔中提交Job的代碼,原文檔中使用MapReduceJobSubmissionParameters提交JVM平台語言編寫的jar文件。上面的例子使用MapReduceStreamingJobSubmissionParameters作為替換來提交Hadoop Streaming任務。
代碼中的存儲賬戶關聯的容器可以使用Azure PowerShell快速查看。使用Azure PowerShell需要首先安裝。在管理員提升的PowerShell窗口中執行:

Install-Module AzureRM

第一次使用Install-Module可能會提示需要安裝NuGet提供程序,按Y繼續就可以了。(PowerShell基於.NET,所以其擴展組件也被托管在NuGet上)
如果提示不受信任的存儲庫,也按Y繼續即可。
成功安裝基於PowerShell的Azure Resource Management工具后,在使用與賬戶相關的命令前都要先登錄:

Login-AzureRmAccount

按提示輸入相關信息,登陸成功后就可以執行查詢命令了。必須獲取HDInsight集群關聯的存儲賬戶。

Get-AzureRmHDInsightCluster -ClusterName "testmr"

返回信息中會包含如下內容,這就是我們想要的。

DefaultStorageAccount     : hdinsighthystar.blob.core.windows.net
DefaultStorageContainer   : testmr-2017-03-26t03-57-40-342z

程序准備完后,在提交任務前,我們還需要將兩個exe文件與輸入上傳到Azure Blob存儲中。
向Azure Blob存儲中拷貝文件有n多種方式,詳見此文檔。最直觀的方式是使用宇宙第一IDE - VS中的Azure插件。Azure插件更新較頻繁,請確保使用最新版的軟件。本文編寫時最新版本為Azure SDK for .NET 2.9.6。
安裝插件后,在Cloud Explorer中可以看到容器(在服務器資源管理器中Azure結點也能看到存儲賬戶的容器),右鍵快捷菜單中選擇查看容器。

打開容器瀏覽界面,點擊上傳按鈕打開上傳對話框,選擇要上傳的文件,並輸入路徑。
注意輸入路徑會自動創建文件夾,這也是使用此插件來創建文件夾的唯一方式。

點擊確定上傳即可。
其他向Blob上傳文件的方式還包括使用Azure CLI,AzCopy應用程序及Azure PowerShell。
另外由於Azure Blob存儲與HDFS兼容,也可以直接使用hadoop fs命令來操作Azure Blob存儲。
比如:

hadoop fs -put /home/testfile.txt wasbs://CONTAINER@ACCOUNT.blob.core.windows.net/testpath/

其中的容器名與賬戶名可以通過前文介紹的Azure PowerShell命令來獲取。
如果本地Windows中沒有安裝hadoop,可以通過遠程桌面連接到HDInsight集群,將文件復制到遠程機器,並在遠程桌面中使用hadoop導入文件到Azure Blob存儲。(遠程桌面環境下導入可以省略wasbs://CONTAINER@ACCOUNT.blob.core.windows.net,直接使用相對路徑)

一切准備妥當后,運行SubmitNet項目,就可以提交任務:

測試結束后要記得刪除集群:

刪除HDInsight集群,不會同時刪除存儲賬戶,存儲賬戶只要存在也會按照使用量計費,區域香港的話,每個容器每天要0.01港幣。
所以,存儲賬戶幾乎不占成本,可以保留,以后創建新集群時直接選擇現有存儲賬戶的容器,可以直接執行其中的任務,查看其中的文件。

基於Windows的Azure HDInsight將於未來幾個月內停止服務。但我們上面介紹的內容只要做一定就該就可以遷移到基於Linux的HDInsight中。首先,提交任務部分的代碼是幾乎不需要變(需要變的是指定Map和Reduce任務的參數,后文有說明),只要我們還在Windows系統中提交任務即可。使用.NET編寫的Map和Reduce程序需要進行一定修改以便可以在.NET Core下運行。這個在下一小節有詳細介紹。

.NET Core

隨着.NET Core的日益完善,基於.NET Core開發可以讓C# MapReduce程序運行在基於Linux的大數據集群中的程序已經成為可能。在Azure HDInsight(基於Linux)中(Ubuntu16.04)甚至都已經預裝了.NET Core。參照上一小節的方法,我們可以讓.NET Core編寫的MR任務運行在基於Linux的HDInsight中。
首先我們在Azure中創建一個集群基於Linux的HDInsight集群。Linux版的HDInsight使用Ambari作為門戶頁面(關於Ambari的介紹,官方文檔在此),而沒有了像Windows版中那樣一個頁面,同時為了安全考慮YARN,JobHistory及存儲的Web管理界面也沒有直接暴露出來。訪問這些頁面需要使用SSH與Linux集群建立隧道,這個道理和我們平時用SSH搭梯子翻牆是一樣的(官方文檔在此)。
首先做一點准備工作,使用https://[cluster-name].azurehdinsight.net這個地址(或直接在集群主頁點擊儀表盤或Ambari視圖)打開Ambari,登陸后,查找並記錄下headnode0主機的地址。

如圖,Hosts標簽頁中名稱hn0開頭的就是我們要找的結點。點開查看詳情:

記錄下這個ip,后面可以直接使用ip來登陸這個結點。

樓主一般比較喜歡用SecureCRT作為終端,使用BitviseSSH來搭梯子。(雖然它們各自都可以完成另一方的工作,但個人還是喜歡這樣搭配使用)
使用BitviseSSH建立與HDInsight Linux集群的通道設置方式見下圖:

幾個注意的地方,SSH Host的地址不同於集群門戶頁面地址,其中多了個-ssh,默認ssh用戶用戶名為sshuser(在創建集群時可以更改)。第二圖可以不設置,這樣設置可以讓我們的BitviseSSH功能保持單一,就是進行轉發。第三圖最終要的就是端口號。鏈接成功后,我們可以在Firefox中使用FoxProxy等類似代理軟件配置轉發。
配置好瀏覽器轉發后,我們打開之前記錄的ip的8080端口。由於我們通過SSH建立了隧道,所以這個ip會被正確的路由。

注意:某些情況下,使用headnode0無法訪問,可以嘗試使用headnode1。

如果一切正常,會再次彈出Ambari的登陸框,登陸后,在這個新的Ambari登陸中的HDFS或YARN界面中的Quick Links菜單中就可以找到我們需要的那些管理網站入口。如圖:

HDP2.7版本之后的HDInsight集群,ResourceMananger等管理頁面貌似可以直接訪問而不再需要上面的限制。

下面大致來介紹如果在基於Linux的集群上運行基於.NET Core的App。我們還是使用上一部分的Map和Reduce程序的代碼。為了使用.NET Core的Framework,需要進行一些小小的改造。
新建兩個.NET Core控制台應用程序,分別命名為NetCoreMapperNetCoreReducer。其各自的Main方法如下:

namespace NetCoreMapper
{
    public class Program
    {
        public static void Main(string[] args)
        {
            if (args.Length > 0)
            {
                Stream stream = new FileStream(args[0], FileMode.Open, FileAccess.Read, FileShare.Read);
                Console.SetIn(new StreamReader(stream));
            }

            string line;
            while ((line = Console.ReadLine()) != null)
            {
                Console.WriteLine(line);
            }

        }
    }
}
namespace NetCoreReducer
{
    public class Program
    {
        static void Main(string[] args)
        {
            ILoggerFactory loggerFactory = new LoggerFactory()
                .AddConsole()
                .AddDebug();
            ILogger logger = loggerFactory.CreateLogger<Program>();
            logger.LogInformation(
                "This is a test of the emergency broadcast system.");

            string line;
            var count = 0;

            if (args.Length > 0)
            {
                Console.SetIn(new StreamReader(new FileStream(args[0], FileMode.Open, FileAccess.Read)));
            }

            while ((line = Console.ReadLine()) != null)
            {
                count += line.Count(cr => (cr == ' ' || cr == '\n'));
            }
            Console.WriteLine(count);
        }
    }
}

使用命令分別生成項目:

dotnet publish --framework netcoreapp1.0 --configuration release --output publish

把兩個項目的輸出放到一個文件夾中,按上面的配置,應該最終得到8個文件,其中兩個pdb文件可以安全刪除。

然后,我們將這些文件上傳到Azure Blob Storage與HDInsight集群對應的容器中的/example/coreapp目錄下(如果沒有這個目錄請先新建,如果不使用這個目錄,在下文的提交任務的代碼中要把程序路徑換成相應的路徑)

接下來需要修改下提交任務的代碼:

private static void SubmitMRJob()
{
    var paras = new MapReduceStreamingJobSubmissionParameters
    {
        Files = new List<string>()
        {
            "/example/coreapp",
        },
        Mapper = "dotnet coreapp/NetCoreMapper.dll",
        Reducer = "dotnet coreapp/NetCoreReducer.dll",
        Input = "/example/data/gutenberg/davinci.txt",
        Output = "/example/data/StreamingOutput/wc.txt"

    };

    Console.WriteLine("Submitting the MR job to the cluster...");
    var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
    var jobId = jobResponse.JobSubmissionJsonResponse.Id;
    Console.WriteLine("Response status code is " + jobResponse.StatusCode);
    Console.WriteLine("JobId is " + jobId);

    Console.WriteLine("Waiting for the job completion ...");

    // Wait for job completion
    var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
    while (!jobDetail.Status.JobComplete)
    {
        Thread.Sleep(1000);
        jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
    }

    // Get job output
    var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
        DefaultStorageContainerName);
    var output = (jobDetail.ExitValue == 0)
        ? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
        : _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure

    Console.WriteLine("Job output is: ");

    using (var reader = new StreamReader(output, Encoding.UTF8))
    {
        string value = reader.ReadToEnd();
        Console.WriteLine(value);
    }
}

尤其需要注意的是FilesMapperReducer這三個屬性中路徑的寫法,只有這樣MR任務才能成功執行。不要問我怎么知道的,說多了都是淚。
運行任務提交程序,當看到控制台如下輸出時表示任務已經成功提交並執行完畢。

否則,控制台會輸出錯誤。
下面總結一下樓主遇到的錯誤以及MR任務查找錯誤原因的方法。
查找錯誤原因,可以訪問ResourceManager管理界面(地址一般為headnode:8088),在界面中找到最近出錯的任務,如下圖:

點擊進入任務詳情界面:

點擊“任務歷史”鏈接進入HistoryServer的UI界面:

在任務歷史中,點擊圖片中標出的Url,可以進入失敗的Map或Reduce任務的列表界面:

點擊Logs鏈接可以看到任務的詳情,從中一定可以找到任務失敗的原因。
下面兩圖就是樓主因為在SubmitMRJob方法中沒有以正確的方式填充Files屬性所報的錯。

當時,樓主將.NET Core程序所輸出的每個文件作為Files屬性數組的一條進行提交,導致如圖,程序文件被傳到Blob后被放在不同的臨時目錄中,從而導致運行程序失敗。
另外樓主遇到的問題還包括,目前Azure HDInsight預裝的.NET Core版本較低(1.0.1),而使用VS2017來生成.NETCore App,即使所選的Target Framework為netcoreapp1.0,生成出來的程序也是1.0.4版,直接放到集群的主機上執行會報如下錯誤。

在使用Hadoop執行前,先直接執行下程序看看是否可以正確啟動運行是一種很好的排除錯誤保證成功率的方法。

The specified framework 'Microsoft.NETCore.App', version '1.0.4' was not found.

  • Check application dependencies and target a framework version installed at:
    /usr/share/dotnet/shared/Microsoft.NETCore.App
  • The following versions are installed:
    1.0.1
  • Alternatively, install the framework version '1.0.4'.

上面的報錯可以清楚的了解到,HDInsight集群的主機安裝了.NETCore但是版本只有1.0.1,而VS2017生成的.NETCore App至少需要1.0.4版本的運行時。
按照官方文檔的說明,卸載舊版並安裝新版。安裝后繼續測試程序是否可以在shell里直接運行知道成功。
特別注意,我們需要在實際干活的結點,即NodeManager運行的結點,安裝適當的.NET Core,因為這些結點是實際運行.NET Core程序的結點。參考前文圖片,這些結點以wn開頭,我們可以在建立隧道后通過IP ssh到這些主機並安裝.NET Core以及測試。

如果不方便使用Azure HDInsight,來進行測試,使用HDP沙盒也同樣可以。
唯一不便的就是不能使用Microsoft.Azure.Management.HDInsight.Job來方便的提交任務,而需要自行使用hadoop CLI來完成,但這對於我們測試.NET Core編寫的業務邏輯沒有影響。
關於這個話題詳見這篇博文

微軟4月份更新版的Azure在線文檔中介紹了使用Mono在基於Linux的HDP集群上運行C#編寫的MapReduce的方法,具體文章鏈接(這個連接可能在將來隨時發生變化)。

最后

本文及后續文章都是對於將微軟技術用於大數據可能性的一種探討,歡迎大家一起討論。

本文示例相關示例在此


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM