What is HDInsight?
Microsoft Azure HDInsight is a Hadoop cluster offering based on the Hortonworks Data Platform (HDP) and includes components such as Storm, HBase, Pig, Hive, Sqoop, Oozie, and Ambari (see the appendix at the end for the full list of components). Azure HDInsight supports cluster deployment on both Windows and Linux. As far as I know, Hortonworks is the only Hadoop distribution that supports deployment on Windows.
The following compares HDInsight deployments on the two platforms:
| Category | Hadoop on Linux | Hadoop on Windows |
| --- | --- | --- |
| Cluster OS | Ubuntu 12.04 Long Term Support (LTS) | Windows Server 2012 R2 |
| Cluster Type | Hadoop | Hadoop, HBase, Storm |
| Deployment | Azure Management Portal, Azure CLI, Azure PowerShell | Azure Management Portal, Azure CLI, Azure PowerShell, HDInsight .NET SDK |
| Cluster UI | Ambari | Cluster Dashboard |
| Remote Access | Secure Shell (SSH) | Remote Desktop Protocol (RDP) |
What is Spark?
Spark is a parallel computing framework for big data that performs computation in memory, making it a lightning-fast big data analytics tool. Spark was born in 2009 at the AMP Lab of the University of California, Berkeley, and is now a top-level open-source project of the Apache Software Foundation. Spark supports the Python, Java, and Scala programming languages, and you do not need to be an expert programmer to benefit from it.
Spark itself is written in Scala and runs on the Java Virtual Machine (JVM), so it can run on any laptop or cluster with Java 6 or later installed. If you want to use the Python API, you also need a Python interpreter (version 2.6 or later); note that Spark does not yet support Python 3.
Which version of Spark can I install?
In this topic, we use a Script Action custom script to install Spark on an HDInsight cluster. This script can install Spark 1.2.0 or Spark 1.0.2 depending on the version of the HDInsight cluster you provision.
- If you use the script while provisioning an HDInsight 3.2 cluster, it installs Spark 1.2.0.
- If you use the script while provisioning an HDInsight 3.1 cluster, it installs Spark 1.0.2.
You can modify this script or create your own script to install other versions of Spark.
Using the Spark shell to run interactive queries
Perform the following steps to run Spark queries from an interactive Spark shell. In this section, we run a Spark query on a sample data file (/example/data/gutenberg/davinci.txt) that is available on HDInsight clusters by default.
- From the Azure portal, enable Remote Desktop for the cluster you created with Spark installed, and then remote into the cluster. For instructions, see Connect to HDInsight clusters using RDP.
- In the Remote Desktop Protocol (RDP) session, from the desktop, open the Hadoop command line (from a desktop shortcut), and navigate to the location where Spark is installed; for example, C:\apps\dist\spark-1.2.0.
- Run the following command to start the Spark shell:

  .\bin\spark-shell --master yarn

  Once the shell has started, you should get a Scala prompt:

  scala>

- At the Scala prompt, enter the Spark query shown below. This query counts the occurrence of each word in the davinci.txt file, which is available at the /example/data/gutenberg/ location on the Azure Blob storage associated with the cluster. (A variation that sorts the results by count is shown after these steps.)

  val file = sc.textFile("/example/data/gutenberg/davinci.txt")
  val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  counts.toArray().foreach(println)

- The output should resemble the following:
- Enter :q to exit the Scala prompt.

  :q
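The query above prints a count for every word. As an optional follow-up, the following sketch, typed at the same scala> prompt and reusing the counts RDD from the previous step, swaps each pair so the results can be sorted by count and shows only the ten most frequent words (the exact output depends on your data):

counts.map { case (word, n) => (n, word) }   // swap to (count, word) so the count becomes the sort key
      .sortByKey(ascending = false)          // highest counts first
      .take(10)                              // bring only ten results back to the driver
      .foreach(println)

Unlike counts.toArray(), take(10) returns only a small sample to the driver, which is gentler on memory for large files.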
Spark Core Concepts
Now that you have run your first Spark code in the shell, it is time to learn programming with Spark in more depth.
Every Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains your application's main function and defines distributed datasets on the cluster. In the preceding example, the driver program was the Spark shell itself, and you simply typed in the operations you wanted to run.
The driver program accesses the Spark computing cluster through a SparkContext object. In the shell, a SparkContext is automatically created for you as the variable named sc; in Example 1-1 we enter sc and the shell displays its type.
Example 1-1. Examining the sc variable
>>> sc
<pyspark.context.SparkContext object at 0x1025b8f90>
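The shell creates sc for you; a standalone driver program must construct its own SparkContext. The following is a minimal Scala sketch of that setup, assuming Spark 1.x; the object name, application name, and master URL are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal standalone driver program (sketch). In the shell this setup is already done,
// and the resulting SparkContext is exposed as the variable sc.
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SimpleApp")   // illustrative application name
      .setMaster("local[2]")     // run locally with two threads; on a cluster this could be, for example, "yarn-client"
    val sc = new SparkContext(conf)

    // ... create RDDs and run operations through sc here ...

    sc.stop()                    // release resources when the driver finishes
  }
}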
Once you have created a SparkContext, you can create RDDs. In Examples 1-2 and 1-3 we call sc.textFile() to create an RDD, recording the contents of a text file in the variable lines.
To run these operations, the driver program typically manages a number of worker nodes, each of which runs an executor. For example, if we ran a count() operation on a cluster, different machines might count lines in different parts of the lines variable. Because we ran the Spark shell only locally, it executed on a single machine, but if we connected the shell to a cluster it could analyze the data in parallel. Figure 1-1 shows how Spark executes on a cluster.
Figure 1-1. Components for distributed execution in Spark
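To make the count() behavior described above concrete, here is a small Scala shell sketch; it assumes the same README.md file used in Examples 1-2 and 1-3. When run against a cluster, each executor counts the lines in its own partitions and the driver combines the partial results:

scala> val lines = sc.textFile("README.md")   // the file is split into partitions across the cluster
scala> lines.count()                          // executors count their partitions; the driver sums the results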
Spark's API relies heavily on passing functions in the driver program to run on the cluster. For example, we can extend the README example above by filtering for the lines that contain a specific keyword, "Python", as shown in Example 1-2 (Python) and Example 1-3 (Scala).
Example 1-2. Python filtering example
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>> pythonLines.first()
u'## Interactive Python Shell'
Example 1-3. Scala filtering example
scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]
scala> val pythonLines = lines.filter(line => line.contains("Python"))
pythonLines: spark.RDD[String] = FilteredRDD[...]
scala> pythonLines.first()
res0: String = ## Interactive Python Shell
Passing functions to Spark: if you are not familiar with the lambda or => syntax in Examples 1-2 and 1-3, it is simply shorthand for defining inline functions in Python and Scala. When using Spark in these languages, you can also define a function separately and then pass its name to Spark. For example, in Python:

def hasPython(line):
    return "Python" in line

pythonLines = lines.filter(hasPython)
Passing functions to Spark is also possible in Java, but in that case the function is defined as a class that implements the Function interface. For example:

JavaRDD<String> pythonLines = lines.filter(
    new Function<String, Boolean>() {
        Boolean call(String line) { return line.contains("Python"); }
    }
);

Java 8 introduces a shorthand lambda syntax that looks similar to Python and Scala:

JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));
We discuss passing functions further in "Passing Functions to Spark" on page 30.
The Spark API contains many intriguing function-based operations, such as filter, that run in parallel across the cluster; we cover them in detail in later articles. Spark automatically ships your function to the executor nodes, so you can write code in a single driver program and have it automatically run on multiple nodes.
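As a small illustration of this, the named-function style shown earlier for Python has a direct Scala counterpart; this sketch reuses the lines RDD from Example 1-3, and the function name hasPython is illustrative:

// Define a named function and pass it to filter instead of an inline => expression.
def hasPython(line: String): Boolean = line.contains("Python")
val pythonLines = lines.filter(hasPython)   // Spark ships hasPython to the executor nodes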
What are the Hadoop components?
In addition to the previous overall configurations, the following individual components are also included on HDInsight clusters.
- Ambari: Cluster provisioning, management, and monitoring.
- Avro (Microsoft .NET Library for Avro): Data serialization for the Microsoft .NET environment.
- Hive & HCatalog: Structured Query Language (SQL)-like querying, and a table and storage management layer.
- Mahout: Machine learning.
- MapReduce and YARN: Distributed processing and resource management.
- Oozie: Workflow management.
- Phoenix: Relational database layer over HBase.
- Pig: Simpler scripting for MapReduce transformations.
- Sqoop: Data import and export.
- Tez: Allows data-intensive processes to run efficiently at scale.
- ZooKeeper: Coordination of processes in distributed systems.