spark安裝和使用


local模式

概述

local模式就是在一台計算機上運行spark程序,通常用於在本機上練手和測試,它將線程映射為worker。   

1)local: 所有計算都運行在一個線程當中,沒有任何並行計算,通常我們在本機執行一些測試代碼,或者練手,就用這種模式;

2)local[K]: 指定使用幾個線程來運行計算,比如local[4]就是運行4個Worker線程。通常我們的Cpu有幾個Core,就指定幾個線程,最大化利用Cpu的計算能力;

3)local[*]: 這種模式直接幫你按照Cpu最多Cores來設置線程數了

 

安裝使用

 

1)上傳並解壓spark安裝包

 

tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/

 

2) 官方求PI案例

 

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--executor-memory 1G \

--total-executor-cores 2 \

./examples/jars/spark-examples_2.11-2.1.1.jar \

100

 
        

 

該算法是利用蒙特·卡羅算法求PI,結果如下:

 

 

 

 

standalone模式

構建一個由Master+Slave構成的Spark集群,采用Spark原生的資源管理器,Spark運行在集群中。

 

Yarn模式

概述:

Spark客戶端直接連接Yarn,不需要額外構建Spark集群。有yarn-client和yarn-cluster兩種模式,主要區別在於:Driver程序的運行節點。

yarn-client:主程序邏輯運行在本地,任務運行在Yarn集群中

yarn-cluster:APPMaster;主程序邏輯和任務都運行在Yarn集群中。適用於生產環境。

 

安裝使用:

1)修改hadoop配置文件yarn-site.xml,添加如下內容:

 <!--是否啟動一個線程檢查每個任務正使用的物理內存量,如果任務超出分配值,則直接將其殺掉,默認是true -->
        <property>
                <name>yarn.nodemanager.pmem-check-enabled</name>
 <value>false</value>
        </property>
        <!--是否啟動一個線程檢查每個任務正使用的虛內存量,如果任務超出分配值,則直接將其殺掉,默認是true -->
        <property>
                <name>yarn.nodemanager.vmem-check-enabled</name>
                <value>false</value>
        </property>

 

2)分發配置文件

xsync /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml

 

3)修改spark-env.sh,添加如下配置:

YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop

 

4)執行一個程序

$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

注意:

1.運行程序之前要確保hdfs和yarn已經正常啟動

2.修改yarn的配置文件之后要重啟yarn以讓配置文件生效

3.在yarn模式下,是否需要在所有的結點都部屬spark程序包?不需要,因為是運行在yarn上,資源管理和調度是由yarn負責的。只需要在其中一個結點提供部署spark的jar包,並通過driver提交作業到yarn集群。

 

spark-shell

啟動spark-shell的正確姿勢是:進入到spark的底層目錄,輸入bin/spark-shell 

關閉spark-shell的正確姿勢是::quit,注意冒號

其部分參數如下(非常類似於下文的spark-submit,均可通過--help參數來獲取):

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).

 

 

spark-submit

基本語法:
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

 

\是分隔符;

 

--表示這個選項是可有可無,而且順序是可變換的

 

--master 指定Master的地址,默認為Local。

  如果是yarn模式,則是yarn

  如果是standalone模式,則是spark://master:port

  如果是local模式,則是local,local[n](n表示核數),local[*](*表示按照cpu核數來設定線程數)

 

--class: 你的應用的啟動類 (如 org.apache.spark.examples.SparkPi)

--deploy-mode: 是否發布你的驅動到worker節點(cluster) 或者作為一個本地客戶端 (client) (default: client)*

 

--conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格,可以加引號“key=value”

application-jar: 打包好的應用jar,包含依賴. 這個URL在集群中全局可見。 比如hdfs:// 共享存儲系統, 如果是 file:// path, 那么所有的節點的path都包含同樣的jar

application-arguments: 傳給main()方法的參數

--executor-memory 1G 指定每個executor可用內存為1G

--total-executor-cores 2 指定每個executor使用的cup核數為2個

 

利用idea開發spark程序

Spark Shell僅在測試和驗證我們的程序時使用的較多,在生產環境中,通常會在IDE中編制程序,然后打成jar包,然后提交到集群,最常用的是創建一個Maven項目,利用Maven來管理jar包的依賴。

1) 創建一個Maven項目WordCount並導入依賴

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
    </dependency>
</dependencies>
<build>
        <finalName>WordCount</finalName>
        <plugins>
<plugin>
                <groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                       <goals>
                          <goal>compile</goal>
                          <goal>testCompile</goal>
                       </goals>
                    </execution>
                 </executions>
            </plugin>
        </plugins>
</build>

2)編寫代碼

package com.atguigu

import org.apache.spark.{SparkConf, SparkContext}

object WordCount{

  def main(args: Array[String]): Unit = {

//1.創建SparkConf並設置App名稱
    val conf = new SparkConf().setAppName("WC")

//2.創建SparkContext,該對象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.使用sc創建RDD並執行相應的transformation和action
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1))

//4.關閉連接
    sc.stop()
  }
}

3)打包插件

              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                    <manifest>
                            <mainClass>Hello</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
      </plugin>

4)打包到集群測試

bin/spark-submit \
--class WordCount \
--master spark://hadoop102:7077 \
WordCount.jar \
/word.txt \
/out

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM